CompletableFuture는 박스 채널 모델의 문제점을 해결(효율적으로 결과값을 처리)하기 위해 콤비네이터 메서드를 사용할 수 있다.
CompletionStage 인터페이스에 여러가지 콤비네티어 메서드가 정의되어 있다.
complete, thenCombine
complete - Future 의 연산이 끝나지 않을 경우 반환값을 강제로 지정하여 종료 thenCombine - 현재 CompletionStage 와 매개변수로 들어온 CompletionStage 를 합쳐 새로운 CompletionStage 를 반환, 2개의 Future 를 합침.
publicstaticvoidmain(String[] args)throws Exception { ExecutorServiceexecutorService= Executors.newFixedThreadPool(10); intx=100; CompletableFuture<Integer> a = newCompletableFuture<>(); CompletableFuture<Integer> b = newCompletableFuture<>(); CompletableFuture<Integer> c = a.thenCombine(b, (y, z) -> y + z); // a, b 연산이 끝난 thenCombine 두번째 파라미터인 람다식 진행 executorService.submit(() -> a.complete(f(x))); // 트리거 발생, 연산 시작 executorService.submit(() -> b.complete(g(x))); System.out.println(c.get()); // 람다식 결과 대기 executorService.shutdown(); }
publicstaticintf(int x) { System.out.println("f(x) invoked"); return x * 2; } publicstaticintg(int x) { System.out.println("g(x) invoked"); return x + 1; }
결과값
1 2 3
f(x) invoked g(x) invoked 301
병렬실행의 효율성은 높이고 병목현상은 최소화 한다.
thenApply
This method is analogous to {@link java.util.Optional#map Optional.map} and {@link java.util.stream.Stream#map Stream.map}.
1
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
스트림의 map 과 유사한 메서드로 인스턴스의 CompletionStage 가 무사히 끝난 후 새로운 CompletionStage 로 변환하여 반환.
1 2 3 4
CompletableFuture<Integer> a = newCompletableFuture<>(); CompletableFuture<Long> r = a.thenApply(value -> value.longValue()); a.complete(5); System.out.println(r.get()); // 5
thenApply 안에서 동기 메서드로 작성된다.
thenCompose
This method is analogous to {@link java.util.Optional#flatMap Optional.flatMap} and {@link java.util.stream.Stream#flatMap Stream.flatMap}.
1
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
스트림의 flatMap 과 유사한 메서드로 thenApply 와 기능이 거의 유사하나 Function 의 반환요소가 CompletionStage 로 감쌓여 있다.
1 2 3 4
CompletableFuture<Integer> a = newCompletableFuture<>(); CompletableFuture<Long> r = a.thenCompose(value -> CompletableFuture.supplyAsync(() -> value.longValue())); a.complete(5); System.out.println(r.get()); // 5
supplyAsync 를 통해 CompletionStage<Long> 을 반환한다.
thenApply, thenCompose 메서드의 반환값은 모두 CompletionStage<U> 이고 then... 이기에 둘다 선행된 CompletionStage 가 끝난 후 실행된다 thenCompose 는 여러개의 CompletionStage를 하나의 CompletionStage 재 조합하고 thenApply 는 여러개의 CompletionStage를 각각 실행하고 새로운 CompletionStage 를 반환한다고 생각하면 된다.
Invocation returned after 3 msecs Doing something else... Exception in thread "Thread-0" java.lang.RuntimeException: something execption at modernjavainaction.chap16.v1.Shop.lambda$getPriceAsync$0(Shop.java:32) at java.base/java.lang.Thread.run(Thread.java:834)
get() 으로 데이터를 가져올 수 있을때 까지 영원히 기다린다. Future 안에서 발생한 예외를 밖으로 전달해야 한다.
supplyAsync 의 2번째 매개변수로 executor 지정하여 다시 1초 안에 끝나는지 확인
thenAccept
1
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
complete 시에 특정작업을 하는 콤비네이터 메서드
위의 findPricesFuture 메서드의 호출 목적은 반환된 List<String> 을 아래와 같이 콘솔에 출력하는 것이다.
1
[BestPrice price is 123.25651664705744, LetsSaveBig price is 169.4653393606115, MyFavoriteShop price is 214.12914480588853, BuyItAll price is 184.74384995303313]
지금까지의 shop.getPrice() 의 경우 1초 고정지연 이기에 병목현상이 발생하지 않는다. 하지만 하나의 shop.getPrice() 에서 5초지연이 발생할 경우 map(CompletableFuture::join) 에서 해당 지연이 끝날 때 까지 병목현상이 발생하며 최종적으로 5초후에 콘솔출력이 진행된다.
모든 shop.getPrice() 가 끝나는 것을 기다릴 필요없이 먼저 끝난 CompletionStage내용을 출력하고 싶다면 thenAccept 를 사용할 수 있다.