CompletableFuture
Java์์์ Concurrent
๋ฉํฐ ํ๋ก์ธ์ฑ ( ProcessBuilder )
๋ฉํฐ ์ค๋ ๋ฉ ( Thread/Runnable )
๋์์ฑ(Concurrency) vs ๋ณ๋ ฌ์ฑ(Parallelism)
๋์์ฑ? ์ฑ๊ธ์ฝ์ด์์ ๋ฉํฐ ์ค๋ ๋๋ฅผ ๋์์ํค๊ธฐ ์ํ ๋ฐฉ์์ผ๋ก ๋ฉํฐ ํ์คํน์ ์ํด ์ฌ๋ฌ๊ฐ ์ค๋ ๋๊ฐ ๋ฒ๊ฐ์๊ฐ๋ฉด์ ์คํ. ํ๋ง๋๋ก ๋์์ ์คํ๋๋ ๊ฒ์ฒ๋ผ ๋ณด์ด๋ ๊ฒ.
๋ฉํฐ์ค๋ ๋๋ก ๋์์ฑ์ ๋ง์กฑ์ํฌ์ ์๋ ๊ฑฐ์ง ๋์์ฑ์ด ๋ฉํฐ์ค๋ ๋๋ ์๋. ์ฝํ๋ฆฐ์ ์ฝ๋ฃจํด์ ์ฑ๊ธ์ค๋ ๋๋ก ๋์์ฑ์ ๋ง์กฑ
๋ณ๋ ฌ์ฑ? 2๊ฐ์ด์์ task๊ฐ ์์๋ ๊ฐ task๊ฐ ๋ฌผ๋ฆฌ์ ์ธ ์๊ฐ์ผ๋ก ๋์์ ์คํ์ด ๊ฐ๋ฅ. ๋ฉํฐ์ฝ์ด์์ ๋ฉํฐ์ค๋ ๋๋ฅผ ๋์์ํค๋ ๋ฐฉ์์ผ๋ก ํ๊ฐ ์ด์์ ์ค๋ ๋๋ฅผ ํฌํจํ๋ ๊ฐ ์์ ๋ค์ด ๋ฌผ๋ฆฌ์ ์ธ ์๊ฐ์ผ๋ก ์์ ๋์์ ์ํํ๋ ๊ฒ. (์ด๋ ๋์ ์์ ์ ๋ฉํฐ์ฝ์ด๊ฐ ๋ ์๋ ๋คํธ์ํฌ๋ฅผ ์ด์ฉํ ๋ถ์ฐ์ปดํจํ ์ด ๋ ์ ์๋ค.)
๋ณ๋ ฌ์ฑ์ ๋ง์กฑํ๋ฉด ๋์์ฑ๋ ๋ง์กฑ, ๋์์ฑ์ ๋ง์กฑํ๋ค๊ณ ๋ณ๋ ฌ์ฑ ๋ง์กฑx
๋ฉํฐ ์ค๋ ๋
๋ฉํฐ ์ค๋ ๋ ์์ฑ
1. Thread ๋ฅผ ์์๋ฐ์ ํด๋์ค๋ก ์ ์
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
System.out.println("Hello");
}
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Hello");
}
}
2. Thread ์์ฑ์์ ํ๋ฆฌ๋ฏธํฐ๋ก Runnalbe ์ธํฐํ์ด์ค ๊ตฌํํ์ฌ ์์ฑ
//1
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Hello thread");
}
});
thread.start();
System.out.println("Hello");
}
//2
public static void main(String[] args) {
Thread thread = new Thread(() -> { System.out.println("thread hello")});
thread.start();
System.out.println("Hello");
}
์ค๋ ๋ ์ฃผ์ ๊ธฐ๋ฅ
ํ์ฌ ์ค๋ ๋ ๋ฉ์ถฐ๋๊ธฐ (sleep) : ๋ค๋ฅธ ์ค๋ ๋๊ฐ ์ฒ๋ฆฌํ๋๋ก ๊ธฐํ๋ฅผ ์ฃผ์ง๋ง ๋ฝ์ ๊ฑธ์ง ์์ ๋ฐ๋๋ฝ์ด ๊ฑธ๋ฆด ์ ์๋ค.
public static void main(String[] args) throws ExecutionException, InterruptedException { Thread thread = new Thread(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread hello"); }); thread.start(); System.out.println("main hello"); } //print main thread hello
๋ค๋ฅธ ์ค๋ ๋ ๊นจ์ฐ๊ธฐ (interrupted) : ๋ค๋ฅธ ์ค๋ ๋๋ฅผ ๊นจ์์ InterruptedException์ ๋ฐ์ ์ํจ๋ค. ์์ธ๊ฐ ๋ฐ์ํ์ ๋ ํ ์ผ์ ์ฝ๋ฉํ๊ธฐ ๋๋ฆ์ด๋ค.
public static void main(String[] args) throws ExecutionException, InterruptedException { Thread thread = new Thread(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { System.out.println("thread die"); return; } System.out.println("thread hello"); }); thread.start(); System.out.println("main"); thread.interrupt(); } // print main thread die
๋ง์ผ catch๊ตฌ๋ฌธ์ return ๋ฌธ์ด ์กด์ฌํ์ง ์๋๋ค๋ฉด ๊ทธ๋๋ก catch๋ธ๋ญ์ ๋น ์ ธ๋๊ฐ
thread hello
๋ฅผ ์ถ๋ ฅํ ๊ฒ์ด๋ค.๋ค๋ฅธ ์ค๋ ๋ ๊ธฐ๋ค๋ฆฌ๊ธฐ (join) : ๋ค๋ฅธ ์ค๋ ๋๊ฐ ๋๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค.
public static void main(String[] args) throws ExecutionException, InterruptedException { Thread thread = new Thread(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { System.out.println("thread die"); return; } System.out.println("thread hello"); }); thread.start(); System.out.println("main"); thread.join(); } //print main //3์ดํ thread hello
์ค๋ ๋์ ํ์ฌ ๋ฌธ์
์ค๋ ๋๊ฐ ๋ง์ ์ง๋ฉด ๋ง์ ์ง ์๋ก ์ฝ๋ฉ์ผ๋ก ๊ด๋ฆฌํ๊ธฐ๊ฐ ๋งค์ฐ ํ๋ค๋ค. ( Interrupt, join์ ๋ํ ์์ธ์ฒ๋ฆฌํ ์ฝ๋๊ฐ ๊ธฐํ๊ธ์์ ์ผ๋ก ๋ง์์ง๋ค.)
Executors

๊ณ ์์ค์ Concurrency ํ๋ก๊ทธ๋๋ฐ์ ์ง์ํ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ก ์์ ๊ฐ์ ์ค๋ ๋ ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ค์ ์ด์ฉํ์ฌ ๊ตฌํ๋์ด์๋ค.
์ค๋ ๋๋ฅผ ๋ง๋ค๊ณ ๊ด๋ฆฌํ๋ ์์ ์ ์ ํ๋ฆฌ์ผ์ด์ ์์ ๋ถ๋ฆฌํ์ฌ Executors์ ์์ํ ํํ์ด๋ค.
ํ๋ ์ผ
์ค๋ ๋ ๋ง๋ค๊ธฐ : ์ค๋ ๋ ํ์ ๋ง๋ค์ด ๊ด๋ฆฌ
ExecutorService executorService = new ThreadPoolExecutorr(core,max,idleTime,TimeUnit.SECONDS,new SynchronousQueue()) : ์ง์ ์ค๋ ๋๊ฐ์์ ์ ํด์๊ฐ, ์์ ํ๋ฑ์ ์ค์ ํ์ฌ ์์ฑ ํ ์ ์๋ค.
์ฝ์ด ์ค๋ ๋? ์ค๋ ๋๊ฐ ์์ฑ๋๊ณ ์ ํด์ํ์์๋ ์ ๊ฑฐ๋์ง ์๊ณ ์ ์ง๋๋ ์ต๋ ๊ฐ์
ExecutorService executorService = Executors.newSingleThreadExecutor() : ํ๊ฐ์ ์ค๋ ๋๋ฅผ ๊ฐ๋ ์ค๋ ๋ํ ์์ฑ
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
ExecutorService executorService = Executors.newCachedThreadPool() : ์ด๊ธฐ ์ค๋ ๋ ์๋ 0, ์ฝ์ด ์ค๋๋ ์๋ 0์ ๊ฐ์ง๊ณ ์ค๋ ๋ ๊ฐ์๋ณด๋ค ์์ ๊ฐ์๊ฐ ๋ง์ผ๋ฉด ์ ์ค๋ ๋๋ฅผ ๊ทธ๋ ์์ฑ์์ผ ์์ ์ํค๋ ์ค๋ ๋ํ๋ก ์ต๋ ์ค๋ ๋ ์๋ integer.MAX_VALUE. ์ค๋ ๋๊ฐ ์์ฑ๋๊ณ 60์ด๊ฐ ์์ ์ ์ํํ์ง ์์ผ๋ฉด ์ ๊ฑฐ๋๋ค.
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
ExecutorService executorService = Executors.newFixedThreadPool(int nThreads) : ์ด๊ธฐ ์ค๋ ๋ ์๋ 0, ์ฝ์ด ์ค๋ ๋ ์๋ nThreads, ์ต๋ ์ค๋ ๋ ์๋ nThreads๋ฅผ ๊ฐ๋ ์ค๋ ๋ํ ์์ฑ. ์ค๋ ๋๊ฐ ์์ ์ ์ฒ๋ฆฌํ์ง ์๊ณ ๋๊ณ ์๋๋ผ๋ ์ค๋ ๋ ๊ฐ์๊ฐ ์ค์ด๋ค์ง ์๋๋ค.
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
ExecutorService executorService = Executors.newWorkStealingPool(int pallelism) : ForkJoinPool์ work stealing ๊ธฐ๋ฒ์ ๋ฉํฐํ๋ก์ธ์ค ํ ์์ฑ
์ค๋ ๋ ๊ด๋ฆฌ : ์ค๋ ๋ ์๋ช ์ฃผ๊ธฐ๋ฅผ ๊ด๋ฆฌ
public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { System.out.println("Thread hello"); }); executorService.shutdown(); }
ExecutorService๋ ์์ ์ ์คํํ๊ณ ๋๋ฉด ๋ค๋ฅธ ์์ ์ด ๋ค์ด์ฌ๋๊น์ง ๊ณ์ ๋๊ธฐํ๊ณ ์๊ธฐ ๋๋ฌธ์ ๋ช ์์ ์ผ๋ก ์ข ๋ฃ๋ฅผ ์์ผ์ค์ผ ํ๋ค.
์ข ๋ฃ ํจ์์๋ shutdown(),shutdownNow()๊ฐ ์๋๋ฐ shutdown()์
gracefulํ ์ข ๋ฃ
๋ผํ๋ฉฐ ๋ฐ๋์ ์ค๋ ๋์ ์์ ์ด ๋ง์น๊ณ ๋์ ์ข ๋ฃ๋ฅผ ํ๋ฉฐ shutdownNow()๋ ์์ ํํฉ์ ์๊ด์์ด ์ข ๋ฃํ๋ ๋ฉ์๋์ด๋ค.shutdown()์ voidํ์ ์ด๊ณ shutdownNow()๋ ๋ฐํ๊ฐ์ผ๋ก ๋ฏธ์ฒ๋ฆฌ๋ ์์ (Runnable) ๋ชฉ๋ก์ด๋ค.
์์ ์ฒ๋ฆฌ ๋ฐ ์คํ : ์ค๋ ๋๋ก ์คํํ ์์ ์ ์ ๊ณตํ ์ ์๋ API ์ ๊ณต
์์ ์คํ : submit, execute
์ค์ผ์ค๋ง : scheduleAtFixedRate()์ ๊ฐ์ ๋ฉ์๋
Callable / Future
Callable์ Runnable๊ณผ ๊ฑฐ์ ๋๋ถ๋ถ์ด ์ผ์นํ์ง๋ง returnํ์ ์ด ์กด์ฌํ๋ค๋ ์ ์ ์ฐจ์ด์ ์ด ์์ด ์ด๋ฅผ ์ด์ฉํด ์์ ์ด ๋๋ฌ์๋์ ๊ฐ์ ๋ฐํ๋ฐ์ ์ ์๋ค.
API
1. ์์
์์ฒญ (execute/submit)
execute : ๋ฆฌํดํ์ ์ด void๋ก ์์ ์ฒ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฆฌํด๋ฐ์ง ๋ชปํ๊ณ ์์ ์ฒ๋ฆฌ ๋์ค์ ์์ธ๊ฐ ๋ฐ์ํ๋ฉด ์ค๋ ๋๊ฐ ์ข ๋ฃ๋๊ณ ํด๋น ์ค๋ ๋๋ ์ค๋ ๋ํ์์ ์ ๊ฑฐ
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //ํ์ฌ ์์ฑ๋ ์ค๋ ๋๊ฐฏ์๊ฐ ์ฝ์ด ๊ฐ์๋ณด๋ค ์๋ค๋ฉด if (addWorker(command, true)) //์๋ก ์ค๋ ๋๋ฅผ ๋ง๋ค์ด task๋ฅผ ์ฒ๋ฆฌํ๊ณ ๋ฆฌํด return; c = ctl.get(); } //๋์ด์ ์ฒ๋ฆฌํ ์ค๋ ๋๊ฐ ์์ ๊ฒฝ์ฐ ์์ปคํ์ task๋ฅผ ๋ฃ์ด์ฃผ๊ณ ๋ฆฌํด //์ค๋ ๋ ํ์ด ์ฃฝ์๊ฑฐ๋ thread๊ฐ ์๋์ง check if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
+) ctl : ์ค๋ ๋ํ์ ์ํ , ์กฐ๊ธ์ด๋ผ๋ ๋ ๋น ๋ฅธ ๊ณ์ฐ์ ์ํด ๋นํธ๋ก ๊ฐ์ ์ ์ฅํ๊ณ ๋นํธ ์ฐ์ฐ์ ํตํด ๋์
workerCount : ํ์ฌ ์ค๋ ๋ ๊ฐ์
Integer.SiZE - 3 ๋งํผ์ ๊ฐฏ์๊ฐ ์ต๋ ๊ฐฏ์๋ก ์ฝ 5์ต๊ฐ
runState : ์ค๋ ๋ํ์ ์ํ
RUNNING : ์๋ก์ด TASK๋ฅผ ๋ฐ๊ณ ํ์ ์ง์ด ๋ฃ์ ์ผ์ ํ๋ ์ํ
SHUTDOWN : ์๋ก์ด TASK๋ฅผ ๋ฐ์ง๋ง๊ณ ํ์์๋ TASK๋ฅผ ์ฒ๋ฆฌ
RUNNING -> SHUTDOWN : shutdown() ํธ์ถ์
STOP : ์๋ก์ด TASK๋ฅผ ๋ฐ์ง ์๊ณ ํ์์๋ TASK๋ ์ฒ๋ฆฌํ์ง ์์ ์ํ๋ก ํ์ฌ ์งํ์ค์ธ TASK์ INTERRUPT๋ฅผ ๊ฑด๋ค.
(RUNNING or SHUTDOWM) -> STOP : shutdownNow() ํธ์ถ์
TIDYING : ๋ชจ๋ TASK๋ ์๋ฉธ ๋์๊ณ , WORKER COUNT๋ 0. ์ด ์ํ๋ก ์ ์ด๋๋ ์ค๋ ๋๋ terminated() ๋ฉ์๋๋ฅผ ์คํ์์ผ ์ข ๋ฃํ ์์ ์ธ ์ํ
STOP -> TIDYING : ์ค๋ ๋ ํ์ด ๋น์์๋
TERMINATED : ๋ชจ๋ ์ค๋ ๋๊ฐ terminated()๋ ์ํ
submit : Futureํ์ ์ ๋ฐํํ๊ณ ์์ ์ฒ๋ฆฌ ๋์ค์ ์์ธ๊ฐ ๋ฐ์ํด๋ ์ค๋ ๋๋ ์ข ๋ฃ๋์ง ์๊ณ ๋ค์ ์์ ์ ์ํด ์ฌ์ฌ์ฉ๋๊ธฐ ๋๋ฌธ์ ์ค๋ ๋์ ์์ฑ ์ค๋ฒํค๋๋ฅผ ์ค์ผ ์ ์๋ค.
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
2. ๊ฒฐ๊ณผ ๊ฐ์ ธ์ค๊ธฐ (get/ poll / take)
get
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () -> { Thread.sleep(2000L); return "Hello"; }; Future<String> result = executorService.submit(hello); result.get(); executorService.shutdown(); }
get()ํจ์๋ blocking call ๋ฐฉ์์ด๋ผ ์ค๋ ๋์ ์์ ์ด ์์ง ๋๋์ง ์์๋ค๋ฉด ๊ธฐ๋ค๋ฆฐ ํ์ ๋ฆฌํด๊ฐ์ ๋ฐ์์ค๊ณ ๋ค์ ์ค์ ์คํํ๊ฒ ๋๋ค.
poll : ์๋ฃ๋ ์์ ์ Future๋ฅผ ๊ฐ์ ธ์ค๊ณ ์๋ฃ๋ ์์ ์ด ์๋ค๋ฉด ์ฆ์ null์ ๋ฆฌํด
poll(timeout, Timeunit) : ์๋ฃ๋ Future๋ฅผ ๊ฐ์ ธ์ค๊ณ ์๋ฃ๋ ์์ ์ด ์๋ค๋ฉด Timeout๊น์ง ๋ธ๋กํน
take : ์ผ๋ฃ๋ ์์ ์ Future๋ฅผ ๊ฐ์ ธ์ค๊ณ ์๋ฃ๋ ์์ ์ด ์๋ค๋ฉด ์์๋๊น์ง ๋ธ๋กํน
3. ์์
์ํ ํ์ธ (isDone())
์์ ์ด ๋๋ฌ๋ค๋ฉด true ๋ฐํ
4. ์์
์ทจ์ (cancel())
์ทจ์ ํ์ผ๋ฉด true, ๋ชปํ์ผ๋ฉด false ๋ฐํ
ํ๋ผ๋ฏธํฐ๋ก true๋ฅผ ์ฃผ๋ฉด ํ์ฌ ์งํ์ค์ธ ์ค๋ ๋๋ฅผ interrupt ํ๊ณ , false๋ฅผ ์ฃผ๋ฉด ์์ ์ด ๋๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๊ณ ์ทจ์ํ๋ค.
5. ์ฌ๋ฌ์์
๋์์ ์คํ (invokeAll())
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Callable<String> world = () -> {
Thread.sleep(3000L);
return "World";
};
Callable<String> hi = () -> {
Thread.sleep(1000L);
return "Hi";
};
List<Future<String>> result = executorService.invokeAll(List.of(hello,world,hi));
for(Future<String> r : result){
System.out.println(r);
}
executorService.shutdown();
}
๊ฒฐ๊ณผ๋ฅผ Future List๋ก ๋ฐ์์ค๊ธฐ ๋๋ฌธ์ ๋์์ ์คํํ ์์ ์ค ์ ์ผ ์ค๋ ๊ฑธ๋ฆฌ๋ ์์ ์ ์๊ฐ๋งํผ ๊ฑธ๋ฆฌ๊ฒ ๋๋ค.
65. ๊ฐ์ฅ ๋จผ์ ์ข
๋ฃ๋ ์์
๋ฐํ๋ฐ๊ธฐ (invokeAny())
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Callable<String> world = () -> {
Thread.sleep(3000L);
return "World";
};
Callable<String> hi = () -> {
Thread.sleep(1000L);
return "Hi";
};
String result = executorService.invokeAny(List.of(hello,world,hi));
executorService.shutdown();
}
๊ฐ์ฅ ๋จผ์ ๋๋ ์์ ์ ๊ฒฐ๊ณผ๋ฌผ์ ๋ฐํ๋ฐ๋ ๋ฉ์๋์ธ๋ฐ ์ด๋ ๋ฐํ์ blocking call์ด๊ธฐ ๋๋ฌธ์ Future์ด ์๋ ์ผ๋ฐ ๊ฐ์ฒดํ์ ์ ๋ฐํ๋ฐ๋๋ค.
Future์ ๋ฌธ์ ์
Future๋ฅผ ์ธ๋ถ์์ ์ทจ์์ํค๊ฑฐ๋, get()์ ํ์์์์ ์คํํ๋๋ฑ ์๋ฃ์์ ์ ์ํํ ์ ์๋ค.
get()์ด๋ผ๋ blocking call์ ์ฌ์ฉํ์ง ์๊ณ ์๋ ์์ ์ด ๋๋ฌ์๋ ์ฝ๋ฐฑ์ ์คํํ ์ ์๋ค.
์ฌ๋ฌ Future๋ฅผ ์กฐ๋ฆฝํ ์ ์๋ค.
์์ธ์ฒ๋ฆฌ์ฉ API๋ฅผ ์ ๊ณตํ์ง ์๋๋ค.
CompletableFuture
์๋ฐ์์ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ๊ฐ๋ฅ์ผํ๋ ์ธํฐํ์ด์ค
API
1. ๋น๋๊ธฐ๋ก ์์
์คํ
์ํ๋ ์ฐ๋ ๋ ํ์ ์ฌ์ฉํด์ ์คํํ ์ ์๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก๋ ForkJoinPool.commonPool()๋ก ThreadPoolExecutor,ExecutorService๋ฅผ ์ฌ์ฉํ ์๋ ์๋ค.
๊ธฐ์กด์ ThreadPoolExecutor๋ ์๋ก ๋ ๋ฆฝ์ ์ธ ์์ ์ ์ํด ์ค๊ณ๋์์ผ๋ฉฐ ์ ์ฌ์ ์ผ๋ก ์ฐจ๋จ๋๊ณ ๊ฑฐ์น ์์ ๋ ์ผ๋์ ๋๊ณ ์ค๊ณ๋์๋ค.
ForkJoinPool? ์ฌ๊ท์ ์ธ ๋ค์ค ์ค๋ ๋ ํ๋ก๊ทธ๋จ์์ ๋๊ธฐ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด ์ค๊ณ๋ ๊ฒ์ผ๋ก ๋ค๋ฅธ ์์ ์ ๋ํด ์ฌ๊ท์ ์ด๋ฉฐ ์ํธ ์์กด์ ์ผ๋ ์ฌ์ฉํ๋ ๊ฒ์ด ํจ๊ณผ์ ์ธ ์ค๋ ๋ ํ. ์ด๋ ๋ค๋ฅธ ์์ ์ ๊ธฐ๋ค๋ฆฌ๋๋ฐ ๋ ๋ง์ ์๊ฐ์ ์๋นํ๊ณ ์์์ ๋ญ๋นํ๊ฒ ๋๋ ๋ฌธ์ ๊ฐ ์๋ค.
ExecutorService executorService = Executors.newFixed(ThreadPool(4));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}, executorService).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
},executorService);
์ฝ๋ฐฑํจ์๋ค์ ๋๋ฒ์งธ ๋งค๊ฐ๋ณ์๋ก ์ค๋ ๋ ํ์ ์ ๋ฌํ๋ฉด ํด๋น ์ค๋ ๋ํ์ ์ด์ฉํ์ฌ ์์ ์ ์ํํ ์ ์๋ค.
runAsync() : ๋ฆฌํด๊ฐ์ด ์๋ ๊ฒฝ์ฐ
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); }); future.get();
supplyAsync() : ๋ฆฌํด๊ฐ์ด ์๋ ๊ฒฝ์ฐ
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); System.out.println(future.get());
2. ์ฝ๋ฐฑ ์ ๊ณต
์ฝ๋ฐฑ ์์ฒด๋ฅผ ๋๋ค๋ฅธ ์ค๋ ๋์์ ์คํํ ์ ์๋ค.
thenApply(Function) : ๋ฆฌํด๊ฐ์ ๋ฐ์์ ๋ค๋ฅธ ๊ฐ์ผ๋ก ๋ฐ๊พธ๋ ์ฝ๋ฐฑ
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).thenApply((s) -> { System.out.println(Thread.currentThread().getName()); return s.toUpperCase(); }); System.out.println(future.get());
thenAccept(Consumer) : ๋ฆฌํด๊ฐ์ผ๋ก ๋ฆฌํด์์ด ๋ค๋ฅธ ์์ ์ ์ฒ๋ฆฌํ๋ ์ฝ๋ฐฑ
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).thenAccept((s) -> { System.out.println(s + Thread.currentThread().getName()); }); future.get();
thenRun(Runnable) : ๋ฆฌํด๊ฐ์ ๋ฐ์ง ์๊ณ ๋ค๋ฅธ ์์ ์ ์ฒ๋ฆฌํ๋ ์ฝ๋ฐฑ
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).thenRun(() -> { System.out.println(Thread.currentThread().getName()); }); future.get();
3. ์กฐํฉํ๊ธฐ
thenCompose() : ๋ ์์ ์ด ์๋ก ์ฐ๊ด๊ด๊ณ๊ฐ ์์ด ์ด์ด์ ์คํํ๋๋ก ์กฐํฉ
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> future = hello.thenCompose(Example::getWorld); System.out.println(future.get()); } private static CompletableFuture<String> getWorld(String message) { return CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return message + "World"; }); }
thenApply์ ๊ฑฐ์ ๋น์ทํ๋ค๊ณ ํ ์ ์๋๋ฐ ์ฐจ์ด์ ์ด๋ผ๊ณ ํ๋ค๋ฉด thenApply๋ ๋ฐํ๊ฐ์ ์ด์ฉํด ์๋ก์ด ์ฝ๋ฐฑ์ ์คํํ๋ค๋ฉด, thenCompose๋ ๋ฐํ๊ฐ์ด ์๋๋ผ ์ด์ ์ ์ฒ๋ฆฌ ๋ก์ง ์์ฒด๋ฅผ ์ธ์๋ก ์๋ก์ด ์ฝ๋ฐฑ์ ์คํํ๋ ๊ฒ. ํ๋ง๋๋ก Future๋ฅผ ์ค์ฒฉํ๋ ๊ฒ์ด ์๋๋ผ ํ๋ฉดํํ์ฌ ๋ฐํํ๋ ๊ฒ์ด๋ค. ๊ทธ๋์ CompletableFuture ๋ฉ์๋๋ฅผ ์ฐ๊ฒฐํ๋ ๊ฒ์ด๋ผ๋ฉด thenComposer๊ฐ ์ข๋ค.
thenCombine() : ๋ ์์ ์ ๋ ๋ฆฝ์ ์ผ๋ก ์คํํ๊ณ ๋ค ์ข ๋ฃ ํ์๋ ์ฝ๋ฐฑ ์คํ
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return "World"; }); CompletableFuture<String> future = hello.thenCombine(world, (helloReturn,worldReturn) -> { return helloReturn + " " + worldReturn; }); System.out.println(future.get()); }
allOf() : ์ฌ๋ฌ ์์ ์ ๋ชจ๋ ์คํํ๊ณ ๋ชจ๋ ์์ ๊ฒฐ๊ณผ์ ์ฝ๋ฐฑ ์คํ
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return "World"; }); List<CompletableFuture<String>> futrues = List.of(hello,world); CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]); CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); results.get().forEach(System.out.println);
anyOf() : ์ฌ๋ฌ ์์ ์ค์ ๊ฐ์ฅ ๋นจ๋ฆฌ ๋๋ ํ๋์ ๊ฒฐ๊ณผ์ ์ฝ๋ฐฑ ์คํ
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return "World"; }); CompletableFuture<Void> future = CompletableFuture.anyOf(hello,world).thenAccept(System.out::println);
4. ์์ธ์ฒ๋ฆฌ
exceptionally(Function)
boolean throwError = true; CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> { if(throwError){ throw new IllegalArgumentException(); } System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).exceptionally(ex -> { System.out.println(ex); return "Error"; });
handle(BiFunction)
boolean throwError = true; CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> { if(throwError){ throw new IllegalArgumentException(); } System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).handle((result,ex) -> { if(ex != null) { System.out.println(ex); return "Error"; } return result; });
Error๊ฐ ์์๋์ ์ ์์ ์ธ ์ข ๋ฃ์ผ๋ ๋ชจ๋ ํธ๋ค๋งํ ์ ์๋ ๋ฉ์๋์ด๋ค.
Last updated