4. CompletableFuture

Java8에 CompletableFuture 대해 정리한 글

CompletableFuture 정리에 앞서 Java8이전에는 비동기를 위해 어떻게 문제를 해결했는지 살펴 본 후에 정리하였습니다.

1. 자바 Concurrent 프로그래밍 소개

1-1. Concurrent 소프트웨어?

💡 동시에 여러 작업을 할 수 있는 소프트웨어

동시성(Concurrency) vs 병렬성(Parallelism)

동시성? 싱글코어에서 멀티 스레드를 동작시키기 위한 방식으로 멀티 태스킹을 위해 여러개 스레드가 번갈아가면서 실행. 한마디로 동시에 실행되는 것처럼 보이는 것.

멀티스레드로 동시성을 만족시킬수 있는 거지 동시성이 멀티스레드는 아님. 코틀린의 코루틴은 싱글스레드로 동시성을 만족

병렬성? 2개이상의 task가 있을때 각 task가 물리적인 시간으로 동시에 실행이 가능. 멀티코어에서 멀티스레드를 동작시키는 방식으로 한개 이상의 스레드를 포함하는 각 작업들이 물리적인 시간으로 완전 동시에 수행하는 것. (이때 동시 작업은 멀티코어가 될수도 네트워크를 이용한 분산컴퓨팅이 될 수 있다.)

병렬성을 만족하면 동시성도 만족, 동시성을 만족한다고 병렬성 만족x

1-2. Java에서의 Concurrent

1. 멀티 프로세싱 ( ProcessBuilder )

버전

사용 방법

Java 5 이전

Runnable과 Thread를 이용하여 구현

Java 5

ExecutorService, Callable, Future

Java 7

Fork/Join 그리고 RecursiveTask

Java 8

Stream, CompletableFuture

Java 9

분산 비동기 프로그래밍은 명시적으로 지원 (발행 구독 프로토콜 지원 Flow API)

2. 멀티 스레딩 ( Thread/Runnable )

public class App {
    public static void main(String[] args) {
				// 첫번째 방법
        MyThread myThread = new MyThread();
        myThread.start();

        // Anonymous class
        Thread runnable = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Runnable Thread: " + Thread.currentThread().getName());
            }
        });

        // Lambda Expression
        Thread lambdaThread = new Thread(() -> System.out.println("Lambda Thread:" + Thread.currentThread().getName()));

        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    // Thread 상속
    static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("Thread: "+Thread.currentThread().getName());
        }
    }
}

/* 실행결과
Hello: main
Thread: Thread-06
*/

코드의 실행 순서만 봐서는 Thread가 먼저 출력되야 할 것 같지만, 실제로 실행해보면 다르게 출력될 때도 있다.

이를 통해 Thread는 순서를 보장하지 않는다는 것을 알 수 있다.

여기선 로컬클래스를 이용했지만, 익명클래스와 람다표현식을 이용해서도 적용할 수 있다.

method

  1. sleep(mills)

    • 현재 쓰레드 재우기(멈춰두기)

      • 스레드를 대기상태로 멈춰서 다른 스레드가 처리할 수 있도록 함.

      • 하지만 락을 놔주진 않기에 잘못하면 데드락 상태에 걸릴 수 있다.

    public static void main(String[] args) throws InterruptedException {
        // Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(1000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    Thread.sleep(1000L)

    • Thread를 start하면 1초(1000L)동안 멈춰있고 그 동안 다른 쓰레드를 수행하기 때문에 Hello가 항상 우선 출력된다.

  2. interrupt()

    • 다른 쓰레드를 깨우기

      • InterruptException 을 발생

      • 이 에러에 대한 핸들링은 구현 가능

    public static void main(String[] args) throws InterruptedException {
        // Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        lambdaThread.interrupt();
        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    lambdaThread.interrupt();

    • lambdaThread에 interrupt()메소드를 호출해 lambdaThread내에 InterruptedException 을 발생시킨다.

  3. join()

    • 다른 쓰레드가 끝날 때까지 기다린다.

    public static void main(String[] args) throws InterruptedException {
        //Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        lambdaThread.join();
        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    lambdaThread.join();

    • lambdaThread에 join()메소드를 호출하여 lambdaThread가 종료될 때까지 기다린다.

2. Executors

Runnable이나 Thread와 같은 Low-level이 아닌 고 수준(High-Level) Concurrency 프로그래밍

우리가 Runnable만 정의해서 제공해주면 스레드를 만들고, 불필요해지면 종료하고 관리해주는 작업들을 대신 해주는 클래스

threadpool

고수준의 Concurrency 프로그래밍을 지원하는 라이브러리로 위와 같은 스레드 풀 라이브러리 들을 이용하여 구현되어있다.

스레드를 만들고 관리하는 작업을 애플리케이션에서 분리하여 Executors에 위임한 형태이다.

2-1. Executors가 하는 일

  • 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어관리한다.

  • 쓰레드 관리: 쓰레드 생명 주기를 관리한다.

  • 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.

2-2. 주요 인터페이스

  • Executor: execute(Runnable)

  • ExecutorService: Executor를 상속 받은 인터페이스로, Callable도 실행 가능하며 Executor를 종료 시키거나 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.

  • ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업 실행할 수 있다.

2-3. 예제

기본 사용 예제

public static void main(String[] args) throws InterruptedException {
		// ExecutorService 생성
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    // Legacy case
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Thread: "+Thread.currentThread().getName());
        }
    });
    // Lambda Expression
    executorService.submit(()->{
        System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName());
    });

    executorService.shutdown(); // graceful shutdown
    // executorService.shutdownNow(); //즉시 종료
}

2개의 Thread를 이용하여 실행

public class App {
    private static Runnable getRunnable(String message) {
        return () -> System.out.println(message + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("World"));
        executorService.submit(getRunnable("The"));
        executorService.submit(getRunnable("Java"));
        executorService.submit(getRunnable("Lecture"));
        executorService.submit(getRunnable("Concurrent"));
        executorService.submit(getRunnable("Part"));

        executorService.shutdown(); //graceful shutdown
    }
}
/* 실행결과
Hellopool-1-thread-1
Worldpool-1-thread-2
Javapool-1-thread-2
Thepool-1-thread-1
Lecturepool-1-thread-2
Concurrentpool-1-thread-1
Partpool-1-thread-2
*/

Executors.newFixedThreadPool()

  • 해당 메소드를 호출하면 해당 영역에는 인자값으로 넘겨준 숫자만큼 Thread를 관리한다.

  • 위 코드에서는 2를 인자값으로 넘겨줬기 때문에 2개의 2개의 쓰레드를 관리하는 Thread Pool이 도는 동안 Blocking Queue에 등록된 작업들이 차례대로 동작한다.

ExecutorService.newSingleThreadScheduledExcutor();

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(getRunnable("hello"), 3, 1, TimeUnit.SECONDS);
  • scheduleAtFixedRate(실행 Runnable, 시작 지연 시간, 딜레이, 파라미터 시간 단위)

  • 위 코드는 Runnable타입을 반환하는 getRunnable() 메소드를 프로그램이 시작 후 3초 뒤부터 1초마다 수행하는 코드

2-4. Executor 정리하자

  • supplyAsync 등의 메소드 호출시 쓰레드 풀을 명시하지 않으면 Java ForkJoinPoolcommonPool() 이 사용된다.

  • 개발자가 쓰레드 풀을 제어할 수 없다는 것은 나중에 문제가 될 수 있다.

  • 따라서, 항상 Java ExecutorService 를 명시적으로 사용하여 쓰레드 풀을 지정하도록 한다.

3. Callable과 Future

3-1. Callable

  • Runnable과 거의 유사하지만 반환 값을 가질 수 있다.

3-2. Future

  • 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

3-3. 메소드 살펴보기

1. 작업 요청 (execute/submit)

  • execute : 리턴타입이 void로 작업처리 결과를 리턴받지 못하고 작업처리 도중에 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드풀에서 제거

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();      
        if (workerCountOf(c) < corePoolSize) {  //현재 생성된 스레드갯수가  코어 개수보다 작다면
            if (addWorker(command, true))   //새로 스레드를 만들어 task를 처리하고 리턴
                return;
            c = ctl.get();
        }
    
        //더이상 처리할 스레드가 없을 경우 워커큐에 task를 넣어주고 리턴
        //스레드 풀이 죽었거나 thread가 없는지 check
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  • + 추가적인 설명 ) ctl : 스레드풀의 상태변수로 조금이라도 더 빠른 계산을 위해 비트로 값을 저장하고 비트 연산을 통해 동작

    • workerCount : 현재 스레드 개수

      • Integer.SiZE - 3 만큼의 갯수가 최대 갯수로 약 5억개

    • runState : 스레드풀의 상태

      • RUNNING : 새로운 TASK를 받고 큐에 집어 넣은 일을 하는 상태

      • SHUTDOWN : 새로운 TASK를 받지말고 큐에있는 TASK를 처리

        • RUNNING -> SHUTDOWN : shutdown() 호출시

      • STOP : 새로운 TASK를 받지 않고 큐에있는 TASK도 처리하지 않은 상태로 현재 진행중인 TASK에 INTERRUPT를 건다.

        • (RUNNING or SHUTDOWM) -> STOP : shutdownNow() 호출시

      • TIDYING : 모든 TASK는 소멸 되었고, WORKER COUNT는 0. 이 상태로 전이되는 스레드는 terminated() 메서드를 실행시켜 종료할 예정인 상태

        • STOP -> TIDYING : 스레드 풀이 비었을때

      • TERMINATED : 모든 스레드가 terminated()된 상태

  • submit : Future타입을 반환하고 작업처리 도중에 예외가 발생해도 스레드는 종료되지 않고 다음 작업을 위해 재사용되기 때문에 스레드의 생성 오버헤더를 줄일 수 있다.

    public Future<?> submit(Runnable task) {
          if (task == null) throw new NullPointerException();
          RunnableFuture<Void> ftask = newTaskFor(task, null);
          execute(ftask);
          return ftask;
    }

2. 결과 반환 ( get )

  • 해당 메소드는 블록킹 콜이기에 메소드 호출시점부터 코드실행 완료까지 기다린다.

  • 타임아웃을 설정할 수 있다.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> submit = executorService.submit(hello);
System.out.println("Started!");

submit.get();// blocking

System.out.println("End");
executorService.shutdown();

/*
Started!
(2초 뒤)
End
*/

3. 작업 상태 확인 ( isDone )

ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.get(); // blocking

System.out.println(helloFuture.isDone());
System.out.println("End");
executorService.shutdown();

/* 실행 결과
false
Started!
true
End
*/

4. 작업 취소 ( cancel )

  • 인자 값으로 현재 진행중인 쓰레드 interrupt 여부를 결정한다.

  • true 이면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.cancel(false);

System.out.println(helloFuture.isDone());
System.out.println("End");

helloFuture.get();
executorService.shutdown();

/* 실행 결과
false
Started!
true
End
Exception in thread "main" java.util.concurrent.CancellationException...
*/

helloFuture.cancel(false)

  • 현재 진행중인 작업을 기다린 뒤 작업을 취소한다.

  • 작업이 취소되어 종료되었기 때문에 아래에 helloFuture.isDone()은 true가 반환되며, 이미 취소한 작업을 get() 호출하는 시점에는 CancellationException 예외가 발생

5. 여러 작업 동시 실행 (invokeAll )

ExecutorService executorService = Executors.newSingleThreadExecutor();
LocalDateTime start = LocalDateTime.now();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Callable<String> java = () ->{
    Thread.sleep(3000L);
    return "java";
};
Callable<String> youngjun = () ->{
    Thread.sleep(1000L);
    return "youngjun";
};

List<Future<String>> futures = 
        executorService.invokeAll(Arrays.asList(hello, java, youngjun));
for (Future<String> future : futures) {
    System.out.println(future.get());
}
LocalDateTime end = LocalDateTime.now();
Duration between = Duration.between(start, end);
System.out.println(between.getSeconds());

/*
Hello
java
youngjun
6
*/
  • invokeAll() 메소드는 태스크가 모두 끝날때까지 기다렸다가 값들을 반환

    • 싱글 쓰레드이기 때문에 6초가 소요된다.

    • future list를 반환하고 전부 끝날때까지 holding된다.

    • 넘겨준 Callable list가 정상 처리되든 exception이 발생하든 완료된 것으로 본다.

    • 동작중에 전달받은 list가 변경되면 결과를 보장하지 않는다

    • 넘겨준 list 순서대로 결과 future를 담아서 넘겨준다.

    발생할 수 있는 Exception

    • InterruptedException : 동작이 종료되지 않은(동작중인) task가 취소 되었을 때

    • NullPointerException : 함수의 param중 null이 있을 때

    • RejectedExecutionException : task를 pool에 넣을수 없을 때.

  • 따라서 가장 먼저 완료된 작업만 반환해도 괜찮다면 invokeAll을 쓰기에는 성능이 떨어진다.

    • 그럴 때 사용 할 수 있는 메소드가 invokeAny 이다.

6. 여러 작업 동시 실행 ( invokeAny )

  • 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.

  • 블록킹 콜이다.

String result = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
System.out.println("result = " + result);

/* 실행 결과
result = youngjun
*/

주의할 점은, 싱글 쓰레드로 할 경우 먼저 들어간 순서대로 나오게 된다는 점이다.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Callable<String> java = () -> {
        Thread.sleep(3000L);
        return "java";
    };

    Callable<String> youngjun = () -> {
        Thread.sleep(1000L);
        return "youngjun";
    };

    String s = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
    System.out.println(s);

    executorService.shutdown();
}

/* 실행 결과
Hello
*/

4. CompletableFuture

4-1. 개요

자바에서 비동기(Asynchronous)프로그래밍을 가능하게하는 인터페이스. Future의 제약사항들을 해결한다.

Future 제약

  • 예외 처리용 API를 제공하지 않는다.

  • 여러 Future를 조합할 수 없다. (ex: Event정보를 받아 다음 Event에 참석할 회원목록조회)

  • Future를 외부에서 완료시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.

  • get()을 호출하기 전까지는 future를 다룰 수 없다.

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "hello");
...//TODO
future.get();
...//TODO
  • 여기서 future는 blocking call

future를 get()으로 가져오는 동안에는 다른 작업들의 수행이 안된다는 의미이고 그 기간이 길어질수록 성능은 떨어질수 밖에 없다.

4-2. CompletableFuture란?

Future와 CompletionStage를 구현하는 구현체

  • 예외처리를 지원하는 메서드

  • 순서의 의존관계를 맞는 스레드 프로그래밍

  • 콜백을 지원하기도 하며 여러 스레드를 하나로 묶어 처리하기에도 용이

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{...}

CompletableFuture<String> future = new CompletableFuture<>();
future.complete("youngjun");
System.out.println("future = " + future.get());
//-------------OR --------------
CompletableFuture<String> future = CompletableFuture.completedFuture("youngjun");
System.out.println("future = " + future.get());

4-3. 일반 메서드

1. 비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
    });
    future.get();
  • 리턴값이 있는 경우: supplyAsync()

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "Hello";
    });
    future.get();
  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool())

2. 콜백 제공하기

  • thenApply(Function)

    리턴값을 받아서 다른 값으로 바꾸는 콜백

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Hello" + Thread.currentThread().getName());
      return "Hello";
    }).thenApply((s)->{
      System.out.println("content: "+s);
      System.out.println(Thread.currentThread().getName());
      return "HelloAsync";
    });
    System.out.println(future.get());
    • Javascript의 Promise 와 유사한 형태

    • supplyAsync의 람다표현식에서 반환된 Hello라는 값은 체이닝된 메소드 thenApply의 인자값으로 들어가고 사용 가능

    • 그리고 더 이상 체이닝된 메소드가 없기 때문에 return값인 HelloAsync는 반환되어 future로 들어가고 get()을통해 받을 수 있다.

  • thenAccept(Consumer)

    리턴값을 받아 또 다른 작업을 수행하는데 반환값은 없는 콜백 (리턴 x)

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Hello" + Thread.currentThread().getName());
      return "Hello";
    }).thenAccept((s)->{
      System.out.println("content: "+s);
      System.out.println(Thread.currentThread().getName());
          // return 없다.
    });
    future.get();

  • thenRun(Runnable)

    리턴값을 받지 않고 다른 작업을 수행하는 콜백

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("Hello" + Thread.currentThread().getName());
      return "hello";
    }).thenRun(()->{
      System.out.println(Thread.currentThread().getName());
    });
    future.get();

ParallelStream vs. CompletableFuture

  • ParallelStream : I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간다하며 효율적일 수 있다(모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 쓰레드를 가질 필요가 없다).

  • CompletableFuture : 반면 작업이 I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공하며 대기/계산(W/C)의 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할 지 예측하기 어려운 문제도 있다.

4-4. CompletableFuture 조합 메소드

1. thenCompose()

  • 두 작업이 서로 이어서 실행하도록 조합하며 연관된 future간에 많이 사용

public class App {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
        System.out.println(future.get());

    }
    private static CompletableFuture<String> getWorldFuture(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/

2. thenCombine()

  • 두 작업을 독립적으로 실행하고 둘 다 종료 했을때 콜백 실행.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });

    CompletableFuture<String> resultFuture = msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j);
    
    System.out.println(resultFuture.get());
}

3. allOf()

  • 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });
    List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture);

    CompletableFuture<List<String>> results =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
    results.get().forEach(System.out::println);
}

4. anyOf()

  • 여러 작업 중 가장 끝난 하나의 결과를 콜백에 넘겨 실행

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
});

CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Apple " + Thread.currentThread().getName());
    return "Apple";
});

CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println);
future.get();

4-5. 예외 처리 메소드

1. exeptionally(Function)

boolean throwError = true;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).exceptionally(ex->{
    System.out.println(ex);
    return "Error";
});

System.out.println(msFuture.get());

2. handle(BiFunction)

boolean throwError = false;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).handle((result, ex)->{
    if (Objects.nonNull(ex)) {
        System.out.println(ex);
        return "ERROR";
    }
    return result;
});

System.out.println(msFuture.get());

4-6. CompletableFuture 를 언제 쓸 수 있을까?

  • 아래는 microservices.io 사이트에서 가져온 예제이다.

image
  • CompletableFuture 를 사용할 만한 부분은 API GATEWAY, Storefront WebApp 라고 할 수 있다. 이유는 Account, Inventory, Shipping Service 는 DATA BASE 와 연동하고 있기 때문이다. java 환경에서 RDBMS 연동시에 아직 Async 한 인터페이스 방식을 제공하고 있지 않기 때문에 Async 개발을 해서 큰 효과를 얻기 부족하다.

  • 하지만 GATEWAY 등은 REST Service 와 연동하기 때문에 Async 를 통한 성능 효과를 볼 수 있다. (Blocking Thread 문제 해결 등)

Reference

Last updated