CompletableFuture
Java์์์ Concurrent
๋ฉํฐ ํ๋ก์ธ์ฑ ( ProcessBuilder )
๋ฉํฐ ์ค๋ ๋ฉ ( Thread/Runnable )
๋์์ฑ(Concurrency) vs ๋ณ๋ ฌ์ฑ(Parallelism)
๋์์ฑ? ์ฑ๊ธ์ฝ์ด์์ ๋ฉํฐ ์ค๋ ๋๋ฅผ ๋์์ํค๊ธฐ ์ํ ๋ฐฉ์์ผ๋ก ๋ฉํฐ ํ์คํน์ ์ํด ์ฌ๋ฌ๊ฐ ์ค๋ ๋๊ฐ ๋ฒ๊ฐ์๊ฐ๋ฉด์ ์คํ. ํ๋ง๋๋ก ๋์์ ์คํ๋๋ ๊ฒ์ฒ๋ผ ๋ณด์ด๋ ๊ฒ.
๋ฉํฐ์ค๋ ๋๋ก ๋์์ฑ์ ๋ง์กฑ์ํฌ์ ์๋ ๊ฑฐ์ง ๋์์ฑ์ด ๋ฉํฐ์ค๋ ๋๋ ์๋. ์ฝํ๋ฆฐ์ ์ฝ๋ฃจํด์ ์ฑ๊ธ์ค๋ ๋๋ก ๋์์ฑ์ ๋ง์กฑ
๋ณ๋ ฌ์ฑ? 2๊ฐ์ด์์ task๊ฐ ์์๋ ๊ฐ task๊ฐ ๋ฌผ๋ฆฌ์ ์ธ ์๊ฐ์ผ๋ก ๋์์ ์คํ์ด ๊ฐ๋ฅ. ๋ฉํฐ์ฝ์ด์์ ๋ฉํฐ์ค๋ ๋๋ฅผ ๋์์ํค๋ ๋ฐฉ์์ผ๋ก ํ๊ฐ ์ด์์ ์ค๋ ๋๋ฅผ ํฌํจํ๋ ๊ฐ ์์ ๋ค์ด ๋ฌผ๋ฆฌ์ ์ธ ์๊ฐ์ผ๋ก ์์ ๋์์ ์ํํ๋ ๊ฒ. (์ด๋ ๋์ ์์ ์ ๋ฉํฐ์ฝ์ด๊ฐ ๋ ์๋ ๋คํธ์ํฌ๋ฅผ ์ด์ฉํ ๋ถ์ฐ์ปดํจํ ์ด ๋ ์ ์๋ค.)
๋ณ๋ ฌ์ฑ์ ๋ง์กฑํ๋ฉด ๋์์ฑ๋ ๋ง์กฑ, ๋์์ฑ์ ๋ง์กฑํ๋ค๊ณ ๋ณ๋ ฌ์ฑ ๋ง์กฑx
๋ฉํฐ ์ค๋ ๋
๋ฉํฐ ์ค๋ ๋ ์์ฑ
1. Thread ๋ฅผ ์์๋ฐ์ ํด๋์ค๋ก ์ ์
2. Thread ์์ฑ์์ ํ๋ฆฌ๋ฏธํฐ๋ก Runnalbe ์ธํฐํ์ด์ค ๊ตฌํํ์ฌ ์์ฑ
์ค๋ ๋ ์ฃผ์ ๊ธฐ๋ฅ
ํ์ฌ ์ค๋ ๋ ๋ฉ์ถฐ๋๊ธฐ (sleep) : ๋ค๋ฅธ ์ค๋ ๋๊ฐ ์ฒ๋ฆฌํ๋๋ก ๊ธฐํ๋ฅผ ์ฃผ์ง๋ง ๋ฝ์ ๊ฑธ์ง ์์ ๋ฐ๋๋ฝ์ด ๊ฑธ๋ฆด ์ ์๋ค.
๋ค๋ฅธ ์ค๋ ๋ ๊นจ์ฐ๊ธฐ (interrupted) : ๋ค๋ฅธ ์ค๋ ๋๋ฅผ ๊นจ์์ InterruptedException์ ๋ฐ์ ์ํจ๋ค. ์์ธ๊ฐ ๋ฐ์ํ์ ๋ ํ ์ผ์ ์ฝ๋ฉํ๊ธฐ ๋๋ฆ์ด๋ค.
๋ง์ผ catch๊ตฌ๋ฌธ์ return ๋ฌธ์ด ์กด์ฌํ์ง ์๋๋ค๋ฉด ๊ทธ๋๋ก catch๋ธ๋ญ์ ๋น ์ ธ๋๊ฐ
thread hello
๋ฅผ ์ถ๋ ฅํ ๊ฒ์ด๋ค.๋ค๋ฅธ ์ค๋ ๋ ๊ธฐ๋ค๋ฆฌ๊ธฐ (join) : ๋ค๋ฅธ ์ค๋ ๋๊ฐ ๋๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
์ค๋ ๋์ ํ์ฌ ๋ฌธ์
์ค๋ ๋๊ฐ ๋ง์ ์ง๋ฉด ๋ง์ ์ง ์๋ก ์ฝ๋ฉ์ผ๋ก ๊ด๋ฆฌํ๊ธฐ๊ฐ ๋งค์ฐ ํ๋ค๋ค. ( Interrupt, join์ ๋ํ ์์ธ์ฒ๋ฆฌํ ์ฝ๋๊ฐ ๊ธฐํ๊ธ์์ ์ผ๋ก ๋ง์์ง๋ค.)
Executors
๊ณ ์์ค์ Concurrency ํ๋ก๊ทธ๋๋ฐ์ ์ง์ํ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ก ์์ ๊ฐ์ ์ค๋ ๋ ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ค์ ์ด์ฉํ์ฌ ๊ตฌํ๋์ด์๋ค.
์ค๋ ๋๋ฅผ ๋ง๋ค๊ณ ๊ด๋ฆฌํ๋ ์์ ์ ์ ํ๋ฆฌ์ผ์ด์ ์์ ๋ถ๋ฆฌํ์ฌ Executors์ ์์ํ ํํ์ด๋ค.
ํ๋ ์ผ
์ค๋ ๋ ๋ง๋ค๊ธฐ : ์ค๋ ๋ ํ์ ๋ง๋ค์ด ๊ด๋ฆฌ
ExecutorService executorService = new ThreadPoolExecutorr(core,max,idleTime,TimeUnit.SECONDS,new SynchronousQueue()) : ์ง์ ์ค๋ ๋๊ฐ์์ ์ ํด์๊ฐ, ์์ ํ๋ฑ์ ์ค์ ํ์ฌ ์์ฑ ํ ์ ์๋ค.
์ฝ์ด ์ค๋ ๋? ์ค๋ ๋๊ฐ ์์ฑ๋๊ณ ์ ํด์ํ์์๋ ์ ๊ฑฐ๋์ง ์๊ณ ์ ์ง๋๋ ์ต๋ ๊ฐ์
ExecutorService executorService = Executors.newSingleThreadExecutor() : ํ๊ฐ์ ์ค๋ ๋๋ฅผ ๊ฐ๋ ์ค๋ ๋ํ ์์ฑ
ExecutorService executorService = Executors.newCachedThreadPool() : ์ด๊ธฐ ์ค๋ ๋ ์๋ 0, ์ฝ์ด ์ค๋๋ ์๋ 0์ ๊ฐ์ง๊ณ ์ค๋ ๋ ๊ฐ์๋ณด๋ค ์์ ๊ฐ์๊ฐ ๋ง์ผ๋ฉด ์ ์ค๋ ๋๋ฅผ ๊ทธ๋ ์์ฑ์์ผ ์์ ์ํค๋ ์ค๋ ๋ํ๋ก ์ต๋ ์ค๋ ๋ ์๋ integer.MAX_VALUE. ์ค๋ ๋๊ฐ ์์ฑ๋๊ณ 60์ด๊ฐ ์์ ์ ์ํํ์ง ์์ผ๋ฉด ์ ๊ฑฐ๋๋ค.
ExecutorService executorService = Executors.newFixedThreadPool(int nThreads) : ์ด๊ธฐ ์ค๋ ๋ ์๋ 0, ์ฝ์ด ์ค๋ ๋ ์๋ nThreads, ์ต๋ ์ค๋ ๋ ์๋ nThreads๋ฅผ ๊ฐ๋ ์ค๋ ๋ํ ์์ฑ. ์ค๋ ๋๊ฐ ์์ ์ ์ฒ๋ฆฌํ์ง ์๊ณ ๋๊ณ ์๋๋ผ๋ ์ค๋ ๋ ๊ฐ์๊ฐ ์ค์ด๋ค์ง ์๋๋ค.
ExecutorService executorService = Executors.newWorkStealingPool(int pallelism) : ForkJoinPool์ work stealing ๊ธฐ๋ฒ์ ๋ฉํฐํ๋ก์ธ์ค ํ ์์ฑ
์ค๋ ๋ ๊ด๋ฆฌ : ์ค๋ ๋ ์๋ช ์ฃผ๊ธฐ๋ฅผ ๊ด๋ฆฌ
ExecutorService๋ ์์ ์ ์คํํ๊ณ ๋๋ฉด ๋ค๋ฅธ ์์ ์ด ๋ค์ด์ฌ๋๊น์ง ๊ณ์ ๋๊ธฐํ๊ณ ์๊ธฐ ๋๋ฌธ์ ๋ช ์์ ์ผ๋ก ์ข ๋ฃ๋ฅผ ์์ผ์ค์ผ ํ๋ค.
์ข ๋ฃ ํจ์์๋ shutdown(),shutdownNow()๊ฐ ์๋๋ฐ shutdown()์
gracefulํ ์ข ๋ฃ
๋ผํ๋ฉฐ ๋ฐ๋์ ์ค๋ ๋์ ์์ ์ด ๋ง์น๊ณ ๋์ ์ข ๋ฃ๋ฅผ ํ๋ฉฐ shutdownNow()๋ ์์ ํํฉ์ ์๊ด์์ด ์ข ๋ฃํ๋ ๋ฉ์๋์ด๋ค.shutdown()์ voidํ์ ์ด๊ณ shutdownNow()๋ ๋ฐํ๊ฐ์ผ๋ก ๋ฏธ์ฒ๋ฆฌ๋ ์์ (Runnable) ๋ชฉ๋ก์ด๋ค.
์์ ์ฒ๋ฆฌ ๋ฐ ์คํ : ์ค๋ ๋๋ก ์คํํ ์์ ์ ์ ๊ณตํ ์ ์๋ API ์ ๊ณต
์์ ์คํ : submit, execute
์ค์ผ์ค๋ง : scheduleAtFixedRate()์ ๊ฐ์ ๋ฉ์๋
Callable / Future
Callable์ Runnable๊ณผ ๊ฑฐ์ ๋๋ถ๋ถ์ด ์ผ์นํ์ง๋ง returnํ์ ์ด ์กด์ฌํ๋ค๋ ์ ์ ์ฐจ์ด์ ์ด ์์ด ์ด๋ฅผ ์ด์ฉํด ์์ ์ด ๋๋ฌ์๋์ ๊ฐ์ ๋ฐํ๋ฐ์ ์ ์๋ค.
API
1. ์์
์์ฒญ (execute/submit)
execute : ๋ฆฌํดํ์ ์ด void๋ก ์์ ์ฒ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฆฌํด๋ฐ์ง ๋ชปํ๊ณ ์์ ์ฒ๋ฆฌ ๋์ค์ ์์ธ๊ฐ ๋ฐ์ํ๋ฉด ์ค๋ ๋๊ฐ ์ข ๋ฃ๋๊ณ ํด๋น ์ค๋ ๋๋ ์ค๋ ๋ํ์์ ์ ๊ฑฐ
+) ctl : ์ค๋ ๋ํ์ ์ํ , ์กฐ๊ธ์ด๋ผ๋ ๋ ๋น ๋ฅธ ๊ณ์ฐ์ ์ํด ๋นํธ๋ก ๊ฐ์ ์ ์ฅํ๊ณ ๋นํธ ์ฐ์ฐ์ ํตํด ๋์
workerCount : ํ์ฌ ์ค๋ ๋ ๊ฐ์
Integer.SiZE - 3 ๋งํผ์ ๊ฐฏ์๊ฐ ์ต๋ ๊ฐฏ์๋ก ์ฝ 5์ต๊ฐ
runState : ์ค๋ ๋ํ์ ์ํ
RUNNING : ์๋ก์ด TASK๋ฅผ ๋ฐ๊ณ ํ์ ์ง์ด ๋ฃ์ ์ผ์ ํ๋ ์ํ
SHUTDOWN : ์๋ก์ด TASK๋ฅผ ๋ฐ์ง๋ง๊ณ ํ์์๋ TASK๋ฅผ ์ฒ๋ฆฌ
RUNNING -> SHUTDOWN : shutdown() ํธ์ถ์
STOP : ์๋ก์ด TASK๋ฅผ ๋ฐ์ง ์๊ณ ํ์์๋ TASK๋ ์ฒ๋ฆฌํ์ง ์์ ์ํ๋ก ํ์ฌ ์งํ์ค์ธ TASK์ INTERRUPT๋ฅผ ๊ฑด๋ค.
(RUNNING or SHUTDOWM) -> STOP : shutdownNow() ํธ์ถ์
TIDYING : ๋ชจ๋ TASK๋ ์๋ฉธ ๋์๊ณ , WORKER COUNT๋ 0. ์ด ์ํ๋ก ์ ์ด๋๋ ์ค๋ ๋๋ terminated() ๋ฉ์๋๋ฅผ ์คํ์์ผ ์ข ๋ฃํ ์์ ์ธ ์ํ
STOP -> TIDYING : ์ค๋ ๋ ํ์ด ๋น์์๋
TERMINATED : ๋ชจ๋ ์ค๋ ๋๊ฐ terminated()๋ ์ํ
submit : Futureํ์ ์ ๋ฐํํ๊ณ ์์ ์ฒ๋ฆฌ ๋์ค์ ์์ธ๊ฐ ๋ฐ์ํด๋ ์ค๋ ๋๋ ์ข ๋ฃ๋์ง ์๊ณ ๋ค์ ์์ ์ ์ํด ์ฌ์ฌ์ฉ๋๊ธฐ ๋๋ฌธ์ ์ค๋ ๋์ ์์ฑ ์ค๋ฒํค๋๋ฅผ ์ค์ผ ์ ์๋ค.
2. ๊ฒฐ๊ณผ ๊ฐ์ ธ์ค๊ธฐ (get/ poll / take)
get
get()ํจ์๋ blocking call ๋ฐฉ์์ด๋ผ ์ค๋ ๋์ ์์ ์ด ์์ง ๋๋์ง ์์๋ค๋ฉด ๊ธฐ๋ค๋ฆฐ ํ์ ๋ฆฌํด๊ฐ์ ๋ฐ์์ค๊ณ ๋ค์ ์ค์ ์คํํ๊ฒ ๋๋ค.
poll : ์๋ฃ๋ ์์ ์ Future๋ฅผ ๊ฐ์ ธ์ค๊ณ ์๋ฃ๋ ์์ ์ด ์๋ค๋ฉด ์ฆ์ null์ ๋ฆฌํด
poll(timeout, Timeunit) : ์๋ฃ๋ Future๋ฅผ ๊ฐ์ ธ์ค๊ณ ์๋ฃ๋ ์์ ์ด ์๋ค๋ฉด Timeout๊น์ง ๋ธ๋กํน
take : ์ผ๋ฃ๋ ์์ ์ Future๋ฅผ ๊ฐ์ ธ์ค๊ณ ์๋ฃ๋ ์์ ์ด ์๋ค๋ฉด ์์๋๊น์ง ๋ธ๋กํน
3. ์์
์ํ ํ์ธ (isDone())
์์ ์ด ๋๋ฌ๋ค๋ฉด true ๋ฐํ
4. ์์
์ทจ์ (cancel())
์ทจ์ ํ์ผ๋ฉด true, ๋ชปํ์ผ๋ฉด false ๋ฐํ
ํ๋ผ๋ฏธํฐ๋ก true๋ฅผ ์ฃผ๋ฉด ํ์ฌ ์งํ์ค์ธ ์ค๋ ๋๋ฅผ interrupt ํ๊ณ , false๋ฅผ ์ฃผ๋ฉด ์์ ์ด ๋๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๊ณ ์ทจ์ํ๋ค.
5. ์ฌ๋ฌ์์
๋์์ ์คํ (invokeAll())
๊ฒฐ๊ณผ๋ฅผ Future List๋ก ๋ฐ์์ค๊ธฐ ๋๋ฌธ์ ๋์์ ์คํํ ์์ ์ค ์ ์ผ ์ค๋ ๊ฑธ๋ฆฌ๋ ์์ ์ ์๊ฐ๋งํผ ๊ฑธ๋ฆฌ๊ฒ ๋๋ค.
65. ๊ฐ์ฅ ๋จผ์ ์ข
๋ฃ๋ ์์
๋ฐํ๋ฐ๊ธฐ (invokeAny())
๊ฐ์ฅ ๋จผ์ ๋๋ ์์ ์ ๊ฒฐ๊ณผ๋ฌผ์ ๋ฐํ๋ฐ๋ ๋ฉ์๋์ธ๋ฐ ์ด๋ ๋ฐํ์ blocking call์ด๊ธฐ ๋๋ฌธ์ Future์ด ์๋ ์ผ๋ฐ ๊ฐ์ฒดํ์ ์ ๋ฐํ๋ฐ๋๋ค.
Future์ ๋ฌธ์ ์
Future๋ฅผ ์ธ๋ถ์์ ์ทจ์์ํค๊ฑฐ๋, get()์ ํ์์์์ ์คํํ๋๋ฑ ์๋ฃ์์ ์ ์ํํ ์ ์๋ค.
get()์ด๋ผ๋ blocking call์ ์ฌ์ฉํ์ง ์๊ณ ์๋ ์์ ์ด ๋๋ฌ์๋ ์ฝ๋ฐฑ์ ์คํํ ์ ์๋ค.
์ฌ๋ฌ Future๋ฅผ ์กฐ๋ฆฝํ ์ ์๋ค.
์์ธ์ฒ๋ฆฌ์ฉ API๋ฅผ ์ ๊ณตํ์ง ์๋๋ค.
CompletableFuture
์๋ฐ์์ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ๊ฐ๋ฅ์ผํ๋ ์ธํฐํ์ด์ค
API
1. ๋น๋๊ธฐ๋ก ์์
์คํ
์ํ๋ ์ฐ๋ ๋ ํ์ ์ฌ์ฉํด์ ์คํํ ์ ์๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก๋ ForkJoinPool.commonPool()๋ก ThreadPoolExecutor,ExecutorService๋ฅผ ์ฌ์ฉํ ์๋ ์๋ค.
๊ธฐ์กด์ ThreadPoolExecutor๋ ์๋ก ๋ ๋ฆฝ์ ์ธ ์์ ์ ์ํด ์ค๊ณ๋์์ผ๋ฉฐ ์ ์ฌ์ ์ผ๋ก ์ฐจ๋จ๋๊ณ ๊ฑฐ์น ์์ ๋ ์ผ๋์ ๋๊ณ ์ค๊ณ๋์๋ค.
ForkJoinPool? ์ฌ๊ท์ ์ธ ๋ค์ค ์ค๋ ๋ ํ๋ก๊ทธ๋จ์์ ๋๊ธฐ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด ์ค๊ณ๋ ๊ฒ์ผ๋ก ๋ค๋ฅธ ์์ ์ ๋ํด ์ฌ๊ท์ ์ด๋ฉฐ ์ํธ ์์กด์ ์ผ๋ ์ฌ์ฉํ๋ ๊ฒ์ด ํจ๊ณผ์ ์ธ ์ค๋ ๋ ํ. ์ด๋ ๋ค๋ฅธ ์์ ์ ๊ธฐ๋ค๋ฆฌ๋๋ฐ ๋ ๋ง์ ์๊ฐ์ ์๋นํ๊ณ ์์์ ๋ญ๋นํ๊ฒ ๋๋ ๋ฌธ์ ๊ฐ ์๋ค.
์ฝ๋ฐฑํจ์๋ค์ ๋๋ฒ์งธ ๋งค๊ฐ๋ณ์๋ก ์ค๋ ๋ ํ์ ์ ๋ฌํ๋ฉด ํด๋น ์ค๋ ๋ํ์ ์ด์ฉํ์ฌ ์์ ์ ์ํํ ์ ์๋ค.
runAsync() : ๋ฆฌํด๊ฐ์ด ์๋ ๊ฒฝ์ฐ
supplyAsync() : ๋ฆฌํด๊ฐ์ด ์๋ ๊ฒฝ์ฐ
2. ์ฝ๋ฐฑ ์ ๊ณต
์ฝ๋ฐฑ ์์ฒด๋ฅผ ๋๋ค๋ฅธ ์ค๋ ๋์์ ์คํํ ์ ์๋ค.
thenApply(Function) : ๋ฆฌํด๊ฐ์ ๋ฐ์์ ๋ค๋ฅธ ๊ฐ์ผ๋ก ๋ฐ๊พธ๋ ์ฝ๋ฐฑ
thenAccept(Consumer) : ๋ฆฌํด๊ฐ์ผ๋ก ๋ฆฌํด์์ด ๋ค๋ฅธ ์์ ์ ์ฒ๋ฆฌํ๋ ์ฝ๋ฐฑ
thenRun(Runnable) : ๋ฆฌํด๊ฐ์ ๋ฐ์ง ์๊ณ ๋ค๋ฅธ ์์ ์ ์ฒ๋ฆฌํ๋ ์ฝ๋ฐฑ
3. ์กฐํฉํ๊ธฐ
thenCompose() : ๋ ์์ ์ด ์๋ก ์ฐ๊ด๊ด๊ณ๊ฐ ์์ด ์ด์ด์ ์คํํ๋๋ก ์กฐํฉ
thenApply์ ๊ฑฐ์ ๋น์ทํ๋ค๊ณ ํ ์ ์๋๋ฐ ์ฐจ์ด์ ์ด๋ผ๊ณ ํ๋ค๋ฉด thenApply๋ ๋ฐํ๊ฐ์ ์ด์ฉํด ์๋ก์ด ์ฝ๋ฐฑ์ ์คํํ๋ค๋ฉด, thenCompose๋ ๋ฐํ๊ฐ์ด ์๋๋ผ ์ด์ ์ ์ฒ๋ฆฌ ๋ก์ง ์์ฒด๋ฅผ ์ธ์๋ก ์๋ก์ด ์ฝ๋ฐฑ์ ์คํํ๋ ๊ฒ. ํ๋ง๋๋ก Future๋ฅผ ์ค์ฒฉํ๋ ๊ฒ์ด ์๋๋ผ ํ๋ฉดํํ์ฌ ๋ฐํํ๋ ๊ฒ์ด๋ค. ๊ทธ๋์ CompletableFuture ๋ฉ์๋๋ฅผ ์ฐ๊ฒฐํ๋ ๊ฒ์ด๋ผ๋ฉด thenComposer๊ฐ ์ข๋ค.
thenCombine() : ๋ ์์ ์ ๋ ๋ฆฝ์ ์ผ๋ก ์คํํ๊ณ ๋ค ์ข ๋ฃ ํ์๋ ์ฝ๋ฐฑ ์คํ
allOf() : ์ฌ๋ฌ ์์ ์ ๋ชจ๋ ์คํํ๊ณ ๋ชจ๋ ์์ ๊ฒฐ๊ณผ์ ์ฝ๋ฐฑ ์คํ
anyOf() : ์ฌ๋ฌ ์์ ์ค์ ๊ฐ์ฅ ๋นจ๋ฆฌ ๋๋ ํ๋์ ๊ฒฐ๊ณผ์ ์ฝ๋ฐฑ ์คํ
4. ์์ธ์ฒ๋ฆฌ
exceptionally(Function)
handle(BiFunction)
Error๊ฐ ์์๋์ ์ ์์ ์ธ ์ข ๋ฃ์ผ๋ ๋ชจ๋ ํธ๋ค๋งํ ์ ์๋ ๋ฉ์๋์ด๋ค.
Last updated