CompletableFuture

Javaμ—μ„œμ˜ Concurrent

  1. λ©€ν‹° ν”„λ‘œμ„Έμ‹± ( ProcessBuilder )

  2. λ©€ν‹° μŠ€λ ˆλ”© ( Thread/Runnable )

λ™μ‹œμ„±(Concurrency) vs 병렬성(Parallelism)

λ™μ‹œμ„±? μ‹±κΈ€μ½”μ–΄μ—μ„œ λ©€ν‹° μŠ€λ ˆλ“œλ₯Ό λ™μž‘μ‹œν‚€κΈ° μœ„ν•œ λ°©μ‹μœΌλ‘œ λ©€ν‹° νƒœμŠ€ν‚Ήμ„ μœ„ν•΄ μ—¬λŸ¬κ°œ μŠ€λ ˆλ“œκ°€ λ²ˆκ°ˆμ•„κ°€λ©΄μ„œ μ‹€ν–‰. ν•œλ§ˆλ””λ‘œ λ™μ‹œμ— μ‹€ν–‰λ˜λŠ” κ²ƒμ²˜λŸΌ λ³΄μ΄λŠ” 것.

λ©€ν‹°μŠ€λ ˆλ“œλ‘œ λ™μ‹œμ„±μ„ λ§Œμ‘±μ‹œν‚¬μˆ˜ μžˆλŠ” κ±°μ§€ λ™μ‹œμ„±μ΄ λ©€ν‹°μŠ€λ ˆλ“œλŠ” μ•„λ‹˜. μ½”ν‹€λ¦°μ˜ 코루틴은 μ‹±κΈ€μŠ€λ ˆλ“œλ‘œ λ™μ‹œμ„±μ„ 만쑱

병렬성? 2κ°œμ΄μƒμ˜ taskκ°€ μžˆμ„λ•Œ 각 taskκ°€ 물리적인 μ‹œκ°„μœΌλ‘œ λ™μ‹œμ— 싀행이 κ°€λŠ₯. λ©€ν‹°μ½”μ–΄μ—μ„œ λ©€ν‹°μŠ€λ ˆλ“œλ₯Ό λ™μž‘μ‹œν‚€λŠ” λ°©μ‹μœΌλ‘œ ν•œκ°œ μ΄μƒμ˜ μŠ€λ ˆλ“œλ₯Ό ν¬ν•¨ν•˜λŠ” 각 μž‘μ—…λ“€μ΄ 물리적인 μ‹œκ°„μœΌλ‘œ μ™„μ „ λ™μ‹œμ— μˆ˜ν–‰ν•˜λŠ” 것. (μ΄λ•Œ λ™μ‹œ μž‘μ—…μ€ λ©€ν‹°μ½”μ–΄κ°€ λ μˆ˜λ„ λ„€νŠΈμ›Œν¬λ₯Ό μ΄μš©ν•œ λΆ„μ‚°μ»΄ν“¨νŒ…μ΄ 될 수 μžˆλ‹€.)

병렬성을 λ§Œμ‘±ν•˜λ©΄ λ™μ‹œμ„±λ„ 만쑱, λ™μ‹œμ„±μ„ λ§Œμ‘±ν•œλ‹€κ³  병렬성 만쑱x

λ©€ν‹° μŠ€λ ˆλ“œ

λ©€ν‹° μŠ€λ ˆλ“œ 생성

1. Thread λ₯Ό 상속받아 클래슀둜 μ •μ˜

public static void main(String[] args) {
    MyThread thread = new MyThread();
    thread.start();

    System.out.println("Hello");
}

static class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Hello");
    }
}

2. Thread μƒμ„±μžμ— νŒŒλ¦¬λ―Έν„°λ‘œ Runnalbe μΈν„°νŽ˜μ΄μŠ€ κ΅¬ν˜„ν•˜μ—¬ 생성

//1
public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
            System.out.println("Hello thread");

            }
        });
        thread.start();

        System.out.println("Hello");
}

//2
public static void main(String[] args) {
        Thread thread = new Thread(() -> { System.out.println("thread hello")});
        thread.start();

        System.out.println("Hello");
}

μŠ€λ ˆλ“œ μ£Όμš” κΈ°λŠ₯

  • ν˜„μž¬ μŠ€λ ˆλ“œ λ©ˆμΆ°λ‘κΈ° (sleep) : λ‹€λ₯Έ μŠ€λ ˆλ“œκ°€ μ²˜λ¦¬ν•˜λ„λ‘ 기회λ₯Ό μ£Όμ§€λ§Œ 락을 κ±Έμ§„ μ•Šμ•„ λ°λ“œλ½μ΄ 걸릴 수 μžˆλ‹€.

      public static void main(String[] args) throws ExecutionException, InterruptedException {
          Thread thread = new Thread(() -> {
              try {
                  Thread.sleep(1000L);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              System.out.println("thread hello");
          });
          thread.start();
          System.out.println("main hello");
      }
    
      //print
      main
      thread hello
  • λ‹€λ₯Έ μŠ€λ ˆλ“œ 깨우기 (interrupted) : λ‹€λ₯Έ μŠ€λ ˆλ“œλ₯Ό κΉ¨μ›Œμ„œ InterruptedException을 λ°œμƒ μ‹œν‚¨λ‹€. μ˜ˆμ™Έκ°€ λ°œμƒν–ˆμ„ λ•Œ ν•  일을 μ½”λ”©ν•˜κΈ° λ‚˜λ¦„μ΄λ‹€.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                System.out.println("thread die");
                return;
            }
            System.out.println("thread hello");
        });
        thread.start();
        System.out.println("main");
        thread.interrupt();
    }
    
    // print
    main
    thread die

    만일 catchꡬ문에 return 문이 μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ”λ‹€λ©΄ κ·ΈλŒ€λ‘œ catchλΈ”λŸ­μ„ λΉ μ Έλ‚˜κ°€ thread helloλ₯Ό 좜λ ₯ν•  것이닀.

  • λ‹€λ₯Έ μŠ€λ ˆλ“œ 기닀리기 (join) : λ‹€λ₯Έ μŠ€λ ˆλ“œκ°€ λλ‚ λ•ŒκΉŒμ§€ κΈ°λ‹€λ¦°λ‹€.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                System.out.println("thread die");
                return;
            }
            System.out.println("thread hello");
        });
        thread.start();
        System.out.println("main");
        thread.join();
    }
    
    //print
    main
    //3μ΄ˆν›„
    thread hello

μŠ€λ ˆλ“œμ˜ ν˜„μž¬ 문제

μŠ€λ ˆλ“œκ°€ λ§Žμ•„ μ§€λ©΄ λ§Žμ•„ 질 수둝 μ½”λ”©μœΌλ‘œ κ΄€λ¦¬ν•˜κΈ°κ°€ 맀우 νž˜λ“€λ‹€. ( Interrupt, join에 λŒ€ν•œ μ˜ˆμ™Έμ²˜λ¦¬ν•  μ½”λ“œκ°€ κΈ°ν•˜κΈ‰μˆ˜μ μœΌλ‘œ λ§Žμ•„μ§„λ‹€.)

Executors

threadpool

κ³ μˆ˜μ€€μ˜ Concurrency ν”„λ‘œκ·Έλž˜λ°μ„ μ§€μ›ν•˜λŠ” 라이브러리둜 μœ„μ™€ 같은 μŠ€λ ˆλ“œ ν’€ 라이브러리 듀을 μ΄μš©ν•˜μ—¬ κ΅¬ν˜„λ˜μ–΄μžˆλ‹€.

μŠ€λ ˆλ“œλ₯Ό λ§Œλ“€κ³  κ΄€λ¦¬ν•˜λŠ” μž‘μ—…μ„ μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ—μ„œ λΆ„λ¦¬ν•˜μ—¬ Executors에 μœ„μž„ν•œ ν˜•νƒœμ΄λ‹€.

ν•˜λŠ” 일

  • μŠ€λ ˆλ“œ λ§Œλ“€κΈ° : μŠ€λ ˆλ“œ 풀을 λ§Œλ“€μ–΄ 관리

    • ExecutorService executorService = new ThreadPoolExecutorr(core,max,idleTime,TimeUnit.SECONDS,new SynchronousQueue()) : 직접 μŠ€λ ˆλ“œκ°œμˆ˜μ™€ μœ νœ΄μ‹œκ°„, μž‘μ—…νλ“±μ„ μ„€μ •ν•˜μ—¬ 생성 ν•  수 μžˆλ‹€.

    μ½”μ–΄ μŠ€λ ˆλ“œ? μŠ€λ ˆλ“œκ°€ μƒμ„±λ˜κ³  μœ νœ΄μƒνƒœμ—μ„œλ„ μ œκ±°λ˜μ§€ μ•Šκ³  μœ μ§€λ˜λŠ” μ΅œλŒ€ 개수

    • ExecutorService executorService = Executors.newSingleThreadExecutor() : ν•œκ°œμ˜ μŠ€λ ˆλ“œλ₯Ό κ°–λŠ” μŠ€λ ˆλ“œν’€ 생성

      public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory));
      }
    • ExecutorService executorService = Executors.newCachedThreadPool() : 초기 μŠ€λ ˆλ“œ μˆ˜λŠ” 0, μ½”μ–΄ μŠ€λž˜λ“œ μˆ˜λŠ” 0을 κ°€μ§€κ³  μŠ€λ ˆλ“œ κ°œμˆ˜λ³΄λ‹€ μž‘μ—… κ°œμˆ˜κ°€ 많으면 μƒˆ μŠ€λ ˆλ“œλ₯Ό κ·Έλ•Œ μƒμ„±μ‹œμΌœ μž‘μ—…μ‹œν‚€λŠ” μŠ€λ ˆλ“œν’€λ‘œ μ΅œλŒ€ μŠ€λ ˆλ“œ μˆ˜λŠ” integer.MAX_VALUE. μŠ€λ ˆλ“œκ°€ μƒμ„±λ˜κ³  60μ΄ˆκ°„ μž‘μ—…μ„ μˆ˜ν–‰ν•˜μ§€ μ•ŠμœΌλ©΄ μ œκ±°λœλ‹€.

      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
    • ExecutorService executorService = Executors.newFixedThreadPool(int nThreads) : 초기 μŠ€λ ˆλ“œ μˆ˜λŠ” 0, μ½”μ–΄ μŠ€λ ˆλ“œ μˆ˜λŠ” nThreads, μ΅œλŒ€ μŠ€λ ˆλ“œ μˆ˜λ„ nThreadsλ₯Ό κ°–λŠ” μŠ€λ ˆλ“œν’€ 생성. μŠ€λ ˆλ“œκ°€ μž‘μ—…μ„ μ²˜λ¦¬ν•˜μ§€ μ•Šκ³  λ†€κ³ μžˆλ”λΌλ„ μŠ€λ ˆλ“œ κ°œμˆ˜κ°€ 쀄어듀지 μ•ŠλŠ”λ‹€.

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory);
      }
    • ExecutorService executorService = Executors.newWorkStealingPool(int pallelism) : ForkJoinPool의 work stealing κΈ°λ²•μ˜ λ©€ν‹°ν”„λ‘œμ„ΈμŠ€ ν’€ 생성

  • μŠ€λ ˆλ“œ 관리 : μŠ€λ ˆλ“œ 생λͺ…μ£ΌκΈ°λ₯Ό 관리

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            System.out.println("Thread hello");
        });
    
        executorService.shutdown();
    }

    ExecutorServiceλŠ” μž‘μ—…μ„ μ‹€ν–‰ν•˜κ³ λ‚˜λ©΄ λ‹€λ₯Έ μž‘μ—…μ΄ λ“€μ–΄μ˜¬λ•ŒκΉŒμ§€ 계속 λŒ€κΈ°ν•˜κ³  있기 λ•Œλ¬Έμ— λͺ…μ‹œμ μœΌλ‘œ μ’…λ£Œλ₯Ό μ‹œμΌœμ€˜μ•Ό ν•œλ‹€.

    μ’…λ£Œ ν•¨μˆ˜μ—λŠ” shutdown(),shutdownNow()κ°€ μžˆλŠ”λ° shutdown()은 gracefulν•œ μ’…λ£ŒλΌν•˜λ©° λ°˜λ“œμ‹œ μŠ€λ ˆλ“œμ˜ μž‘μ—…μ΄ 마치고 λ‚˜μ„œ μ’…λ£Œλ₯Ό ν•˜λ©° shutdownNow()λŠ” μž‘μ—…ν˜„ν™©μ— 상관없이 μ’…λ£Œν•˜λŠ” λ©”μ„œλ“œμ΄λ‹€.

    shutdown()은 voidνƒ€μž…μ΄κ³  shutdownNow()λŠ” λ°˜ν™˜κ°’μœΌλ‘œ 미처리된 μž‘μ—…(Runnable) λͺ©λ‘μ΄λ‹€.

  • μž‘μ—… 처리 및 μ‹€ν–‰ : μŠ€λ ˆλ“œλ‘œ μ‹€ν–‰ν•  μž‘μ—…μ„ μ œκ³΅ν•  수 μžˆλŠ” API 제곡

    • μž‘μ—… μ‹€ν–‰ : submit, execute

    • μŠ€μΌ€μ€„λ§ : scheduleAtFixedRate()와 같은 λ©”μ„œλ“œ

Callable / Future

Callable은 Runnableκ³Ό 거의 λŒ€λΆ€λΆ„μ΄ μΌμΉ˜ν•˜μ§€λ§Œ returnνƒ€μž…μ΄ μ‘΄μž¬ν•œλ‹€λŠ” 점의 차이점이 μžˆμ–΄ 이λ₯Ό μ΄μš©ν•΄ μž‘μ—…μ΄ λλ‚¬μ„λ•Œμ˜ 값을 λ°˜ν™˜λ°›μ„ 수 μžˆλ‹€.

API

1. μž‘μ—… μš”μ²­ (execute/submit)

  • execute : λ¦¬ν„΄νƒ€μž…μ΄ void둜 μž‘μ—…μ²˜λ¦¬ κ²°κ³Όλ₯Ό 리턴받지 λͺ»ν•˜κ³  μž‘μ—…μ²˜λ¦¬ 도쀑에 μ˜ˆμ™Έκ°€ λ°œμƒν•˜λ©΄ μŠ€λ ˆλ“œκ°€ μ’…λ£Œλ˜κ³  ν•΄λ‹Ή μŠ€λ ˆλ“œλŠ” μŠ€λ ˆλ“œν’€μ—μ„œ 제거

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();      
        if (workerCountOf(c) < corePoolSize) {  //ν˜„μž¬ μƒμ„±λœ μŠ€λ ˆλ“œκ°―μˆ˜κ°€  μ½”μ–΄ κ°œμˆ˜λ³΄λ‹€ μž‘λ‹€λ©΄
            if (addWorker(command, true))   //μƒˆλ‘œ μŠ€λ ˆλ“œλ₯Ό λ§Œλ“€μ–΄ taskλ₯Ό μ²˜λ¦¬ν•˜κ³  리턴
                return;
            c = ctl.get();
        }
    
        //더이상 μ²˜λ¦¬ν•  μŠ€λ ˆλ“œκ°€ 없을 경우 μ›Œμ»€νμ— taskλ₯Ό λ„£μ–΄μ£Όκ³  리턴
        //μŠ€λ ˆλ“œ 풀이 μ£½μ—ˆκ±°λ‚˜ threadκ°€ μ—†λŠ”μ§€ check
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    +) ctl : μŠ€λ ˆλ“œν’€μ˜ μƒνƒœ , μ‘°κΈˆμ΄λΌλ„ 더 λΉ λ₯Έ 계산을 μœ„ν•΄ λΉ„νŠΈλ‘œ 값을 μ €μž₯ν•˜κ³  λΉ„νŠΈ 연산을 톡해 λ™μž‘

    • workerCount : ν˜„μž¬ μŠ€λ ˆλ“œ 개수

      • Integer.SiZE - 3 만큼의 κ°―μˆ˜κ°€ μ΅œλŒ€ 갯수둜 μ•½ 5μ–΅κ°œ

    • runState : μŠ€λ ˆλ“œν’€μ˜ μƒνƒœ

      • RUNNING : μƒˆλ‘œμš΄ TASKλ₯Ό λ°›κ³  큐에 μ§‘μ–΄ 넣은 일을 ν•˜λŠ” μƒνƒœ

      • SHUTDOWN : μƒˆλ‘œμš΄ TASKλ₯Ό 받지말고 νμ—μžˆλŠ” TASKλ₯Ό 처리

        • RUNNING -> SHUTDOWN : shutdown() ν˜ΈμΆœμ‹œ

      • STOP : μƒˆλ‘œμš΄ TASKλ₯Ό λ°›μ§€ μ•Šκ³  νμ—μžˆλŠ” TASK도 μ²˜λ¦¬ν•˜μ§€ μ•Šμ€ μƒνƒœλ‘œ ν˜„μž¬ 진행쀑인 TASK에 INTERRUPTλ₯Ό 건닀.

        • (RUNNING or SHUTDOWM) -> STOP : shutdownNow() ν˜ΈμΆœμ‹œ

      • TIDYING : λͺ¨λ“  TASKλŠ” μ†Œλ©Έ λ˜μ—ˆκ³ , WORKER COUNTλŠ” 0. 이 μƒνƒœλ‘œ μ „μ΄λ˜λŠ” μŠ€λ ˆλ“œλŠ” terminated() λ©”μ„œλ“œλ₯Ό μ‹€ν–‰μ‹œμΌœ μ’…λ£Œν•  μ˜ˆμ •μΈ μƒνƒœ

        • STOP -> TIDYING : μŠ€λ ˆλ“œ 풀이 λΉ„μ—ˆμ„λ•Œ

      • TERMINATED : λͺ¨λ“  μŠ€λ ˆλ“œκ°€ terminated()된 μƒνƒœ

  • submit : Futureνƒ€μž…μ„ λ°˜ν™˜ν•˜κ³  μž‘μ—…μ²˜λ¦¬ 도쀑에 μ˜ˆμ™Έκ°€ λ°œμƒν•΄λ„ μŠ€λ ˆλ“œλŠ” μ’…λ£Œλ˜μ§€ μ•Šκ³  λ‹€μŒ μž‘μ—…μ„ μœ„ν•΄ μž¬μ‚¬μš©λ˜κΈ° λ•Œλ¬Έμ— μŠ€λ ˆλ“œμ˜ 생성 μ˜€λ²„ν—€λ”λ₯Ό 쀄일 수 μžˆλ‹€.

    public Future<?> submit(Runnable task) {
          if (task == null) throw new NullPointerException();
          RunnableFuture<Void> ftask = newTaskFor(task, null);
          execute(ftask);
          return ftask;
    }

2. κ²°κ³Ό κ°€μ Έμ˜€κΈ° (get/ poll / take)

  • get

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
    
        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };
    
        Future<String> result = executorService.submit(hello);
    
        result.get();
    
        executorService.shutdown();
    }

    get()ν•¨μˆ˜λŠ” blocking call 방식이라 μŠ€λ ˆλ“œμ˜ μž‘μ—…μ΄ 아직 λλ‚˜μ§€ μ•Šμ•˜λ‹€λ©΄ κΈ°λ‹€λ¦° 후에 리턴값을 λ°›μ•„μ˜€κ³  λ‹€μŒ 쀄을 μ‹€ν–‰ν•˜κ²Œ λœλ‹€.

  • poll : μ™„λ£Œλœ μž‘μ—…μ˜ Futureλ₯Ό κ°€μ Έμ˜€κ³  μ™„λ£Œλœ μž‘μ—…μ΄ μ—†λ‹€λ©΄ μ¦‰μ‹œ null을 리턴

  • poll(timeout, Timeunit) : μ™„λ£Œλœ Futureλ₯Ό κ°€μ Έμ˜€κ³  μ™„λ£Œλœ μž‘μ—…μ΄ μ—†λ‹€λ©΄ TimeoutκΉŒμ§€ λΈ”λ‘œν‚Ή

  • take : μ™Όλ£Œλœ μž‘μ—…μ˜ Futureλ₯Ό κ°€μ Έμ˜€κ³  μ™„λ£Œλœ μž‘μ—…μ΄ μ—†λ‹€λ©΄ μžˆμ„λ•ŒκΉŒμ§€ λΈ”λ‘œν‚Ή

3. μž‘μ—… μƒνƒœ 확인 (isDone())

μž‘μ—…μ΄ 끝났닀면 true λ°˜ν™˜

4. μž‘μ—… μ·¨μ†Œ (cancel())

  • μ·¨μ†Œ ν–ˆμœΌλ©΄ true, λͺ»ν–ˆμœΌλ©΄ false λ°˜ν™˜

  • νŒŒλΌλ―Έν„°λ‘œ trueλ₯Ό μ£Όλ©΄ ν˜„μž¬ 진행쀑인 μŠ€λ ˆλ“œλ₯Ό interrupt ν•˜κ³ , falseλ₯Ό μ£Όλ©΄ μž‘μ—…μ΄ λλ‚ λ•ŒκΉŒμ§€ 기닀리고 μ·¨μ†Œν•œλ‹€.

5. μ—¬λŸ¬μž‘μ—… λ™μ‹œμ— μ‹€ν–‰ (invokeAll())

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };

        Callable<String> world = () -> {
            Thread.sleep(3000L);
            return "World";
        };

        Callable<String> hi = () -> {
            Thread.sleep(1000L);
            return "Hi";
        };

        List<Future<String>> result = executorService.invokeAll(List.of(hello,world,hi));

        for(Future<String> r : result){
            System.out.println(r);
        }

        executorService.shutdown();
    }

κ²°κ³Όλ₯Ό Future List둜 λ°›μ•„μ˜€κΈ° λ•Œλ¬Έμ— λ™μ‹œμ— μ‹€ν–‰ν•œ μž‘μ—…μ€‘ 제일 였래 κ±Έλ¦¬λŠ” μž‘μ—…μ˜ μ‹œκ°„λ§ŒνΌ 걸리게 λœλ‹€.

65. κ°€μž₯ λ¨Όμ €μ’…λ£Œλœ μž‘μ—… λ°˜ν™˜λ°›κΈ° (invokeAny())

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Callable<String> world = () -> {
        Thread.sleep(3000L);
        return "World";
    };

    Callable<String> hi = () -> {
        Thread.sleep(1000L);
        return "Hi";
    };

    String result = executorService.invokeAny(List.of(hello,world,hi));

    executorService.shutdown();
}

κ°€μž₯ λ¨Όμ €λλ‚œ μž‘μ—…μ˜ 결과물을 λ°˜ν™˜λ°›λŠ” λ©”μ„œλ“œμΈλ° μ΄λ•Œ λ°˜ν™˜μ€ blocking call이기 λ•Œλ¬Έμ— Future이 μ•„λ‹Œ 일반 κ°μ²΄νƒ€μž…μ„ λ°˜ν™˜λ°›λŠ”λ‹€.

Future의 문제점

  1. Futureλ₯Ό μ™ΈλΆ€μ—μ„œ μ·¨μ†Œμ‹œν‚€κ±°λ‚˜, get()에 νƒ€μž„μ•„μ›ƒμ„ μ‹€ν–‰ν•˜λŠ”λ“± μ™„λ£Œμž‘μ—…μ„ μˆ˜ν–‰ν•  수 μ—†λ‹€.

  2. get()μ΄λΌλŠ” blocking call을 μ‚¬μš©ν•˜μ§€ μ•Šκ³ μ„œλŠ” μž‘μ—…μ΄ λλ‚¬μ„λ•Œ μ½œλ°±μ„ μ‹€ν–‰ν•  수 μ—†λ‹€.

  3. μ—¬λŸ¬ Futureλ₯Ό 쑰립할 수 μ—†λ‹€.

  4. μ˜ˆμ™Έμ²˜λ¦¬μš© APIλ₯Ό μ œκ³΅ν•˜μ§€ μ•ŠλŠ”λ‹€.

CompletableFuture

μžλ°”μ—μ„œ 비동기 ν”„λ‘œκ·Έλž˜λ°μ„ κ°€λŠ₯μΌ€ν•˜λŠ” μΈν„°νŽ˜μ΄μŠ€

API

1. λΉ„λ™κΈ°λ‘œ μž‘μ—… μ‹€ν–‰

μ›ν•˜λŠ” μ“°λ ˆλ“œ 풀을 μ‚¬μš©ν•΄μ„œ μ‹€ν–‰ν•  수 μžˆλ‹€. κΈ°λ³Έμ μœΌλ‘œλŠ” ForkJoinPool.commonPool()둜 ThreadPoolExecutor,ExecutorServiceλ₯Ό μ‚¬μš©ν•  μˆ˜λ„ μžˆλ‹€.

기쑴의 ThreadPoolExecutorλŠ” μ„œλ‘œ 독립적인 μž‘μ—…μ„ μœ„ν•΄ μ„€κ³„λ˜μ—ˆμœΌλ©° 잠재적으둜 μ°¨λ‹¨λ˜κ³  거친 μž‘μ—…λ„ 염두에 두고 μ„€κ³„λ˜μ—ˆλ‹€.

ForkJoinPool? μž¬κ·€μ μΈ 닀쀑 μŠ€λ ˆλ“œ ν”„λ‘œκ·Έλž¨μ—μ„œ λŒ€κΈ° 문제λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄ μ„€κ³„λœ κ²ƒμœΌλ‘œ λ‹€λ₯Έ μž‘μ—…μ— λŒ€ν•΄ μž¬κ·€μ μ΄λ©° μƒν˜Έ μ˜μ‘΄μ μΌλ•Œ μ‚¬μš©ν•˜λŠ” 것이 효과적인 μŠ€λ ˆλ“œ ν’€. μ΄λŠ” λ‹€λ₯Έ μž‘μ—…μ„ κΈ°λ‹€λ¦¬λŠ”λ° 더 λ§Žμ€ μ‹œκ°„μ„ μ†ŒλΉ„ν•˜κ³  μžμ›μ„ λ‚­λΉ„ν•˜κ²Œ λ˜λŠ” λ¬Έμ œκ°€ μžˆλ‹€.

ExecutorService executorService = Executors.newFixed(ThreadPool(4));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }, executorService).thenApply((s) -> {
        System.out.println(Thread.currentThread().getName());
        return s.toUpperCase();
    },executorService);

μ½œλ°±ν•¨μˆ˜λ“€μ˜ λ‘λ²ˆμ§Έ λ§€κ°œλ³€μˆ˜λ‘œ μŠ€λ ˆλ“œ 풀을 μ „λ‹¬ν•˜λ©΄ ν•΄λ‹Ή μŠ€λ ˆλ“œν’€μ„ μ΄μš©ν•˜μ—¬ μž‘μ—…μ„ μˆ˜ν–‰ν•  수 μžˆλ‹€.

  • runAsync() : 리턴값이 μ—†λŠ” 경우

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    });
    
    future.get();
  • supplyAsync() : 리턴값이 μžˆλŠ” 경우

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    });
    
    System.out.println(future.get());

2. 콜백 제곡

콜백 자체λ₯Ό λ˜λ‹€λ₯Έ μŠ€λ ˆλ“œμ—μ„œ μ‹€ν–‰ν•  수 μžˆλ‹€.

  • thenApply(Function) : 리턴값을 λ°›μ•„μ„œ λ‹€λ₯Έ κ°’μœΌλ‘œ λ°”κΎΈλŠ” 콜백

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }).thenApply((s) -> {
        System.out.println(Thread.currentThread().getName());
        return s.toUpperCase();
    });
    
    System.out.println(future.get());
  • thenAccept(Consumer) : λ¦¬ν„΄κ°’μœΌλ‘œ 리턴없이 λ‹€λ₯Έ μž‘μ—…μ„ μ²˜λ¦¬ν•˜λŠ” 콜백

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }).thenAccept((s) -> {
        System.out.println(s + Thread.currentThread().getName());
    });
    
    future.get();
  • thenRun(Runnable) : 리턴값을 λ°›μ§€ μ•Šκ³  λ‹€λ₯Έ μž‘μ—…μ„ μ²˜λ¦¬ν•˜λŠ” 콜백

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }).thenRun(() -> {
        System.out.println(Thread.currentThread().getName());
    });
    
    future.get();

3. μ‘°ν•©ν•˜κΈ°

  • thenCompose() : 두 μž‘μ—…μ΄ μ„œλ‘œ 연관관계가 μžˆμ–΄ μ΄μ–΄μ„œ μ‹€ν–‰ν•˜λ„λ‘ μ‘°ν•©

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
    
        CompletableFuture<String> future = hello.thenCompose(Example::getWorld);
        System.out.println(future.get());
    }
    
    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + "World";
        });
    }

    thenApply와 거의 λΉ„μŠ·ν•˜λ‹€κ³  ν•  수 μžˆλŠ”λ° 차이점이라고 ν•œλ‹€λ©΄ thenApplyλŠ” λ°˜ν™˜κ°’μ„ μ΄μš©ν•΄ μƒˆλ‘œμš΄ μ½œλ°±μ„ μ‹€ν–‰ν•œλ‹€λ©΄, thenComposeλŠ” λ°˜ν™˜κ°’μ΄ μ•„λ‹ˆλΌ μ΄μ „μ˜ 처리 둜직 자체λ₯Ό 인수둜 μƒˆλ‘œμš΄ μ½œλ°±μ„ μ‹€ν–‰ν•˜λŠ” 것. ν•œλ§ˆλ””λ‘œ Futureλ₯Ό μ€‘μ²©ν•˜λŠ” 것이 μ•„λ‹ˆλΌ ν‰λ©΄ν™”ν•˜μ—¬ λ°˜ν™˜ν•˜λŠ” 것이닀. κ·Έλž˜μ„œ CompletableFuture λ©”μ„œλ“œλ₯Ό μ—°κ²°ν•˜λŠ” 것이라면 thenComposerκ°€ μ’‹λ‹€.

  • thenCombine() : 두 μž‘μ—…μ„ λ…λ¦½μ μœΌλ‘œ μ‹€ν–‰ν•˜κ³  λ‹€ μ’…λ£Œ ν–ˆμ„λ•Œ 콜백 μ‹€ν–‰

     public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
    
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName());
            return "World";
        });
    
        CompletableFuture<String> future = hello.thenCombine(world, (helloReturn,worldReturn) -> {
            return helloReturn + " " + worldReturn;
        });
    
        System.out.println(future.get());
    }
  • allOf() : μ—¬λŸ¬ μž‘μ—…μ„ λͺ¨λ‘ μ‹€ν–‰ν•˜κ³  λͺ¨λ“  μž‘μ—… 결과에 콜백 μ‹€ν–‰

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });
    
    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });
    
    List<CompletableFuture<String>> futrues = List.of(hello,world);
    CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
    
    CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    
    results.get().forEach(System.out.println);
  • anyOf() : μ—¬λŸ¬ μž‘μ—…μ€‘μ— κ°€μž₯ 빨리 λλ‚œ ν•˜λ‚˜μ˜ 결과에 콜백 μ‹€ν–‰

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });
    
    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });
    
    CompletableFuture<Void> future = CompletableFuture.anyOf(hello,world).thenAccept(System.out::println);

4. μ˜ˆμ™Έμ²˜λ¦¬

  • exceptionally(Function)

    boolean throwError = true;
    
    CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> {
        if(throwError){
            throw new IllegalArgumentException();
        }
    
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }).exceptionally(ex -> {
        System.out.println(ex);
        return "Error";
    });
  • handle(BiFunction)

    boolean throwError = true;
    
    CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> {
        if(throwError){
            throw new IllegalArgumentException();
        }
    
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }).handle((result,ex) -> {
        if(ex != null) {
            System.out.println(ex);
            return "Error";
        }
        return result;
    });

    Errorκ°€ μžˆμ„λ•Œμ™€ 정상적인 μ’…λ£ŒμΌλ•Œ λͺ¨λ‘ 핸듀링할 수 μžˆλŠ” λ©”μ„œλ“œμ΄λ‹€.

Last updated