CompletableFuture
Javaμμμ Concurrent
λ©ν° νλ‘μΈμ± ( ProcessBuilder )
λ©ν° μ€λ λ© ( Thread/Runnable )
λμμ±(Concurrency) vs λ³λ ¬μ±(Parallelism)
λμμ±? μ±κΈμ½μ΄μμ λ©ν° μ€λ λλ₯Ό λμμν€κΈ° μν λ°©μμΌλ‘ λ©ν° νμ€νΉμ μν΄ μ¬λ¬κ° μ€λ λκ° λ²κ°μκ°λ©΄μ μ€ν. νλ§λλ‘ λμμ μ€νλλ κ²μ²λΌ 보μ΄λ κ².
λ©ν°μ€λ λλ‘ λμμ±μ λ§μ‘±μν¬μ μλ κ±°μ§ λμμ±μ΄ λ©ν°μ€λ λλ μλ. μ½νλ¦°μ μ½λ£¨ν΄μ μ±κΈμ€λ λλ‘ λμμ±μ λ§μ‘±
λ³λ ¬μ±? 2κ°μ΄μμ taskκ° μμλ κ° taskκ° λ¬Όλ¦¬μ μΈ μκ°μΌλ‘ λμμ μ€νμ΄ κ°λ₯. λ©ν°μ½μ΄μμ λ©ν°μ€λ λλ₯Ό λμμν€λ λ°©μμΌλ‘ νκ° μ΄μμ μ€λ λλ₯Ό ν¬ν¨νλ κ° μμ λ€μ΄ 물리μ μΈ μκ°μΌλ‘ μμ λμμ μννλ κ². (μ΄λ λμ μμ μ λ©ν°μ½μ΄κ° λ μλ λ€νΈμν¬λ₯Ό μ΄μ©ν λΆμ°μ»΄ν¨ν μ΄ λ μ μλ€.)
λ³λ ¬μ±μ λ§μ‘±νλ©΄ λμμ±λ λ§μ‘±, λμμ±μ λ§μ‘±νλ€κ³ λ³λ ¬μ± λ§μ‘±x
λ©ν° μ€λ λ
λ©ν° μ€λ λ μμ±
1. Thread λ₯Ό μμλ°μ ν΄λμ€λ‘ μ μ
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
System.out.println("Hello");
}
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Hello");
}
}
2. Thread μμ±μμ ν리미ν°λ‘ Runnalbe μΈν°νμ΄μ€ ꡬννμ¬ μμ±
//1
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Hello thread");
}
});
thread.start();
System.out.println("Hello");
}
//2
public static void main(String[] args) {
Thread thread = new Thread(() -> { System.out.println("thread hello")});
thread.start();
System.out.println("Hello");
}
μ€λ λ μ£Όμ κΈ°λ₯
νμ¬ μ€λ λ λ©μΆ°λκΈ° (sleep) : λ€λ₯Έ μ€λ λκ° μ²λ¦¬νλλ‘ κΈ°νλ₯Ό μ£Όμ§λ§ λ½μ κ±Έμ§ μμ λ°λλ½μ΄ 걸릴 μ μλ€.
public static void main(String[] args) throws ExecutionException, InterruptedException { Thread thread = new Thread(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread hello"); }); thread.start(); System.out.println("main hello"); } //print main thread hello
λ€λ₯Έ μ€λ λ κΉ¨μ°κΈ° (interrupted) : λ€λ₯Έ μ€λ λλ₯Ό κΉ¨μμ InterruptedExceptionμ λ°μ μν¨λ€. μμΈκ° λ°μνμ λ ν μΌμ μ½λ©νκΈ° λλ¦μ΄λ€.
public static void main(String[] args) throws ExecutionException, InterruptedException { Thread thread = new Thread(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { System.out.println("thread die"); return; } System.out.println("thread hello"); }); thread.start(); System.out.println("main"); thread.interrupt(); } // print main thread die
λ§μΌ catchꡬ문μ return λ¬Έμ΄ μ‘΄μ¬νμ§ μλλ€λ©΄ κ·Έλλ‘ catchλΈλμ λΉ μ Έλκ°
thread hello
λ₯Ό μΆλ ₯ν κ²μ΄λ€.λ€λ₯Έ μ€λ λ κΈ°λ€λ¦¬κΈ° (join) : λ€λ₯Έ μ€λ λκ° λλ λκΉμ§ κΈ°λ€λ¦°λ€.
public static void main(String[] args) throws ExecutionException, InterruptedException { Thread thread = new Thread(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { System.out.println("thread die"); return; } System.out.println("thread hello"); }); thread.start(); System.out.println("main"); thread.join(); } //print main //3μ΄ν thread hello
μ€λ λμ νμ¬ λ¬Έμ
μ€λ λκ° λ§μ μ§λ©΄ λ§μ μ§ μλ‘ μ½λ©μΌλ‘ κ΄λ¦¬νκΈ°κ° λ§€μ° νλ€λ€. ( Interrupt, joinμ λν μμΈμ²λ¦¬ν μ½λκ° κΈ°νκΈμμ μΌλ‘ λ§μμ§λ€.)
Executors

κ³ μμ€μ Concurrency νλ‘κ·Έλλ°μ μ§μνλ λΌμ΄λΈλ¬λ¦¬λ‘ μμ κ°μ μ€λ λ ν λΌμ΄λΈλ¬λ¦¬ λ€μ μ΄μ©νμ¬ κ΅¬νλμ΄μλ€.
μ€λ λλ₯Ό λ§λ€κ³ κ΄λ¦¬νλ μμ μ μ ν리μΌμ΄μ μμ λΆλ¦¬νμ¬ Executorsμ μμν ννμ΄λ€.
νλ μΌ
μ€λ λ λ§λ€κΈ° : μ€λ λ νμ λ§λ€μ΄ κ΄λ¦¬
ExecutorService executorService = new ThreadPoolExecutorr(core,max,idleTime,TimeUnit.SECONDS,new SynchronousQueue()) : μ§μ μ€λ λκ°μμ μ ν΄μκ°, μμ νλ±μ μ€μ νμ¬ μμ± ν μ μλ€.
μ½μ΄ μ€λ λ? μ€λ λκ° μμ±λκ³ μ ν΄μνμμλ μ κ±°λμ§ μκ³ μ μ§λλ μ΅λ κ°μ
ExecutorService executorService = Executors.newSingleThreadExecutor() : νκ°μ μ€λ λλ₯Ό κ°λ μ€λ λν μμ±
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
ExecutorService executorService = Executors.newCachedThreadPool() : μ΄κΈ° μ€λ λ μλ 0, μ½μ΄ μ€λλ μλ 0μ κ°μ§κ³ μ€λ λ κ°μλ³΄λ€ μμ κ°μκ° λ§μΌλ©΄ μ μ€λ λλ₯Ό κ·Έλ μμ±μμΌ μμ μν€λ μ€λ λνλ‘ μ΅λ μ€λ λ μλ integer.MAX_VALUE. μ€λ λκ° μμ±λκ³ 60μ΄κ° μμ μ μννμ§ μμΌλ©΄ μ κ±°λλ€.
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
ExecutorService executorService = Executors.newFixedThreadPool(int nThreads) : μ΄κΈ° μ€λ λ μλ 0, μ½μ΄ μ€λ λ μλ nThreads, μ΅λ μ€λ λ μλ nThreadsλ₯Ό κ°λ μ€λ λν μμ±. μ€λ λκ° μμ μ μ²λ¦¬νμ§ μκ³ λκ³ μλλΌλ μ€λ λ κ°μκ° μ€μ΄λ€μ§ μλλ€.
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
ExecutorService executorService = Executors.newWorkStealingPool(int pallelism) : ForkJoinPoolμ work stealing κΈ°λ²μ λ©ν°νλ‘μΈμ€ ν μμ±
μ€λ λ κ΄λ¦¬ : μ€λ λ μλͺ μ£ΌκΈ°λ₯Ό κ΄λ¦¬
public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { System.out.println("Thread hello"); }); executorService.shutdown(); }
ExecutorServiceλ μμ μ μ€ννκ³ λλ©΄ λ€λ₯Έ μμ μ΄ λ€μ΄μ¬λκΉμ§ κ³μ λκΈ°νκ³ μκΈ° λλ¬Έμ λͺ μμ μΌλ‘ μ’ λ£λ₯Ό μμΌμ€μΌ νλ€.
μ’ λ£ ν¨μμλ shutdown(),shutdownNow()κ° μλλ° shutdown()μ
gracefulν μ’ λ£
λΌνλ©° λ°λμ μ€λ λμ μμ μ΄ λ§μΉκ³ λμ μ’ λ£λ₯Ό νλ©° shutdownNow()λ μμ νν©μ μκ΄μμ΄ μ’ λ£νλ λ©μλμ΄λ€.shutdown()μ voidνμ μ΄κ³ shutdownNow()λ λ°νκ°μΌλ‘ λ―Έμ²λ¦¬λ μμ (Runnable) λͺ©λ‘μ΄λ€.
μμ μ²λ¦¬ λ° μ€ν : μ€λ λλ‘ μ€νν μμ μ μ 곡ν μ μλ API μ 곡
μμ μ€ν : submit, execute
μ€μΌμ€λ§ : scheduleAtFixedRate()μ κ°μ λ©μλ
Callable / Future
Callableμ Runnableκ³Ό κ±°μ λλΆλΆμ΄ μΌμΉνμ§λ§ returnνμ μ΄ μ‘΄μ¬νλ€λ μ μ μ°¨μ΄μ μ΄ μμ΄ μ΄λ₯Ό μ΄μ©ν΄ μμ μ΄ λλ¬μλμ κ°μ λ°νλ°μ μ μλ€.
API
1. μμ
μμ² (execute/submit)
execute : 리ν΄νμ μ΄ voidλ‘ μμ μ²λ¦¬ κ²°κ³Όλ₯Ό 리ν΄λ°μ§ λͺ»νκ³ μμ μ²λ¦¬ λμ€μ μμΈκ° λ°μνλ©΄ μ€λ λκ° μ’ λ£λκ³ ν΄λΉ μ€λ λλ μ€λ λνμμ μ κ±°
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //νμ¬ μμ±λ μ€λ λκ°―μκ° μ½μ΄ κ°μλ³΄λ€ μλ€λ©΄ if (addWorker(command, true)) //μλ‘ μ€λ λλ₯Ό λ§λ€μ΄ taskλ₯Ό μ²λ¦¬νκ³ λ¦¬ν΄ return; c = ctl.get(); } //λμ΄μ μ²λ¦¬ν μ€λ λκ° μμ κ²½μ° μ컀νμ taskλ₯Ό λ£μ΄μ£Όκ³ λ¦¬ν΄ //μ€λ λ νμ΄ μ£½μκ±°λ threadκ° μλμ§ check if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
+) ctl : μ€λ λνμ μν , μ‘°κΈμ΄λΌλ λ λΉ λ₯Έ κ³μ°μ μν΄ λΉνΈλ‘ κ°μ μ μ₯νκ³ λΉνΈ μ°μ°μ ν΅ν΄ λμ
workerCount : νμ¬ μ€λ λ κ°μ
Integer.SiZE - 3 λ§νΌμ κ°―μκ° μ΅λ κ°―μλ‘ μ½ 5μ΅κ°
runState : μ€λ λνμ μν
RUNNING : μλ‘μ΄ TASKλ₯Ό λ°κ³ νμ μ§μ΄ λ£μ μΌμ νλ μν
SHUTDOWN : μλ‘μ΄ TASKλ₯Ό λ°μ§λ§κ³ νμμλ TASKλ₯Ό μ²λ¦¬
RUNNING -> SHUTDOWN : shutdown() νΈμΆμ
STOP : μλ‘μ΄ TASKλ₯Ό λ°μ§ μκ³ νμμλ TASKλ μ²λ¦¬νμ§ μμ μνλ‘ νμ¬ μ§νμ€μΈ TASKμ INTERRUPTλ₯Ό 건λ€.
(RUNNING or SHUTDOWM) -> STOP : shutdownNow() νΈμΆμ
TIDYING : λͺ¨λ TASKλ μλ©Έ λμκ³ , WORKER COUNTλ 0. μ΄ μνλ‘ μ μ΄λλ μ€λ λλ terminated() λ©μλλ₯Ό μ€νμμΌ μ’ λ£ν μμ μΈ μν
STOP -> TIDYING : μ€λ λ νμ΄ λΉμμλ
TERMINATED : λͺ¨λ μ€λ λκ° terminated()λ μν
submit : Futureνμ μ λ°ννκ³ μμ μ²λ¦¬ λμ€μ μμΈκ° λ°μν΄λ μ€λ λλ μ’ λ£λμ§ μκ³ λ€μ μμ μ μν΄ μ¬μ¬μ©λκΈ° λλ¬Έμ μ€λ λμ μμ± μ€λ²ν€λλ₯Ό μ€μΌ μ μλ€.
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
2. κ²°κ³Ό κ°μ Έμ€κΈ° (get/ poll / take)
get
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Callable<String> hello = () -> { Thread.sleep(2000L); return "Hello"; }; Future<String> result = executorService.submit(hello); result.get(); executorService.shutdown(); }
get()ν¨μλ blocking call λ°©μμ΄λΌ μ€λ λμ μμ μ΄ μμ§ λλμ§ μμλ€λ©΄ κΈ°λ€λ¦° νμ 리ν΄κ°μ λ°μμ€κ³ λ€μ μ€μ μ€ννκ² λλ€.
poll : μλ£λ μμ μ Futureλ₯Ό κ°μ Έμ€κ³ μλ£λ μμ μ΄ μλ€λ©΄ μ¦μ nullμ 리ν΄
poll(timeout, Timeunit) : μλ£λ Futureλ₯Ό κ°μ Έμ€κ³ μλ£λ μμ μ΄ μλ€λ©΄ TimeoutκΉμ§ λΈλ‘νΉ
take : μΌλ£λ μμ μ Futureλ₯Ό κ°μ Έμ€κ³ μλ£λ μμ μ΄ μλ€λ©΄ μμλκΉμ§ λΈλ‘νΉ
3. μμ
μν νμΈ (isDone())
μμ μ΄ λλ¬λ€λ©΄ true λ°ν
4. μμ
μ·¨μ (cancel())
μ·¨μ νμΌλ©΄ true, λͺ»νμΌλ©΄ false λ°ν
νλΌλ―Έν°λ‘ trueλ₯Ό μ£Όλ©΄ νμ¬ μ§νμ€μΈ μ€λ λλ₯Ό interrupt νκ³ , falseλ₯Ό μ£Όλ©΄ μμ μ΄ λλ λκΉμ§ κΈ°λ€λ¦¬κ³ μ·¨μνλ€.
5. μ¬λ¬μμ
λμμ μ€ν (invokeAll())
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Callable<String> world = () -> {
Thread.sleep(3000L);
return "World";
};
Callable<String> hi = () -> {
Thread.sleep(1000L);
return "Hi";
};
List<Future<String>> result = executorService.invokeAll(List.of(hello,world,hi));
for(Future<String> r : result){
System.out.println(r);
}
executorService.shutdown();
}
κ²°κ³Όλ₯Ό Future Listλ‘ λ°μμ€κΈ° λλ¬Έμ λμμ μ€νν μμ μ€ μ μΌ μ€λ 걸리λ μμ μ μκ°λ§νΌ κ±Έλ¦¬κ² λλ€.
65. κ°μ₯ λ¨Όμ μ’
λ£λ μμ
λ°νλ°κΈ° (invokeAny())
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Callable<String> world = () -> {
Thread.sleep(3000L);
return "World";
};
Callable<String> hi = () -> {
Thread.sleep(1000L);
return "Hi";
};
String result = executorService.invokeAny(List.of(hello,world,hi));
executorService.shutdown();
}
κ°μ₯ λ¨Όμ λλ μμ μ κ²°κ³Όλ¬Όμ λ°νλ°λ λ©μλμΈλ° μ΄λ λ°νμ blocking callμ΄κΈ° λλ¬Έμ Futureμ΄ μλ μΌλ° κ°μ²΄νμ μ λ°νλ°λλ€.
Futureμ λ¬Έμ μ
Futureλ₯Ό μΈλΆμμ μ·¨μμν€κ±°λ, get()μ νμμμμ μ€ννλλ± μλ£μμ μ μνν μ μλ€.
get()μ΄λΌλ blocking callμ μ¬μ©νμ§ μκ³ μλ μμ μ΄ λλ¬μλ μ½λ°±μ μ€νν μ μλ€.
μ¬λ¬ Futureλ₯Ό 쑰립ν μ μλ€.
μμΈμ²λ¦¬μ© APIλ₯Ό μ 곡νμ§ μλλ€.
CompletableFuture
μλ°μμ λΉλκΈ° νλ‘κ·Έλλ°μ κ°λ₯μΌνλ μΈν°νμ΄μ€
API
1. λΉλκΈ°λ‘ μμ
μ€ν
μνλ μ°λ λ νμ μ¬μ©ν΄μ μ€νν μ μλ€. κΈ°λ³Έμ μΌλ‘λ ForkJoinPool.commonPool()λ‘ ThreadPoolExecutor,ExecutorServiceλ₯Ό μ¬μ©ν μλ μλ€.
κΈ°μ‘΄μ ThreadPoolExecutorλ μλ‘ λ 립μ μΈ μμ μ μν΄ μ€κ³λμμΌλ©° μ μ¬μ μΌλ‘ μ°¨λ¨λκ³ κ±°μΉ μμ λ μΌλμ λκ³ μ€κ³λμλ€.
ForkJoinPool? μ¬κ·μ μΈ λ€μ€ μ€λ λ νλ‘κ·Έλ¨μμ λκΈ° λ¬Έμ λ₯Ό ν΄κ²°νκΈ° μν΄ μ€κ³λ κ²μΌλ‘ λ€λ₯Έ μμ μ λν΄ μ¬κ·μ μ΄λ©° μνΈ μμ‘΄μ μΌλ μ¬μ©νλ κ²μ΄ ν¨κ³Όμ μΈ μ€λ λ ν. μ΄λ λ€λ₯Έ μμ μ κΈ°λ€λ¦¬λλ° λ λ§μ μκ°μ μλΉνκ³ μμμ λλΉνκ² λλ λ¬Έμ κ° μλ€.
ExecutorService executorService = Executors.newFixed(ThreadPool(4));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
}, executorService).thenApply((s) -> {
System.out.println(Thread.currentThread().getName());
return s.toUpperCase();
},executorService);
μ½λ°±ν¨μλ€μ λλ²μ§Έ λ§€κ°λ³μλ‘ μ€λ λ νμ μ λ¬νλ©΄ ν΄λΉ μ€λ λνμ μ΄μ©νμ¬ μμ μ μνν μ μλ€.
runAsync() : 리ν΄κ°μ΄ μλ κ²½μ°
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); }); future.get();
supplyAsync() : 리ν΄κ°μ΄ μλ κ²½μ°
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); System.out.println(future.get());
2. μ½λ°± μ 곡
μ½λ°± μ체λ₯Ό λλ€λ₯Έ μ€λ λμμ μ€νν μ μλ€.
thenApply(Function) : 리ν΄κ°μ λ°μμ λ€λ₯Έ κ°μΌλ‘ λ°κΎΈλ μ½λ°±
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).thenApply((s) -> { System.out.println(Thread.currentThread().getName()); return s.toUpperCase(); }); System.out.println(future.get());
thenAccept(Consumer) : 리ν΄κ°μΌλ‘ 리ν΄μμ΄ λ€λ₯Έ μμ μ μ²λ¦¬νλ μ½λ°±
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).thenAccept((s) -> { System.out.println(s + Thread.currentThread().getName()); }); future.get();
thenRun(Runnable) : 리ν΄κ°μ λ°μ§ μκ³ λ€λ₯Έ μμ μ μ²λ¦¬νλ μ½λ°±
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).thenRun(() -> { System.out.println(Thread.currentThread().getName()); }); future.get();
3. μ‘°ν©νκΈ°
thenCompose() : λ μμ μ΄ μλ‘ μ°κ΄κ΄κ³κ° μμ΄ μ΄μ΄μ μ€ννλλ‘ μ‘°ν©
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> future = hello.thenCompose(Example::getWorld); System.out.println(future.get()); } private static CompletableFuture<String> getWorld(String message) { return CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return message + "World"; }); }
thenApplyμ κ±°μ λΉμ·νλ€κ³ ν μ μλλ° μ°¨μ΄μ μ΄λΌκ³ νλ€λ©΄ thenApplyλ λ°νκ°μ μ΄μ©ν΄ μλ‘μ΄ μ½λ°±μ μ€ννλ€λ©΄, thenComposeλ λ°νκ°μ΄ μλλΌ μ΄μ μ μ²λ¦¬ λ‘μ§ μ체λ₯Ό μΈμλ‘ μλ‘μ΄ μ½λ°±μ μ€ννλ κ². νλ§λλ‘ Futureλ₯Ό μ€μ²©νλ κ²μ΄ μλλΌ νλ©΄ννμ¬ λ°ννλ κ²μ΄λ€. κ·Έλμ CompletableFuture λ©μλλ₯Ό μ°κ²°νλ κ²μ΄λΌλ©΄ thenComposerκ° μ’λ€.
thenCombine() : λ μμ μ λ 립μ μΌλ‘ μ€ννκ³ λ€ μ’ λ£ νμλ μ½λ°± μ€ν
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return "World"; }); CompletableFuture<String> future = hello.thenCombine(world, (helloReturn,worldReturn) -> { return helloReturn + " " + worldReturn; }); System.out.println(future.get()); }
allOf() : μ¬λ¬ μμ μ λͺ¨λ μ€ννκ³ λͺ¨λ μμ κ²°κ³Όμ μ½λ°± μ€ν
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return "World"; }); List<CompletableFuture<String>> futrues = List.of(hello,world); CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]); CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); results.get().forEach(System.out.println);
anyOf() : μ¬λ¬ μμ μ€μ κ°μ₯ 빨리 λλ νλμ κ²°κ³Όμ μ½λ°± μ€ν
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }); CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName()); return "World"; }); CompletableFuture<Void> future = CompletableFuture.anyOf(hello,world).thenAccept(System.out::println);
4. μμΈμ²λ¦¬
exceptionally(Function)
boolean throwError = true; CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> { if(throwError){ throw new IllegalArgumentException(); } System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).exceptionally(ex -> { System.out.println(ex); return "Error"; });
handle(BiFunction)
boolean throwError = true; CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> { if(throwError){ throw new IllegalArgumentException(); } System.out.println("Hello " + Thread.currentThread().getName()); return "Hello"; }).handle((result,ex) -> { if(ex != null) { System.out.println(ex); return "Error"; } return result; });
Errorκ° μμλμ μ μμ μΈ μ’ λ£μΌλ λͺ¨λ νΈλ€λ§ν μ μλ λ©μλμ΄λ€.
Last updated