CompletableFuture

Java์—์„œ์˜ Concurrent

  1. ๋ฉ€ํ‹ฐ ํ”„๋กœ์„ธ์‹ฑ ( ProcessBuilder )

  2. ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋”ฉ ( Thread/Runnable )

๋™์‹œ์„ฑ(Concurrency) vs ๋ณ‘๋ ฌ์„ฑ(Parallelism)

๋™์‹œ์„ฑ? ์‹ฑ๊ธ€์ฝ”์–ด์—์„œ ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ๋ฅผ ๋™์ž‘์‹œํ‚ค๊ธฐ ์œ„ํ•œ ๋ฐฉ์‹์œผ๋กœ ๋ฉ€ํ‹ฐ ํƒœ์Šคํ‚น์„ ์œ„ํ•ด ์—ฌ๋Ÿฌ๊ฐœ ์Šค๋ ˆ๋“œ๊ฐ€ ๋ฒˆ๊ฐˆ์•„๊ฐ€๋ฉด์„œ ์‹คํ–‰. ํ•œ๋งˆ๋””๋กœ ๋™์‹œ์— ์‹คํ–‰๋˜๋Š” ๊ฒƒ์ฒ˜๋Ÿผ ๋ณด์ด๋Š” ๊ฒƒ.

๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ๋กœ ๋™์‹œ์„ฑ์„ ๋งŒ์กฑ์‹œํ‚ฌ์ˆ˜ ์žˆ๋Š” ๊ฑฐ์ง€ ๋™์‹œ์„ฑ์ด ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ๋Š” ์•„๋‹˜. ์ฝ”ํ‹€๋ฆฐ์˜ ์ฝ”๋ฃจํ‹ด์€ ์‹ฑ๊ธ€์Šค๋ ˆ๋“œ๋กœ ๋™์‹œ์„ฑ์„ ๋งŒ์กฑ

๋ณ‘๋ ฌ์„ฑ? 2๊ฐœ์ด์ƒ์˜ task๊ฐ€ ์žˆ์„๋•Œ ๊ฐ task๊ฐ€ ๋ฌผ๋ฆฌ์ ์ธ ์‹œ๊ฐ„์œผ๋กœ ๋™์‹œ์— ์‹คํ–‰์ด ๊ฐ€๋Šฅ. ๋ฉ€ํ‹ฐ์ฝ”์–ด์—์„œ ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ๋ฅผ ๋™์ž‘์‹œํ‚ค๋Š” ๋ฐฉ์‹์œผ๋กœ ํ•œ๊ฐœ ์ด์ƒ์˜ ์Šค๋ ˆ๋“œ๋ฅผ ํฌํ•จํ•˜๋Š” ๊ฐ ์ž‘์—…๋“ค์ด ๋ฌผ๋ฆฌ์ ์ธ ์‹œ๊ฐ„์œผ๋กœ ์™„์ „ ๋™์‹œ์— ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒƒ. (์ด๋•Œ ๋™์‹œ ์ž‘์—…์€ ๋ฉ€ํ‹ฐ์ฝ”์–ด๊ฐ€ ๋ ์ˆ˜๋„ ๋„คํŠธ์›Œํฌ๋ฅผ ์ด์šฉํ•œ ๋ถ„์‚ฐ์ปดํ“จํŒ…์ด ๋  ์ˆ˜ ์žˆ๋‹ค.)

๋ณ‘๋ ฌ์„ฑ์„ ๋งŒ์กฑํ•˜๋ฉด ๋™์‹œ์„ฑ๋„ ๋งŒ์กฑ, ๋™์‹œ์„ฑ์„ ๋งŒ์กฑํ•œ๋‹ค๊ณ  ๋ณ‘๋ ฌ์„ฑ ๋งŒ์กฑx

๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ

๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ์ƒ์„ฑ

1. Thread ๋ฅผ ์ƒ์†๋ฐ›์•„ ํด๋ž˜์Šค๋กœ ์ •์˜

public static void main(String[] args) {
    MyThread thread = new MyThread();
    thread.start();

    System.out.println("Hello");
}

static class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Hello");
    }
}

2. Thread ์ƒ์„ฑ์ž์— ํŒŒ๋ฆฌ๋ฏธํ„ฐ๋กœ Runnalbe ์ธํ„ฐํŽ˜์ด์Šค ๊ตฌํ˜„ํ•˜์—ฌ ์ƒ์„ฑ

//1
public static void main(String[] args) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
            System.out.println("Hello thread");

            }
        });
        thread.start();

        System.out.println("Hello");
}

//2
public static void main(String[] args) {
        Thread thread = new Thread(() -> { System.out.println("thread hello")});
        thread.start();

        System.out.println("Hello");
}

์Šค๋ ˆ๋“œ ์ฃผ์š” ๊ธฐ๋Šฅ

  • ํ˜„์žฌ ์Šค๋ ˆ๋“œ ๋ฉˆ์ถฐ๋‘๊ธฐ (sleep) : ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๊ฐ€ ์ฒ˜๋ฆฌํ•˜๋„๋ก ๊ธฐํšŒ๋ฅผ ์ฃผ์ง€๋งŒ ๋ฝ์„ ๊ฑธ์ง„ ์•Š์•„ ๋ฐ๋“œ๋ฝ์ด ๊ฑธ๋ฆด ์ˆ˜ ์žˆ๋‹ค.

      public static void main(String[] args) throws ExecutionException, InterruptedException {
          Thread thread = new Thread(() -> {
              try {
                  Thread.sleep(1000L);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              System.out.println("thread hello");
          });
          thread.start();
          System.out.println("main hello");
      }
    
      //print
      main
      thread hello
  • ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ ๊นจ์šฐ๊ธฐ (interrupted) : ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๋ฅผ ๊นจ์›Œ์„œ InterruptedException์„ ๋ฐœ์ƒ ์‹œํ‚จ๋‹ค. ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ ํ•  ์ผ์„ ์ฝ”๋”ฉํ•˜๊ธฐ ๋‚˜๋ฆ„์ด๋‹ค.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                System.out.println("thread die");
                return;
            }
            System.out.println("thread hello");
        });
        thread.start();
        System.out.println("main");
        thread.interrupt();
    }
    
    // print
    main
    thread die

    ๋งŒ์ผ catch๊ตฌ๋ฌธ์— return ๋ฌธ์ด ์กด์žฌํ•˜์ง€ ์•Š๋Š”๋‹ค๋ฉด ๊ทธ๋Œ€๋กœ catch๋ธ”๋Ÿญ์„ ๋น ์ ธ๋‚˜๊ฐ€ thread hello๋ฅผ ์ถœ๋ ฅํ•  ๊ฒƒ์ด๋‹ค.

  • ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ ๊ธฐ๋‹ค๋ฆฌ๊ธฐ (join) : ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๊ฐ€ ๋๋‚ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                System.out.println("thread die");
                return;
            }
            System.out.println("thread hello");
        });
        thread.start();
        System.out.println("main");
        thread.join();
    }
    
    //print
    main
    //3์ดˆํ›„
    thread hello

์Šค๋ ˆ๋“œ์˜ ํ˜„์žฌ ๋ฌธ์ œ

์Šค๋ ˆ๋“œ๊ฐ€ ๋งŽ์•„ ์ง€๋ฉด ๋งŽ์•„ ์งˆ ์ˆ˜๋ก ์ฝ”๋”ฉ์œผ๋กœ ๊ด€๋ฆฌํ•˜๊ธฐ๊ฐ€ ๋งค์šฐ ํž˜๋“ค๋‹ค. ( Interrupt, join์— ๋Œ€ํ•œ ์˜ˆ์™ธ์ฒ˜๋ฆฌํ•  ์ฝ”๋“œ๊ฐ€ ๊ธฐํ•˜๊ธ‰์ˆ˜์ ์œผ๋กœ ๋งŽ์•„์ง„๋‹ค.)

Executors

๊ณ ์ˆ˜์ค€์˜ Concurrency ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์ง€์›ํ•˜๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋กœ ์œ„์™€ ๊ฐ™์€ ์Šค๋ ˆ๋“œ ํ’€ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ๋“ค์„ ์ด์šฉํ•˜์—ฌ ๊ตฌํ˜„๋˜์–ด์žˆ๋‹ค.

์Šค๋ ˆ๋“œ๋ฅผ ๋งŒ๋“ค๊ณ  ๊ด€๋ฆฌํ•˜๋Š” ์ž‘์—…์„ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๋ถ„๋ฆฌํ•˜์—ฌ Executors์— ์œ„์ž„ํ•œ ํ˜•ํƒœ์ด๋‹ค.

ํ•˜๋Š” ์ผ

  • ์Šค๋ ˆ๋“œ ๋งŒ๋“ค๊ธฐ : ์Šค๋ ˆ๋“œ ํ’€์„ ๋งŒ๋“ค์–ด ๊ด€๋ฆฌ

    • ExecutorService executorService = new ThreadPoolExecutorr(core,max,idleTime,TimeUnit.SECONDS,new SynchronousQueue()) : ์ง์ ‘ ์Šค๋ ˆ๋“œ๊ฐœ์ˆ˜์™€ ์œ ํœด์‹œ๊ฐ„, ์ž‘์—…ํ๋“ฑ์„ ์„ค์ •ํ•˜์—ฌ ์ƒ์„ฑ ํ•  ์ˆ˜ ์žˆ๋‹ค.

    ์ฝ”์–ด ์Šค๋ ˆ๋“œ? ์Šค๋ ˆ๋“œ๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  ์œ ํœด์ƒํƒœ์—์„œ๋„ ์ œ๊ฑฐ๋˜์ง€ ์•Š๊ณ  ์œ ์ง€๋˜๋Š” ์ตœ๋Œ€ ๊ฐœ์ˆ˜

    • ExecutorService executorService = Executors.newSingleThreadExecutor() : ํ•œ๊ฐœ์˜ ์Šค๋ ˆ๋“œ๋ฅผ ๊ฐ–๋Š” ์Šค๋ ˆ๋“œํ’€ ์ƒ์„ฑ

      public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory));
      }
    • ExecutorService executorService = Executors.newCachedThreadPool() : ์ดˆ๊ธฐ ์Šค๋ ˆ๋“œ ์ˆ˜๋Š” 0, ์ฝ”์–ด ์Šค๋ž˜๋“œ ์ˆ˜๋Š” 0์„ ๊ฐ€์ง€๊ณ  ์Šค๋ ˆ๋“œ ๊ฐœ์ˆ˜๋ณด๋‹ค ์ž‘์—… ๊ฐœ์ˆ˜๊ฐ€ ๋งŽ์œผ๋ฉด ์ƒˆ ์Šค๋ ˆ๋“œ๋ฅผ ๊ทธ๋•Œ ์ƒ์„ฑ์‹œ์ผœ ์ž‘์—…์‹œํ‚ค๋Š” ์Šค๋ ˆ๋“œํ’€๋กœ ์ตœ๋Œ€ ์Šค๋ ˆ๋“œ ์ˆ˜๋Š” integer.MAX_VALUE. ์Šค๋ ˆ๋“œ๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  60์ดˆ๊ฐ„ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š์œผ๋ฉด ์ œ๊ฑฐ๋œ๋‹ค.

      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
    • ExecutorService executorService = Executors.newFixedThreadPool(int nThreads) : ์ดˆ๊ธฐ ์Šค๋ ˆ๋“œ ์ˆ˜๋Š” 0, ์ฝ”์–ด ์Šค๋ ˆ๋“œ ์ˆ˜๋Š” nThreads, ์ตœ๋Œ€ ์Šค๋ ˆ๋“œ ์ˆ˜๋„ nThreads๋ฅผ ๊ฐ–๋Š” ์Šค๋ ˆ๋“œํ’€ ์ƒ์„ฑ. ์Šค๋ ˆ๋“œ๊ฐ€ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜์ง€ ์•Š๊ณ  ๋†€๊ณ ์žˆ๋”๋ผ๋„ ์Šค๋ ˆ๋“œ ๊ฐœ์ˆ˜๊ฐ€ ์ค„์–ด๋“ค์ง€ ์•Š๋Š”๋‹ค.

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory);
      }
    • ExecutorService executorService = Executors.newWorkStealingPool(int pallelism) : ForkJoinPool์˜ work stealing ๊ธฐ๋ฒ•์˜ ๋ฉ€ํ‹ฐํ”„๋กœ์„ธ์Šค ํ’€ ์ƒ์„ฑ

  • ์Šค๋ ˆ๋“œ ๊ด€๋ฆฌ : ์Šค๋ ˆ๋“œ ์ƒ๋ช…์ฃผ๊ธฐ๋ฅผ ๊ด€๋ฆฌ

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> {
            System.out.println("Thread hello");
        });
    
        executorService.shutdown();
    }

    ExecutorService๋Š” ์ž‘์—…์„ ์‹คํ–‰ํ•˜๊ณ ๋‚˜๋ฉด ๋‹ค๋ฅธ ์ž‘์—…์ด ๋“ค์–ด์˜ฌ๋•Œ๊นŒ์ง€ ๊ณ„์† ๋Œ€๊ธฐํ•˜๊ณ  ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ๋ช…์‹œ์ ์œผ๋กœ ์ข…๋ฃŒ๋ฅผ ์‹œ์ผœ์ค˜์•ผ ํ•œ๋‹ค.

    ์ข…๋ฃŒ ํ•จ์ˆ˜์—๋Š” shutdown(),shutdownNow()๊ฐ€ ์žˆ๋Š”๋ฐ shutdown()์€ gracefulํ•œ ์ข…๋ฃŒ๋ผํ•˜๋ฉฐ ๋ฐ˜๋“œ์‹œ ์Šค๋ ˆ๋“œ์˜ ์ž‘์—…์ด ๋งˆ์น˜๊ณ  ๋‚˜์„œ ์ข…๋ฃŒ๋ฅผ ํ•˜๋ฉฐ shutdownNow()๋Š” ์ž‘์—…ํ˜„ํ™ฉ์— ์ƒ๊ด€์—†์ด ์ข…๋ฃŒํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

    shutdown()์€ voidํƒ€์ž…์ด๊ณ  shutdownNow()๋Š” ๋ฐ˜ํ™˜๊ฐ’์œผ๋กœ ๋ฏธ์ฒ˜๋ฆฌ๋œ ์ž‘์—…(Runnable) ๋ชฉ๋ก์ด๋‹ค.

  • ์ž‘์—… ์ฒ˜๋ฆฌ ๋ฐ ์‹คํ–‰ : ์Šค๋ ˆ๋“œ๋กœ ์‹คํ–‰ํ•  ์ž‘์—…์„ ์ œ๊ณตํ•  ์ˆ˜ ์žˆ๋Š” API ์ œ๊ณต

    • ์ž‘์—… ์‹คํ–‰ : submit, execute

    • ์Šค์ผ€์ค„๋ง : scheduleAtFixedRate()์™€ ๊ฐ™์€ ๋ฉ”์„œ๋“œ

Callable / Future

Callable์€ Runnable๊ณผ ๊ฑฐ์˜ ๋Œ€๋ถ€๋ถ„์ด ์ผ์น˜ํ•˜์ง€๋งŒ returnํƒ€์ž…์ด ์กด์žฌํ•œ๋‹ค๋Š” ์ ์˜ ์ฐจ์ด์ ์ด ์žˆ์–ด ์ด๋ฅผ ์ด์šฉํ•ด ์ž‘์—…์ด ๋๋‚ฌ์„๋•Œ์˜ ๊ฐ’์„ ๋ฐ˜ํ™˜๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

API

1. ์ž‘์—… ์š”์ฒญ (execute/submit)

  • execute : ๋ฆฌํ„ดํƒ€์ž…์ด void๋กœ ์ž‘์—…์ฒ˜๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฆฌํ„ด๋ฐ›์ง€ ๋ชปํ•˜๊ณ  ์ž‘์—…์ฒ˜๋ฆฌ ๋„์ค‘์— ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ์Šค๋ ˆ๋“œ๊ฐ€ ์ข…๋ฃŒ๋˜๊ณ  ํ•ด๋‹น ์Šค๋ ˆ๋“œ๋Š” ์Šค๋ ˆ๋“œํ’€์—์„œ ์ œ๊ฑฐ

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();      
        if (workerCountOf(c) < corePoolSize) {  //ํ˜„์žฌ ์ƒ์„ฑ๋œ ์Šค๋ ˆ๋“œ๊ฐฏ์ˆ˜๊ฐ€  ์ฝ”์–ด ๊ฐœ์ˆ˜๋ณด๋‹ค ์ž‘๋‹ค๋ฉด
            if (addWorker(command, true))   //์ƒˆ๋กœ ์Šค๋ ˆ๋“œ๋ฅผ ๋งŒ๋“ค์–ด task๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ณ  ๋ฆฌํ„ด
                return;
            c = ctl.get();
        }
    
        //๋”์ด์ƒ ์ฒ˜๋ฆฌํ•  ์Šค๋ ˆ๋“œ๊ฐ€ ์—†์„ ๊ฒฝ์šฐ ์›Œ์ปคํ์— task๋ฅผ ๋„ฃ์–ด์ฃผ๊ณ  ๋ฆฌํ„ด
        //์Šค๋ ˆ๋“œ ํ’€์ด ์ฃฝ์—ˆ๊ฑฐ๋‚˜ thread๊ฐ€ ์—†๋Š”์ง€ check
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    +) ctl : ์Šค๋ ˆ๋“œํ’€์˜ ์ƒํƒœ , ์กฐ๊ธˆ์ด๋ผ๋„ ๋” ๋น ๋ฅธ ๊ณ„์‚ฐ์„ ์œ„ํ•ด ๋น„ํŠธ๋กœ ๊ฐ’์„ ์ €์žฅํ•˜๊ณ  ๋น„ํŠธ ์—ฐ์‚ฐ์„ ํ†ตํ•ด ๋™์ž‘

    • workerCount : ํ˜„์žฌ ์Šค๋ ˆ๋“œ ๊ฐœ์ˆ˜

      • Integer.SiZE - 3 ๋งŒํผ์˜ ๊ฐฏ์ˆ˜๊ฐ€ ์ตœ๋Œ€ ๊ฐฏ์ˆ˜๋กœ ์•ฝ 5์–ต๊ฐœ

    • runState : ์Šค๋ ˆ๋“œํ’€์˜ ์ƒํƒœ

      • RUNNING : ์ƒˆ๋กœ์šด TASK๋ฅผ ๋ฐ›๊ณ  ํ์— ์ง‘์–ด ๋„ฃ์€ ์ผ์„ ํ•˜๋Š” ์ƒํƒœ

      • SHUTDOWN : ์ƒˆ๋กœ์šด TASK๋ฅผ ๋ฐ›์ง€๋ง๊ณ  ํ์—์žˆ๋Š” TASK๋ฅผ ์ฒ˜๋ฆฌ

        • RUNNING -> SHUTDOWN : shutdown() ํ˜ธ์ถœ์‹œ

      • STOP : ์ƒˆ๋กœ์šด TASK๋ฅผ ๋ฐ›์ง€ ์•Š๊ณ  ํ์—์žˆ๋Š” TASK๋„ ์ฒ˜๋ฆฌํ•˜์ง€ ์•Š์€ ์ƒํƒœ๋กœ ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ TASK์— INTERRUPT๋ฅผ ๊ฑด๋‹ค.

        • (RUNNING or SHUTDOWM) -> STOP : shutdownNow() ํ˜ธ์ถœ์‹œ

      • TIDYING : ๋ชจ๋“  TASK๋Š” ์†Œ๋ฉธ ๋˜์—ˆ๊ณ , WORKER COUNT๋Š” 0. ์ด ์ƒํƒœ๋กœ ์ „์ด๋˜๋Š” ์Šค๋ ˆ๋“œ๋Š” terminated() ๋ฉ”์„œ๋“œ๋ฅผ ์‹คํ–‰์‹œ์ผœ ์ข…๋ฃŒํ•  ์˜ˆ์ •์ธ ์ƒํƒœ

        • STOP -> TIDYING : ์Šค๋ ˆ๋“œ ํ’€์ด ๋น„์—ˆ์„๋•Œ

      • TERMINATED : ๋ชจ๋“  ์Šค๋ ˆ๋“œ๊ฐ€ terminated()๋œ ์ƒํƒœ

  • submit : Futureํƒ€์ž…์„ ๋ฐ˜ํ™˜ํ•˜๊ณ  ์ž‘์—…์ฒ˜๋ฆฌ ๋„์ค‘์— ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•ด๋„ ์Šค๋ ˆ๋“œ๋Š” ์ข…๋ฃŒ๋˜์ง€ ์•Š๊ณ  ๋‹ค์Œ ์ž‘์—…์„ ์œ„ํ•ด ์žฌ์‚ฌ์šฉ๋˜๊ธฐ ๋•Œ๋ฌธ์— ์Šค๋ ˆ๋“œ์˜ ์ƒ์„ฑ ์˜ค๋ฒ„ํ—ค๋”๋ฅผ ์ค„์ผ ์ˆ˜ ์žˆ๋‹ค.

    public Future<?> submit(Runnable task) {
          if (task == null) throw new NullPointerException();
          RunnableFuture<Void> ftask = newTaskFor(task, null);
          execute(ftask);
          return ftask;
    }

2. ๊ฒฐ๊ณผ ๊ฐ€์ ธ์˜ค๊ธฐ (get/ poll / take)

  • get

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
    
        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };
    
        Future<String> result = executorService.submit(hello);
    
        result.get();
    
        executorService.shutdown();
    }

    get()ํ•จ์ˆ˜๋Š” blocking call ๋ฐฉ์‹์ด๋ผ ์Šค๋ ˆ๋“œ์˜ ์ž‘์—…์ด ์•„์ง ๋๋‚˜์ง€ ์•Š์•˜๋‹ค๋ฉด ๊ธฐ๋‹ค๋ฆฐ ํ›„์— ๋ฆฌํ„ด๊ฐ’์„ ๋ฐ›์•„์˜ค๊ณ  ๋‹ค์Œ ์ค„์„ ์‹คํ–‰ํ•˜๊ฒŒ ๋œ๋‹ค.

  • poll : ์™„๋ฃŒ๋œ ์ž‘์—…์˜ Future๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ  ์™„๋ฃŒ๋œ ์ž‘์—…์ด ์—†๋‹ค๋ฉด ์ฆ‰์‹œ null์„ ๋ฆฌํ„ด

  • poll(timeout, Timeunit) : ์™„๋ฃŒ๋œ Future๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ  ์™„๋ฃŒ๋œ ์ž‘์—…์ด ์—†๋‹ค๋ฉด Timeout๊นŒ์ง€ ๋ธ”๋กœํ‚น

  • take : ์™ผ๋ฃŒ๋œ ์ž‘์—…์˜ Future๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ  ์™„๋ฃŒ๋œ ์ž‘์—…์ด ์—†๋‹ค๋ฉด ์žˆ์„๋•Œ๊นŒ์ง€ ๋ธ”๋กœํ‚น

3. ์ž‘์—… ์ƒํƒœ ํ™•์ธ (isDone())

์ž‘์—…์ด ๋๋‚ฌ๋‹ค๋ฉด true ๋ฐ˜ํ™˜

4. ์ž‘์—… ์ทจ์†Œ (cancel())

  • ์ทจ์†Œ ํ–ˆ์œผ๋ฉด true, ๋ชปํ–ˆ์œผ๋ฉด false ๋ฐ˜ํ™˜

  • ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ true๋ฅผ ์ฃผ๋ฉด ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ ์Šค๋ ˆ๋“œ๋ฅผ interrupt ํ•˜๊ณ , false๋ฅผ ์ฃผ๋ฉด ์ž‘์—…์ด ๋๋‚ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ์ทจ์†Œํ•œ๋‹ค.

5. ์—ฌ๋Ÿฌ์ž‘์—… ๋™์‹œ์— ์‹คํ–‰ (invokeAll())

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Callable<String> hello = () -> {
            Thread.sleep(2000L);
            return "Hello";
        };

        Callable<String> world = () -> {
            Thread.sleep(3000L);
            return "World";
        };

        Callable<String> hi = () -> {
            Thread.sleep(1000L);
            return "Hi";
        };

        List<Future<String>> result = executorService.invokeAll(List.of(hello,world,hi));

        for(Future<String> r : result){
            System.out.println(r);
        }

        executorService.shutdown();
    }

๊ฒฐ๊ณผ๋ฅผ Future List๋กœ ๋ฐ›์•„์˜ค๊ธฐ ๋•Œ๋ฌธ์— ๋™์‹œ์— ์‹คํ–‰ํ•œ ์ž‘์—…์ค‘ ์ œ์ผ ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” ์ž‘์—…์˜ ์‹œ๊ฐ„๋งŒํผ ๊ฑธ๋ฆฌ๊ฒŒ ๋œ๋‹ค.

65. ๊ฐ€์žฅ ๋จผ์ €์ข…๋ฃŒ๋œ ์ž‘์—… ๋ฐ˜ํ™˜๋ฐ›๊ธฐ (invokeAny())

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(4);

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Callable<String> world = () -> {
        Thread.sleep(3000L);
        return "World";
    };

    Callable<String> hi = () -> {
        Thread.sleep(1000L);
        return "Hi";
    };

    String result = executorService.invokeAny(List.of(hello,world,hi));

    executorService.shutdown();
}

๊ฐ€์žฅ ๋จผ์ €๋๋‚œ ์ž‘์—…์˜ ๊ฒฐ๊ณผ๋ฌผ์„ ๋ฐ˜ํ™˜๋ฐ›๋Š” ๋ฉ”์„œ๋“œ์ธ๋ฐ ์ด๋•Œ ๋ฐ˜ํ™˜์€ blocking call์ด๊ธฐ ๋•Œ๋ฌธ์— Future์ด ์•„๋‹Œ ์ผ๋ฐ˜ ๊ฐ์ฒดํƒ€์ž…์„ ๋ฐ˜ํ™˜๋ฐ›๋Š”๋‹ค.

Future์˜ ๋ฌธ์ œ์ 

  1. Future๋ฅผ ์™ธ๋ถ€์—์„œ ์ทจ์†Œ์‹œํ‚ค๊ฑฐ๋‚˜, get()์— ํƒ€์ž„์•„์›ƒ์„ ์‹คํ–‰ํ•˜๋Š”๋“ฑ ์™„๋ฃŒ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์—†๋‹ค.

  2. get()์ด๋ผ๋Š” blocking call์„ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ ์„œ๋Š” ์ž‘์—…์ด ๋๋‚ฌ์„๋•Œ ์ฝœ๋ฐฑ์„ ์‹คํ–‰ํ•  ์ˆ˜ ์—†๋‹ค.

  3. ์—ฌ๋Ÿฌ Future๋ฅผ ์กฐ๋ฆฝํ•  ์ˆ˜ ์—†๋‹ค.

  4. ์˜ˆ์™ธ์ฒ˜๋ฆฌ์šฉ API๋ฅผ ์ œ๊ณตํ•˜์ง€ ์•Š๋Š”๋‹ค.

CompletableFuture

์ž๋ฐ”์—์„œ ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ๊ฐ€๋Šฅ์ผ€ํ•˜๋Š” ์ธํ„ฐํŽ˜์ด์Šค

API

1. ๋น„๋™๊ธฐ๋กœ ์ž‘์—… ์‹คํ–‰

์›ํ•˜๋Š” ์“ฐ๋ ˆ๋“œ ํ’€์„ ์‚ฌ์šฉํ•ด์„œ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค. ๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” ForkJoinPool.commonPool()๋กœ ThreadPoolExecutor,ExecutorService๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

๊ธฐ์กด์˜ ThreadPoolExecutor๋Š” ์„œ๋กœ ๋…๋ฆฝ์ ์ธ ์ž‘์—…์„ ์œ„ํ•ด ์„ค๊ณ„๋˜์—ˆ์œผ๋ฉฐ ์ž ์žฌ์ ์œผ๋กœ ์ฐจ๋‹จ๋˜๊ณ  ๊ฑฐ์นœ ์ž‘์—…๋„ ์—ผ๋‘์— ๋‘๊ณ  ์„ค๊ณ„๋˜์—ˆ๋‹ค.

ForkJoinPool? ์žฌ๊ท€์ ์ธ ๋‹ค์ค‘ ์Šค๋ ˆ๋“œ ํ”„๋กœ๊ทธ๋žจ์—์„œ ๋Œ€๊ธฐ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด ์„ค๊ณ„๋œ ๊ฒƒ์œผ๋กœ ๋‹ค๋ฅธ ์ž‘์—…์— ๋Œ€ํ•ด ์žฌ๊ท€์ ์ด๋ฉฐ ์ƒํ˜ธ ์˜์กด์ ์ผ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ํšจ๊ณผ์ ์ธ ์Šค๋ ˆ๋“œ ํ’€. ์ด๋Š” ๋‹ค๋ฅธ ์ž‘์—…์„ ๊ธฐ๋‹ค๋ฆฌ๋Š”๋ฐ ๋” ๋งŽ์€ ์‹œ๊ฐ„์„ ์†Œ๋น„ํ•˜๊ณ  ์ž์›์„ ๋‚ญ๋น„ํ•˜๊ฒŒ ๋˜๋Š” ๋ฌธ์ œ๊ฐ€ ์žˆ๋‹ค.

ExecutorService executorService = Executors.newFixed(ThreadPool(4));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }, executorService).thenApply((s) -> {
        System.out.println(Thread.currentThread().getName());
        return s.toUpperCase();
    },executorService);

์ฝœ๋ฐฑํ•จ์ˆ˜๋“ค์˜ ๋‘๋ฒˆ์งธ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์Šค๋ ˆ๋“œ ํ’€์„ ์ „๋‹ฌํ•˜๋ฉด ํ•ด๋‹น ์Šค๋ ˆ๋“œํ’€์„ ์ด์šฉํ•˜์—ฌ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • runAsync() : ๋ฆฌํ„ด๊ฐ’์ด ์—†๋Š” ๊ฒฝ์šฐ

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    });
    
    future.get();
  • supplyAsync() : ๋ฆฌํ„ด๊ฐ’์ด ์žˆ๋Š” ๊ฒฝ์šฐ

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    });
    
    System.out.println(future.get());

2. ์ฝœ๋ฐฑ ์ œ๊ณต

์ฝœ๋ฐฑ ์ž์ฒด๋ฅผ ๋˜๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • thenApply(Function) : ๋ฆฌํ„ด๊ฐ’์„ ๋ฐ›์•„์„œ ๋‹ค๋ฅธ ๊ฐ’์œผ๋กœ ๋ฐ”๊พธ๋Š” ์ฝœ๋ฐฑ

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }).thenApply((s) -> {
        System.out.println(Thread.currentThread().getName());
        return s.toUpperCase();
    });
    
    System.out.println(future.get());
  • thenAccept(Consumer) : ๋ฆฌํ„ด๊ฐ’์œผ๋กœ ๋ฆฌํ„ด์—†์ด ๋‹ค๋ฅธ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฝœ๋ฐฑ

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }).thenAccept((s) -> {
        System.out.println(s + Thread.currentThread().getName());
    });
    
    future.get();
  • thenRun(Runnable) : ๋ฆฌํ„ด๊ฐ’์„ ๋ฐ›์ง€ ์•Š๊ณ  ๋‹ค๋ฅธ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฝœ๋ฐฑ

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
    }).thenRun(() -> {
        System.out.println(Thread.currentThread().getName());
    });
    
    future.get();

3. ์กฐํ•ฉํ•˜๊ธฐ

  • thenCompose() : ๋‘ ์ž‘์—…์ด ์„œ๋กœ ์—ฐ๊ด€๊ด€๊ณ„๊ฐ€ ์žˆ์–ด ์ด์–ด์„œ ์‹คํ–‰ํ•˜๋„๋ก ์กฐํ•ฉ

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
    
        CompletableFuture<String> future = hello.thenCompose(Example::getWorld);
        System.out.println(future.get());
    }
    
    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + "World";
        });
    }

    thenApply์™€ ๊ฑฐ์˜ ๋น„์Šทํ•˜๋‹ค๊ณ  ํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ ์ฐจ์ด์ ์ด๋ผ๊ณ  ํ•œ๋‹ค๋ฉด thenApply๋Š” ๋ฐ˜ํ™˜๊ฐ’์„ ์ด์šฉํ•ด ์ƒˆ๋กœ์šด ์ฝœ๋ฐฑ์„ ์‹คํ–‰ํ•œ๋‹ค๋ฉด, thenCompose๋Š” ๋ฐ˜ํ™˜๊ฐ’์ด ์•„๋‹ˆ๋ผ ์ด์ „์˜ ์ฒ˜๋ฆฌ ๋กœ์ง ์ž์ฒด๋ฅผ ์ธ์ˆ˜๋กœ ์ƒˆ๋กœ์šด ์ฝœ๋ฐฑ์„ ์‹คํ–‰ํ•˜๋Š” ๊ฒƒ. ํ•œ๋งˆ๋””๋กœ Future๋ฅผ ์ค‘์ฒฉํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ ํ‰๋ฉดํ™”ํ•˜์—ฌ ๋ฐ˜ํ™˜ํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ๊ทธ๋ž˜์„œ CompletableFuture ๋ฉ”์„œ๋“œ๋ฅผ ์—ฐ๊ฒฐํ•˜๋Š” ๊ฒƒ์ด๋ผ๋ฉด thenComposer๊ฐ€ ์ข‹๋‹ค.

  • thenCombine() : ๋‘ ์ž‘์—…์„ ๋…๋ฆฝ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๊ณ  ๋‹ค ์ข…๋ฃŒ ํ–ˆ์„๋•Œ ์ฝœ๋ฐฑ ์‹คํ–‰

     public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });
    
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName());
            return "World";
        });
    
        CompletableFuture<String> future = hello.thenCombine(world, (helloReturn,worldReturn) -> {
            return helloReturn + " " + worldReturn;
        });
    
        System.out.println(future.get());
    }
  • allOf() : ์—ฌ๋Ÿฌ ์ž‘์—…์„ ๋ชจ๋‘ ์‹คํ–‰ํ•˜๊ณ  ๋ชจ๋“  ์ž‘์—… ๊ฒฐ๊ณผ์— ์ฝœ๋ฐฑ ์‹คํ–‰

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });
    
    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });
    
    List<CompletableFuture<String>> futrues = List.of(hello,world);
    CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
    
    CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    
    results.get().forEach(System.out.println);
  • anyOf() : ์—ฌ๋Ÿฌ ์ž‘์—…์ค‘์— ๊ฐ€์žฅ ๋นจ๋ฆฌ ๋๋‚œ ํ•˜๋‚˜์˜ ๊ฒฐ๊ณผ์— ์ฝœ๋ฐฑ ์‹คํ–‰

    CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> { System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    });
    
    CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> { System.out.println("World " + Thread.currentThread().getName());
        return "World";
    });
    
    CompletableFuture<Void> future = CompletableFuture.anyOf(hello,world).thenAccept(System.out::println);

4. ์˜ˆ์™ธ์ฒ˜๋ฆฌ

  • exceptionally(Function)

    boolean throwError = true;
    
    CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> {
        if(throwError){
            throw new IllegalArgumentException();
        }
    
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }).exceptionally(ex -> {
        System.out.println(ex);
        return "Error";
    });
  • handle(BiFunction)

    boolean throwError = true;
    
    CompletableFuture<String> hello = CompletableFuture.suuplyAsync(()-> {
        if(throwError){
            throw new IllegalArgumentException();
        }
    
        System.out.println("Hello " + Thread.currentThread().getName());
        return "Hello";
    }).handle((result,ex) -> {
        if(ex != null) {
            System.out.println(ex);
            return "Error";
        }
        return result;
    });

    Error๊ฐ€ ์žˆ์„๋•Œ์™€ ์ •์ƒ์ ์ธ ์ข…๋ฃŒ์ผ๋•Œ ๋ชจ๋‘ ํ•ธ๋“ค๋งํ•  ์ˆ˜ ์žˆ๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

Last updated