java - 리액터!
Reactor Project
참고
https://kouzie.github.io/java/java-리액터/# https://kouzie.github.io/java/java-RxJava,-Reactive-Stream/
https://projectreactor.io/docs/core/release/api/overview-summary.html
Pivotal
에서 만든 리엑티브 라이브러리, 마찬가지로 Reactive Stream
스펙을 따르며 사용법이 RxJava
와 굉장히 유사하다.
아무래도 Spring
개발업체에서 만든 라이브러리이다 보니 Spring
사용시 많은 리액티브 스타터 의존패키지가 해당 라이브러리를 같이 사용한다.
Flux, Mono
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
https://projectreactor.io/docs/core/release/reference/#about-doc
마찬가지로 Reactive Stream
의 기술 스펙을 구현하였으며
Publisher
구현체로 Flux
와 Mono
가 있다.
package reactor.core.publisher;
import org.reactivestreams.Publisher;
public abstract class Flux<T> implements Publisher<T> {...}
public abstract class Mono<T> implements Publisher<T> {...}
Flux
는 사용법이 RxJava
의 Observable
과 매우 비슷하다.
public static void main(String[] args) {
Flux<String> stream1 = Flux.just("hello", "world");
Flux<Integer> stream2 = Flux.fromArray(new Integer[]{1, 2, 3});
Flux<Integer> stream3 = Flux.fromIterable(Arrays.asList(4, 5, 6));
Flux<Integer> stream4 = Flux.range(2020, 9); // 2020 ~ 2028
}
연속된 스트림을 리액티브하게 반환한다.
Mono
는 사용법이 RxJava
의 Maybe
와 비슷하다.
public static void main(String[] args) {
Mono<String> stream1 = Mono.just("hello");
Mono<Integer> stream2 = Mono.justOrEmpty(null);
Mono<Integer> stream3 = Mono.justOrEmpty(Optional.empty());
}
최대 한개 요소의 스트림을 반환한다.
Flux
와 Mono
는 서로 쉽게 변환 가능하다.
Flux
는 연속된 스트림을 리스트로 변환하여 하나의 요소로 변경, Mono
로 반환할 수 있고
Mono
는 하나의 스트림을 연속된 스트림처럼 감싸 반환할 수 있다.
Mono
의 경우 http, db 질의 반환값으로 사용하기 적합하다
Subscriber
public final Disposable subscribe() {...}
public final Disposable subscribe(Consumer<? super T> consumer) {...}
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) {...}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer) {...}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Consumer<? super Subscription> subscriptionConsumer) {...}
@Override
public final void subscribe(Subscriber<? super T> actual) {...}
RxJava
와 패키지명만 다를뿐 subscribe
메서드 구조, Disposable
인터페이스를 반환하는 것이 동일하다.
public static void main(String[] args) {
Flux.just(1, 2, 3).subscribe(
System.out::println, // consumer
System.err::println, // error consumer
()-> System.out.println("complete")); // complete consumer
}
// 출력결과
// 1
// 2
// 3
// complete
Flux
를RxJava
의Observable
로 변경해도 그대로 동작한다, 그만큼 두 라이브러리가 유사함.
Observable
과 다르게 Flux
는 배압처리까지 가능한 Reactive Stream
을 계승한 클래스이다.
아래처럼 subscription
컨슈머 메서드를 작성해 배압처리가 가능
public static void main(String[] args) {
Flux.just(1, 2, 3).subscribe(
System.out::println, // consumer
System.err::println, // error consumer
() -> System.out.println("complete"), // complete consumer
subscription -> {
subscription.request(2);
subscription.cancel();
});
}
// 출력결과
// 1
// 2
Flux
를RxJava
의Flowable
로 변경해도 그대로 동작한다, 그만큼 두 라이브러리가 유사함.
BaseSubscriber
Flux
의 Subscriber
구현시 TCK
조건을 만족하는 Subscriber
작성은 까다롭다.
Reactor
라이브러리에선 기본적인 요구조건을 만족시킨 Subscriber
를 미리 정의해두었다.
public abstract class BaseSubscriber<T> implements
CoreSubscriber<T>, Subscription, Disposable {
...
}
BaseSubscriber
를 사용하면 TCK
를 만족하는 Subscriber
를 쉽게 구현할 수 있다.
public class MySubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
}
리액터 객체 생성
push, create
둘다 Flux
생성자로 FluxSink
라는 클래스를 사용해 발행할 데이터를 지정한다.
public static void main(String[] args) {
Flux.push(sink -> IntStream.range(2000, 3000).forEach(sink::next))
.subscribe(e -> System.out.println(e));
Flux.create(sink -> IntStream.range(2000, 3000).forEach(sink::next))
.subscribe(e -> System.out.println(e));
}
FluxSink.next
메서드를 통해 발행 데이터를 구독자에게 발행
둘의 사용법은 같으나 FluxSink
가 멀티 스레드, 싱글 스레드로 생성는지 차이이다.
데이터 발행 코드에서 멀티 스레드 오류가 발생할 수 있다면 push
를 사용하자.
generate, using, usingWhen
generate
는 발행되는 데이터를 연계해 새로운 발행 데이터를 생성 및 관리할 수 있다.
위 그림처럼 조건에 따라 1씩 증가하는 발행 데이터를 생성하고 sink 를 통해 구독자에게 전달하면서
기존 발행 데이터를 사용해 지속적으로 새로운 발행 데이터를 만들어 낸다.
public static void main(String[] args) {
Flux.generate(() -> Tuples.of(0, 1), (state, sink) -> {
System.out.println(state.getT2());
sink.next(state.getT2()); //onNext 신호 발생
int newValue = state.getT1() + state.getT2();
return Tuples.of(state.getT2(), newValue); // 새로운 시퀀스 전달 반복
}).delayElements(Duration.ofMillis(1))
.take(7)
.subscribe(e -> System.out.println(e)); // sink.next 로부터 수신
Thread.sleep(1000);
}
// 출력결과
// 1
// 1
// 2
// 3
// 5
// 8
// 13
간단한 피보나치 수열을 발행하는 Flux
인스턴스 생성
using
은 리소스를 제공하는 인스턴스를 통해 Flux 객체를 생성할 수 있다.
public class Main {
public static void main(String[] args) {
Flux<String> flux = Flux.using(
Connection::newConnection, // publihser(리소스) 생성
connection -> Flux.fromIterable(connection.getData()), // 리소스로부터 데이터 흭득(생성)
Connection::close);
flux.subscribe(data -> System.out.println("data:" + data),
e -> System.err.println("error:" + e.getMessage()),
() -> System.out.println("complete"));
flux.subscribe(data -> System.out.println("data:" + data),
e -> System.err.println("error:" + e.getMessage()),
() -> System.out.println("complete"));
}
}
class Connection implements AutoCloseable {
private final Random rnd = new Random();
private Connection() {}
public Iterable<String> getData() {
if (rnd.nextInt(10) < 3) // 10 분의 3 확률로 실패
throw new RuntimeException("Connect error");
return Arrays.asList("one", "two", "three");
}
public void close() {
System.out.println("Connection Closed");
}
public static Connection newConnection() {
System.out.println("Connection Created");
return new Connection();
}
}
// 출력결과
// Connection Created
// data:one
// data:two
// data:three
// Connection Closed
// complete
// Connection Created
// data:one
// data:two
// data:three
// Connection Closed
// complete
그림처럼 구독할 때 마다 resource
객체(Connection
) 을 생성하고 리소스 내부의 데이터 발행 메서드를 통해 구독자엑 데이터를 전송한다.
usingWhen
의 경우 using
에서 성공, 실패에 대한 조치까지 매개변수로 지정 가능
public class Main {
public static void main(String[] args) throws InterruptedException {
Flux<String> request = Flux.usingWhen(
Transaction.beginTransaction(), // publihser(리소스) 생성
transaction -> transaction.insertRows(Flux.just("A", "B", "C")), // 리소스로부터 데이터 흭득(생성)
Transaction::commit, // when complete
Transaction::rollback); // when error
request.subscribe(
d -> System.out.println("onNext " + d),
e -> System.err.println("onError " + e),
() -> System.out.println("complete"));
Thread.sleep(1000);
}
}
class Transaction {
private static final Random rnd = new Random();
private final int id;
public Transaction(int id) {
this.id = id;
System.out.println("transaction created, id:" + id);
}
public static Mono<Transaction> beginTransaction() {
return Mono.defer(() -> Mono.just(new Transaction(rnd.nextInt(1000))));
}
public Flux<String> insertRows(Publisher<String> rows) {
return Flux.from(rows)
.delayElements(Duration.ofMillis(100))
.flatMap(r -> {
if (rnd.nextInt(10) < 3)
return Mono.error(new RuntimeException("Insert Error"));
else
return Mono.just(r);
});
}
public Mono<Void> commit() {
return Mono.defer(() -> {
System.out.println("commit: " + id);
if (rnd.nextBoolean())
return Mono.empty();
else
return Mono.error(new RuntimeException("Commit Error"));
});
}
public Mono<Void> rollback() {
return Mono.defer(() -> {
System.out.println("rollback: " + id);
if (rnd.nextBoolean())
return Mono.empty();
else
return Mono.error(new RuntimeException("Conn Error"));
});
}
}
// 출력결과
// onNext A
// onNext B
// onNext C
// commit: 507
// complete
Hot Stream, Cold Stream
리액터 연산자
웬만한 연산자들은 RxJava 의 Observable 과 중복됨
never, empty, error
public static void main(String[] args) {
Mono<String> stream1 = Mono.empty(); // 빈 인스턴스를 생성
Mono<String> stream2 = Mono.never(); // 완료, 오류에 대해서도 신호를 보내지 않음
Mono<String> stream3 = Mono.error(new IllegalArgumentException("unknown id"));
Flux<String> stream4 = Flux.empty();
Flux<String> stream5 = Flux.never();
Flux<String> stream6 = Flux.error(new IllegalArgumentException("unknown id"));
}
then, thenEmpty, thenMany
상위 스트림이 완료될 때 같이 완료되며 그림처럼 최종 Subscriber
에게 반환되는 값을 대체해버린다.
doOnEach, materialize, dematerialize
materialize: 구제화 하다
RxJava
의 Notification
객체처럼 Reactor
에도 Signal
객체가 있다.
public static void main(String[] args) {
// Consumer<? super Signal<T>> signalConsumer
Flux.just(1, 2, 3).doOnEach(s -> {
if (s.isOnNext())
System.out.println(s.get());
else if (s.isOnError())
System.err.println(s.getThrowable());
if (s.isOnComplete())
System.out.println("complete");
}).subscribe();
}
// 출력결과
// 1
// 2
// 3
// complete
때로는 발행 데이터를 일반요소가 아닌 Signal
객체가 더 유용할 수 있다.
그럴때 materialize
연산자를 사용하면 발행 요소를 Signal
로 변경 가능하다.
반대로 Signal
을 일반 발행요소로 변경하려면 dematerialize
를 사용하면 된다.
public static void main(String[] args) {
Flux.just(1, 2, 3).materialize()
.subscribe(s -> {
if (s.isOnNext())
System.out.println(s.get());
else if (s.isOnError())
System.err.println(s.getThrowable());
if (s.isOnComplete())
System.out.println("complete");
});
}
cache, replay
RxJava
의 replay
와 같은 메서드로 버퍼크기, 시간값을 사용해 캐시 데이터 축소가 가능함
transform, compose
public static void main(String[] args) {
AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
if (ai.incrementAndGet() == 1)
return f.filter(color -> !color.equals("orange")).map(String::toUpperCase);
else
return f.filter(color -> !color.equals("purple")).map(String::toUpperCase);
};
Flux<String> composedFlux = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.transform(filterAndMap);
composedFlux.subscribe(d -> System.out.println("Subscriber1: " + d));
composedFlux.subscribe(d -> System.out.println("Subscriber2: " + d));
}
// 출력결과
// Subscriber1: BLUE
// Subscriber1: GREEN
// Subscriber1: PURPLE
// Subscriber2: BLUE
// Subscriber2: GREEN
// Subscriber2: PURPLE
orange
문자열은 두번의 subscribe
과정에서 모두 filter
처리되고 BLUE, GREEN, PURPLE
이 2번 반복하여 출력된다.
ai.incrementAndGet()
이 한번 호출된다는 뜻
map
의 경우 모든 각 발행 데이터가 해당 람다식을 거쳐가지만 transform
의 경우 방식이라 한번 호출되어 스트림 자체를 수정하고 끝난다.
compose
의 경우 transform
에 defer
를 적용한 것 과 같다.
public static void main(String[] args) {
AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
if (ai.incrementAndGet() == 1)
return f.filter(color -> !color.equals("orange")).map(String::toUpperCase);
else
return f.filter(color -> !color.equals("purple")).map(String::toUpperCase);
};
Flux<String> composedFlux = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.compose(filterAndMap);
composedFlux.subscribe(d -> System.out.println("Subscriber1: " + d));
composedFlux.subscribe(d -> System.out.println("Subscriber2: " + d));
}
// 출력결과
// Subscriber1: BLUE
// Subscriber1: GREEN
// Subscriber1: PURPLE
// Subscriber2: BLUE
// Subscriber2: GREEN
// Subscriber2: ORANGE
transform
과 같이 스트림을 수정하지만 subscribe 될 때 마다 해당 스트림이 수정된다.
orange
는 첫번째 subscribe
과정에서 filter
처리 두번째에는 처리된다.
그리고 두번째 subscribe
과정에선 purple
이 filter
처리된다.
as
의 경우transform
과 같이 스트림 자체를 변경하도록 하지만 반환타입이 정해져 있지 않아 Flux(스트림) 에서 Flux(스트림) 으로 변경할 수 도 있고 아예 다른 객체로 변경할 수 도 있다.
block, blockFirst, blockLast
block
은 Mono
클래스에서만 제공하는 연산자로 발행데이터를 return
한다.
첫 데이터 발행 직후 cancel
로 더이상을 발행을 종료하고 발행데이터를 return
한다.
Publisher
가 complete
될 때 마지막 발행 데이터만 수신받거나 데이터가 발행되지 않고 complete
된 경우 null
데이터 수신
then
Mono, Flux
발행자가 complete
될 경우 then
에 설정된 데이터를 발행하는 새로운 발행자를 만든다.
public static void main(String[] args) throws InterruptedException {
Mono.just(1)
.map(i -> {
System.out.println(i);
return i;
})
.then(Mono.just(2))
.subscribe(System.out::println);
}
// 출력결과
// 1
// 2
리액터 에러처리
onErrorReturn
onErrorResume, retryBackOff
retryBackOff
내부 구현을 보면 retryWhen
에 Retry.backoff(long, Duration)
설정을 넘기는 것과 같음
그림을 보면 실패시 재구독 하는 텀이 두배로 늘어난 것을 볼 수 있다. 고정된 시간으로 변경하고 싶다면
retryWhen
과reactor.util.retry.Retry
클래스의fixedDelay
사용
public static void main(String[] args) throws InterruptedException {
Flux.just("user-1")
.flatMap(userId -> recommendedBooks(userId)
.doOnError(e -> System.err.println(e.getMessage())) // recommendedBooks 에서 발생하는 에러리스너
.retryBackoff(5, Duration.ofMillis(100)) // 재시도 횟수, 지연시간 지정
.timeout(Duration.ofMillis(500)) // 재시도 후에도 0.5 초 안에 결과가 없다면 java.util.concurrent.TimeoutException 발생
.onErrorResume(e -> Flux.just("The Martian")))
.subscribe(
d -> System.out.println("onNext " + d),
e -> System.err.println("onError " + e),
() -> System.out.println("complete"));
Thread.sleep(1000);
}
public static Flux<String> recommendedBooks(String userId) {
return Flux.defer(() -> {
if (rnd.nextInt(10) < 7) // 70% 확률로 실패
return Flux.<String>error(new RuntimeException("Err")).delaySequence(Duration.ofMillis(100));
else
return Flux.just("Blue Mars", "The Expanse").delaySequence(Duration.ofMillis(50));
}).doOnSubscribe(s -> System.out.println("Request from " + userId));
}
timeout
안에 재시도마저 실패한다면 onErrorResume
에 지정해둔 The Martian
데이터가 발행된다.
리액터 수명주기
Flux.fromArray(new Integer[]{1, 2, 30, 40})
.map(String::valueOf)
.filter(s -> s.length() > 1);
조립 단계
위와 같은 Flux
가 생성될 경우 사실 내부코드 안으로 들어가 실행되는 코드를 조합하면 아래와 같다.
FluxFilter(
FluxMap(
FluxArray(1, 2, 30, 40),
...), // map Function
...) // filter Function
각 연사자 호출마다 새로운 Flux 클래스가 생성되며
가장 밑에서 호출한 연산자가 가장 위의 블록으로 자리잡는다
만약 모니터링, 로깅, 디버깅
등을 위한 Flux 연산자
호출시 감시하고 싶은 Flux 클래스
아래부분에 추가해야 한다.
이런점 때문에
Publisher
를 합치는concat
메서도 단일Publisher
를 사용해 여러번 호출하는 것 보다Iterable
형식의Publisher
배열을 전달하는 것이 성능 효율적이다.
구독 단계
subscribe()
메서드 호출하는 순간 해당 체인(블록구조) 에 대한 구독 프로세스가 실행된다.
대부분의 Flux
클래스 내부에 Subscriber
클래스도 정의되어 있다.
ArraySubscriber(
MapSubscriber(
FilterSubscriber(subscriber) // 정의한 구독메서드
)
)
구독단계에선 작성된 Flux 연산자
순서대로 데이터가 발행되고 처리되어야 하기 때문에 정방향이다.
구독단계에서도 시간이 많이 필요한 연산의 경우 스케줄러를 통해 멀티 스레드로 동작시켜 성능 향상 가능
런타임
실해 발행자와 구독자 사이 신호가 교환되는 상태(onSubscriber, request
)
구독단계에서 구독자는 스트림을 거슬러 올라가 Subscriber
객체를 생성하고 onSubscribe
메서드를 호출하게 된다.
당연히 배압을 위해 Subscription
생성시에는 구독단계에서 생성된 Subscriber
가 연결된다.
MapSubscriber.onSubscribe(new ArraySubscription(ArraySubscriber, ...)) {
FilterSubscriber.onSubscribe(new MapSubscription(MapSubscriber, ...)) {
Subscriber.onSubscribe(new FilterSubscription(FilterSubscriber, ...)) {
// onSubscribe 시 실행할 코드
FilterSubscription.request(N);
}
}
}
Reactor
에 구현된 Subscriber
클래스는 대부분 구독자 겸 발행자 역할을 하는 Processor
역할을 수행하기에 자신이 데이터를 발행받아 처리하고 하위 구독자에게 다시 발행한다.
FilterSubscription.request
가 호출되면 FilterSubscriber.request
가 호출되고 FilterSubscriber.request
는 저장된 상위 MapSubscription.request
를 호출한다.
MapSubscription.request
가 호출되면 MapSubscriber.request
가 호출되고 MapSubscriber.request
는 저장된 상위 ArraySubscription.request
를 호출한다.
ArraySubscription.request
는 저장된 ArraySubscriber.onNext
를 사용해 데이터 발행을 시작한다.
filter
에서 요청한 request
가 map, array
에게도 전달되고
array
에서 발행한 onNext
가 map, filter
에게 전달된다.
데이터 발행이 시작되면 저장된 Subscriber
체인에 따라 순서대로 사용자가 작성한 Subscriber
까지 데이터가 내려오게 된다.
// 의사 코드
FilterSubscription.request ->
FilterSubscriber.request ->
MapSubscription.request ->
MapSubscriber.request ->
ArraySubscription.request ->
ArraySubscriber.request ->
...
ArraySubscriber.onNext ->
MapSubscriber.onNext ->
FilterSubscriber.onNext ->
Subscriber.onNext (정의한 구독자)
런타임에선 데이터 소스로부터 request(N)
메서드를 통해 신호 교환량을 조절하여 성능 향상이 가능
리액터 스케줄러
Scheduler
클래스에는 대표적인 2가지 메서드 schedule
, createWorker
가 있다.
public interface Scheduler extends Disposable {
Disposable schedule(Runnable task);
Worker createWorker();
...
...
interface Worker extends Disposable {
Disposable schedule(Runnable task);
...
}
}
schedule
메서드를 사용하면 Runnable
작업 예약, createWorker
메서드를 사용하면 Runnable
작업 예약이 가능한 Worker
인스턴스를 생성
Scheduler
, Worker
둘다 Runnable
클래스를 실행시키는 메서드를 가지고 있고
Scheduler
는 스레드풀 Worker
는 스레드로 보면 된다.
Scheduler
의 대표적인 구현클래스는 아래 3종류
SingleScheduler
Scheduler.single()
메서드로 호출, 한개의 전용 워커를 예약하여 사용ParallelScheduler
Scheduler.parallel()
메서드로 호출, CPU코어 수 만큼의 워커를 예약하여 사용ElasticScheduler
Scheduler.elastic()
메서드로 호출, 동적으로 워커 생성 및 예약하여 사용,I/O, DB 쿼리
등에 적합함.BoundedElasticScheduler
Schedulers.boundedElastic()
메서드로 호출,Reactor 3.4.0
이후부터 사용,elastic
스케줄러를 대체한다.
제한된 스레드풀을 사용하여elastic
에서 발생하는 무한 스레드 생성 문제를 해결한다.
publishOn, subscribeOn
RxJava 의
observeOn
,subscribeOn
과 같다.
그림처럼 발행 데이터 생성 후 실행(런타임-구독자에게 도달하는 과정) 일부를 지정된 워커(스레드)로 이동하여 실행시킨다
주황색 부분이 별도의 워커(스레드)에서 동작하는 부분이다.
public static void main(String[] args) {
Flux.range(0, 100)
.map(String::valueOf)
.filter(s -> s.length() > 1)
.publishOn(schduler)
.map(s->calculateHash(s))
.map(s->doBusinessLogic(s))
.subscribe();
}
public static String calculateHash(String s) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
public static String doBusinessLogic(String s) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
}
실질적으로는 publishOn
호출 이전의 연산 결과를 큐에 집어넣고
호출 이후의 연산자들은 큐에서 발행 데이터를 하나씩 빼어 최종적으로 구독자에게 전달한다.
subscribeOn
의 경우 데이터 발행자체를 워커 풀에서 여러 워커들이 발행하도록 하는 방법이다.
subscribeOn
데이터 발행 스레드는 첫 지정시 고정되며 다시호출해도 무시되지만
publishOn
은 발행된 데이터를 조작하는 스레드로 계속 추가할 수 있다.
리액터 컨텍스트
일반적인 자바 프레임워크를 사용하면 사용자 접근시 스레드가 하나 생성되고 반환값을 전달 할 때 까지 해당 스레드가 유지된다.
해당 스레드가 유지되는 동한 ThreadLocal
이라는 저장공간에 공유하여 사용해야할 변수(컨텍스트)들을 저장해 두고 사용한다. 다른 스레드로 스위칭 될 때에도 ThreadLocal
을 이동 시킬 수 있다.
대다수의 프레임워크가 로그인 정보, 요청 정보 등을 ThreadLocal
에 저장해두고 수시로 사용하는데
비동기 처리방식을 사용하면 ThreadLocal
를 사용할 수 있는 구간이 한정적이다.
Flux, Mono
의 경우 데이터 발행, 처리, 구독 단계 에서 수많은 스레드가 같이 동작함으로 ThreadLocal
대신 리액터 Context
를 사용해 리액터 수명주기 안에서 생성되는 스레드에게 저장공간을 토스한다.
public static void main(String[] args) {
Object value = Flux.range(0, 10)
.flatMap(k ->
// Mono.subscriberContext 리액터 Context 접근 가능
Mono.subscriberContext().doOnNext(context -> {
Map map = context.get("randoms");
map.put(k, new Random(k).nextGaussian());
}).thenReturn(k))
.publishOn(Schedulers.parallel())
.flatMap(k ->
Mono.subscriberContext()
.map(context -> {
Map map = context.get("randoms");
return map.get(k);
}))
// 리액터 Context 에 데이터 저장
.subscriberContext(context -> context.put("randoms", new HashMap<>()))
.blockLast(); // 마지막 값만 흭득
System.out.println("value:" + value); // value:0.7743029489485066
}
Flux, Mono 수명주기-조립단계
에서 보았듯이 가장 밑에 있는 리액터 연산자
가 최상위 블록 객체가 되기 때문에 context
를 저장하는 subscriberContext
메서드는 아래 부분에 작성되야 한다.
public interface CoreSubscriber<T> extends Subscriber<T> {
default Context currentContext(){
return Context.empty(); // return new Context instance
}
@Override
void onSubscribe(Subscription s);
}
리액터에는 CoreSubscriber
라는 중요한 인터페이스가 있는데 대부분의 Subscriber
들이 이 인터페이스를 구현하며
Subscriber
생성자에서 상위 Subscribe
의 currentContext
메서드를 호출해 자신의 필드에 저장한다.
이런 방식으로 Subscriber
간에 컨텍스트를 연동한다.
또한 구현되어 사용되는 Context
는 immutable
객체로 변경될때 마다 새로운 객체로 변경되기 때문에 동시에 여러 데이터를 추가,삭제,변경하고 싶다면 위처럼 데이터 저장용(random map
) 객체를 Context
에 넣어 사용해야 한다.
또한 Mono.subscriberContext
메서드를 통해 컨텍스트를 가져오는데 이는 스트림의 많은 연결중 하나의 연결에 대한 컨텍스트를 뜻한다.
리액터 최적화
리액터 수명주기, 연산자 조합에 따라 성능 효율이 달라지는데 리액터 프로젝트 버전 3이 되면서 최적화 방안이 추가됨
매크로 퓨전: 조립단계에서 비효율적인 연산자를 효율적인 연산자로 교체 하는 방법
public static void main(String[] args) throws InterruptedException {
Flux.just(1)
.publishOn(Schedulers.parallel())
.map(i -> i + 1)
.subscribe(i -> System.out.println("data:" + i)); // data:2
Thread.sleep(100);
}
단순히 데이터 한건 발행 후 +1 해서 출력하는 간단한 리액터 코드이다.
하지만 publishOn
이 설정되어 별도의 큐 시스템이 형성되어 멀티스레드에 안전한 코드까지 추가되는 오버헤드가 큰 작업으로 변하는데
매크로 퓨전은 이를 방지한다.
just
연산자는 FluxJust
를 생성하는데 아래처럼 생겼다.
final class FluxJust<T> extends Flux<T>
implements ScalarCallable<T>, Fuseable, SourceProducer<T> {
...
}
interface ScalarCallable<T> extends Callable<T> { }
리액터 생명주기 조립단계에서 publishOn
은 ScalarCallable
를 구현한 Flux
객체를 만나게 되면 아래 코드처럼 실행된다.
ScalarCallable 구현 클래스로는 FluxJust, FluxError, FluxEmpty
등이 있다.
final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
if (this instanceof Callable) {
if (this instanceof Fuseable.ScalarCallable) {
Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this;
return onAssembly(new FluxSubscribeOnValue<>(s.call(), scheduler));
}
...
}
return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
}
큐와 멀티스레드 안전 코드를 사용하는 FluxPublishOn
가 아닌 FluxSubscribeOnValue
를 사용한다.
FluxSubscribeOnValue
을 사용하면 스트림을 큐처럼 간주해 데이터를 땡겨올 수 있다.
ver 3 에선 위 방식으로 개발자 실수로 인한 오버헤드가 일어나지 않도록 리액터 개발진들이 최적화 코드를 작성해 두었다.