java - 리액터!

Reactor Project

참고
https://kouzie.github.io/java/java-리액터/# https://kouzie.github.io/java/java-RxJava,-Reactive-Stream/
https://projectreactor.io/docs/core/release/api/overview-summary.html

Pivotal 에서 만든 리엑티브 라이브러리, 마찬가지로 Reactive Stream 스펙을 따르며 사용법이 RxJava 와 굉장히 유사하다.

아무래도 Spring 개발업체에서 만든 라이브러리이다 보니 Spring 사용시 많은 리액티브 스타터 의존패키지가 해당 라이브러리를 같이 사용한다.

Flux, Mono

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
https://projectreactor.io/docs/core/release/reference/#about-doc

마찬가지로 Reactive Stream 의 기술 스펙을 구현하였으며
Publisher 구현체로 FluxMono 가 있다.

package reactor.core.publisher;

import org.reactivestreams.Publisher;

public abstract class Flux<T> implements Publisher<T> {...}

public abstract class Mono<T> implements Publisher<T> {...}

Flux 는 사용법이 RxJavaObservable 과 매우 비슷하다.

public static void main(String[] args) {
    Flux<String> stream1 = Flux.just("hello", "world");
    Flux<Integer> stream2 = Flux.fromArray(new Integer[]{1, 2, 3});
    Flux<Integer> stream3 = Flux.fromIterable(Arrays.asList(4, 5, 6));
    Flux<Integer> stream4 = Flux.range(2020, 9); // 2020 ~ 2028
}

연속된 스트림을 리액티브하게 반환한다.

Mono 는 사용법이 RxJavaMaybe 와 비슷하다.

public static void main(String[] args) {
    Mono<String> stream1 = Mono.just("hello");
    Mono<Integer> stream2 = Mono.justOrEmpty(null);
    Mono<Integer> stream3 = Mono.justOrEmpty(Optional.empty());
}

최대 한개 요소의 스트림을 반환한다.

FluxMono 는 서로 쉽게 변환 가능하다.

Flux 는 연속된 스트림을 리스트로 변환하여 하나의 요소로 변경, Mono 로 반환할 수 있고
Mono 는 하나의 스트림을 연속된 스트림처럼 감싸 반환할 수 있다.

Mono 의 경우 http, db 질의 반환값으로 사용하기 적합하다

Subscriber

public final Disposable subscribe() {...}

public final Disposable subscribe(Consumer<? super T> consumer) {...}

public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) {...}

public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer) {...}

public final Disposable subscribe(
        @Nullable Consumer<? super T> consumer,
        @Nullable Consumer<? super Throwable> errorConsumer,
        @Nullable Runnable completeConsumer,
        @Nullable Consumer<? super Subscription> subscriptionConsumer) {...}

@Override
public final void subscribe(Subscriber<? super T> actual) {...}

RxJava 와 패키지명만 다를뿐 subscribe 메서드 구조, Disposable 인터페이스를 반환하는 것이 동일하다.

public static void main(String[] args) {
    Flux.just(1, 2, 3).subscribe(
        System.out::println, // consumer
        System.err::println, // error consumer
        ()-> System.out.println("complete")); // complete consumer
}
// 출력결과
// 1
// 2
// 3
// complete

FluxRxJavaObservable 로 변경해도 그대로 동작한다, 그만큼 두 라이브러리가 유사함.

Observable 과 다르게 Flux 는 배압처리까지 가능한 Reactive Stream 을 계승한 클래스이다.
아래처럼 subscription 컨슈머 메서드를 작성해 배압처리가 가능

public static void main(String[] args) {
    Flux.just(1, 2, 3).subscribe(
        System.out::println, // consumer
        System.err::println, // error consumer
        () -> System.out.println("complete"), // complete consumer
        subscription -> {
            subscription.request(2);
            subscription.cancel();
    });
}
// 출력결과
// 1
// 2

FluxRxJavaFlowable 로 변경해도 그대로 동작한다, 그만큼 두 라이브러리가 유사함.

BaseSubscriber

FluxSubscriber 구현시 TCK 조건을 만족하는 Subscriber 작성은 까다롭다.

Reactor 라이브러리에선 기본적인 요구조건을 만족시킨 Subscriber 를 미리 정의해두었다.

public abstract class BaseSubscriber<T> implements 
        CoreSubscriber<T>, Subscription, Disposable {
    ...
}

BaseSubscriber 를 사용하면 TCK 를 만족하는 Subscriber 를 쉽게 구현할 수 있다.

public class MySubscriber<T> extends BaseSubscriber<T> {

    public void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1);
    }

    public void hookOnNext(T value) {
        System.out.println(value);
        request(1);
    }
}

리액터 객체 생성

push, create

image9

image10

둘다 Flux 생성자로 FluxSink 라는 클래스를 사용해 발행할 데이터를 지정한다.

public static void main(String[] args) {
    Flux.push(sink -> IntStream.range(2000, 3000).forEach(sink::next))
            .subscribe(e -> System.out.println(e));

    Flux.create(sink -> IntStream.range(2000, 3000).forEach(sink::next))
            .subscribe(e -> System.out.println(e));
}

FluxSink.next 메서드를 통해 발행 데이터를 구독자에게 발행

둘의 사용법은 같으나 FluxSink 가 멀티 스레드, 싱글 스레드로 생성는지 차이이다.
데이터 발행 코드에서 멀티 스레드 오류가 발생할 수 있다면 push 를 사용하자.

generate, using, usingWhen

image11

generate 는 발행되는 데이터를 연계해 새로운 발행 데이터를 생성 및 관리할 수 있다.

위 그림처럼 조건에 따라 1씩 증가하는 발행 데이터를 생성하고 sink 를 통해 구독자에게 전달하면서
기존 발행 데이터를 사용해 지속적으로 새로운 발행 데이터를 만들어 낸다.

public static void main(String[] args) {
    Flux.generate(() -> Tuples.of(0, 1), (state, sink) -> {
        System.out.println(state.getT2());
        sink.next(state.getT2()); //onNext 신호 발생
        int newValue = state.getT1() + state.getT2();
        return Tuples.of(state.getT2(), newValue); // 새로운 시퀀스 전달 반복
    }).delayElements(Duration.ofMillis(1))
        .take(7)
        .subscribe(e -> System.out.println(e)); // sink.next 로부터 수신
        Thread.sleep(1000);
}
// 출력결과
// 1
// 1
// 2
// 3
// 5
// 8
// 13

간단한 피보나치 수열을 발행하는 Flux 인스턴스 생성

image12

using 은 리소스를 제공하는 인스턴스를 통해 Flux 객체를 생성할 수 있다.

public class Main {
    public static void main(String[] args) {
        Flux<String> flux = Flux.using(
                Connection::newConnection, // publihser(리소스) 생성 
                connection -> Flux.fromIterable(connection.getData()), // 리소스로부터 데이터 흭득(생성)
                Connection::close);

        flux.subscribe(data -> System.out.println("data:" + data),
                e -> System.err.println("error:" + e.getMessage()),
                () -> System.out.println("complete"));

        flux.subscribe(data -> System.out.println("data:" + data),
                e -> System.err.println("error:" + e.getMessage()),
                () -> System.out.println("complete"));
    }
}

class Connection implements AutoCloseable {
    private final Random rnd = new Random();
    private Connection() {}

    public Iterable<String> getData() {
        if (rnd.nextInt(10) < 3) // 10 분의 3 확률로 실패 
            throw new RuntimeException("Connect error");
        return Arrays.asList("one", "two", "three");
    }

    public void close() {
        System.out.println("Connection Closed");
    }

    public static Connection newConnection() {
        System.out.println("Connection Created");
        return new Connection();
    }
}
// 출력결과
// Connection Created
// data:one
// data:two
// data:three
// Connection Closed
// complete
// Connection Created
// data:one
// data:two
// data:three
// Connection Closed
// complete

그림처럼 구독할 때 마다 resource객체(Connection) 을 생성하고 리소스 내부의 데이터 발행 메서드를 통해 구독자엑 데이터를 전송한다.

image13

usingWhen 의 경우 using 에서 성공, 실패에 대한 조치까지 매개변수로 지정 가능

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Flux<String> request = Flux.usingWhen(
                Transaction.beginTransaction(), // publihser(리소스) 생성 
                transaction -> transaction.insertRows(Flux.just("A", "B", "C")), // 리소스로부터 데이터 흭득(생성)
                Transaction::commit, // when complete
                Transaction::rollback); // when error
        request.subscribe(
                d -> System.out.println("onNext " + d),
                e -> System.err.println("onError " + e),
                () -> System.out.println("complete"));
        Thread.sleep(1000);
    }
}

class Transaction {
    private static final Random rnd = new Random();
    private final int id;

    public Transaction(int id) {
        this.id = id;
        System.out.println("transaction created, id:" + id);
    }

    public static Mono<Transaction> beginTransaction() {
        return Mono.defer(() -> Mono.just(new Transaction(rnd.nextInt(1000))));
    }

    public Flux<String> insertRows(Publisher<String> rows) {
        return Flux.from(rows)
            .delayElements(Duration.ofMillis(100))
            .flatMap(r -> {
                if (rnd.nextInt(10) < 3)
                    return Mono.error(new RuntimeException("Insert Error"));
                else
                    return Mono.just(r);
            });
    }

    public Mono<Void> commit() {
        return Mono.defer(() -> {
            System.out.println("commit: " + id);
            if (rnd.nextBoolean())
                return Mono.empty();
            else
                return Mono.error(new RuntimeException("Commit Error"));
        });
    }

    public Mono<Void> rollback() {
        return Mono.defer(() -> {
            System.out.println("rollback: " + id);
            if (rnd.nextBoolean())
                return Mono.empty();
            else
                return Mono.error(new RuntimeException("Conn Error"));
        });
    }
}
// 출력결과
// onNext A
// onNext B
// onNext C
// commit: 507
// complete

Hot Stream, Cold Stream

리액터 연산자

웬만한 연산자들은 RxJava 의 Observable 과 중복됨

never, empty, error

image2

image1

image3

public static void main(String[] args) {
    Mono<String> stream1 = Mono.empty(); // 빈 인스턴스를 생성
    Mono<String> stream2 = Mono.never(); // 완료, 오류에 대해서도 신호를 보내지 않음
    Mono<String> stream3 = Mono.error(new IllegalArgumentException("unknown id"));
    Flux<String> stream4 = Flux.empty();
    Flux<String> stream5 = Flux.never();
    Flux<String> stream6 = Flux.error(new IllegalArgumentException("unknown id"));
}

then, thenEmpty, thenMany

image4

image5

image6

상위 스트림이 완료될 때 같이 완료되며 그림처럼 최종 Subscriber 에게 반환되는 값을 대체해버린다.

doOnEach, materialize, dematerialize

materialize: 구제화 하다

image7

image8

RxJavaNotification 객체처럼 Reactor 에도 Signal 객체가 있다.

public static void main(String[] args) {
    // Consumer<? super Signal<T>> signalConsumer
    Flux.just(1, 2, 3).doOnEach(s -> {
        if (s.isOnNext())
            System.out.println(s.get());
        else if (s.isOnError())
            System.err.println(s.getThrowable());
        if (s.isOnComplete())
            System.out.println("complete");
    }).subscribe();
}
// 출력결과
// 1
// 2
// 3
// complete

때로는 발행 데이터를 일반요소가 아닌 Signal 객체가 더 유용할 수 있다.
그럴때 materialize 연산자를 사용하면 발행 요소를 Signal 로 변경 가능하다.

반대로 Signal 을 일반 발행요소로 변경하려면 dematerialize 를 사용하면 된다.

public static void main(String[] args) {
    Flux.just(1, 2, 3).materialize()
        .subscribe(s -> {
            if (s.isOnNext())
                System.out.println(s.get());
            else if (s.isOnError())
                System.err.println(s.getThrowable());
            if (s.isOnComplete())
                System.out.println("complete");
    });
}

cache, replay

image16

RxJavareplay 와 같은 메서드로 버퍼크기, 시간값을 사용해 캐시 데이터 축소가 가능함

transform, compose

image10

public static void main(String[] args) {
    AtomicInteger ai = new AtomicInteger();
    Function<Flux<String>, Flux<String>> filterAndMap = f -> {
        if (ai.incrementAndGet() == 1)
            return f.filter(color -> !color.equals("orange")).map(String::toUpperCase);
        else
            return f.filter(color -> !color.equals("purple")).map(String::toUpperCase);
    };
    Flux<String> composedFlux = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
        .transform(filterAndMap);
    composedFlux.subscribe(d -> System.out.println("Subscriber1: " + d));
    composedFlux.subscribe(d -> System.out.println("Subscriber2: " + d));
}
// 출력결과  
// Subscriber1: BLUE
// Subscriber1: GREEN
// Subscriber1: PURPLE
// Subscriber2: BLUE
// Subscriber2: GREEN
// Subscriber2: PURPLE

orange 문자열은 두번의 subscribe 과정에서 모두 filter 처리되고 BLUE, GREEN, PURPLE 이 2번 반복하여 출력된다.

ai.incrementAndGet() 이 한번 호출된다는 뜻

map 의 경우 모든 각 발행 데이터가 해당 람다식을 거쳐가지만 transform 의 경우 방식이라 한번 호출되어 스트림 자체를 수정하고 끝난다.

compose 의 경우 transformdefer 를 적용한 것 과 같다.

public static void main(String[] args) {
    AtomicInteger ai = new AtomicInteger();
    Function<Flux<String>, Flux<String>> filterAndMap = f -> {
        if (ai.incrementAndGet() == 1)
            return f.filter(color -> !color.equals("orange")).map(String::toUpperCase);
        else
            return f.filter(color -> !color.equals("purple")).map(String::toUpperCase);
    };

    Flux<String> composedFlux = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
        .compose(filterAndMap);
    composedFlux.subscribe(d -> System.out.println("Subscriber1: " + d));
    composedFlux.subscribe(d -> System.out.println("Subscriber2: " + d));
}
// 출력결과
// Subscriber1: BLUE
// Subscriber1: GREEN
// Subscriber1: PURPLE
// Subscriber2: BLUE
// Subscriber2: GREEN
// Subscriber2: ORANGE

transform 과 같이 스트림을 수정하지만 subscribe 될 때 마다 해당 스트림이 수정된다.

orange 는 첫번째 subscribe 과정에서 filter 처리 두번째에는 처리된다.
그리고 두번째 subscribe 과정에선 purplefilter 처리된다.

as 의 경우 transform 과 같이 스트림 자체를 변경하도록 하지만 반환타입이 정해져 있지 않아 Flux(스트림) 에서 Flux(스트림) 으로 변경할 수 도 있고 아예 다른 객체로 변경할 수 도 있다.

block, blockFirst, blockLast

image22

blockMono 클래스에서만 제공하는 연산자로 발행데이터를 return 한다.

image19

첫 데이터 발행 직후 cancel 로 더이상을 발행을 종료하고 발행데이터를 return 한다.

image20

Publishercomplete 될 때 마지막 발행 데이터만 수신받거나 데이터가 발행되지 않고 complete 된 경우 null 데이터 수신

then

image23

Mono, Flux 발행자가 complete 될 경우 then 에 설정된 데이터를 발행하는 새로운 발행자를 만든다.

public static void main(String[] args) throws InterruptedException {
    Mono.just(1)
        .map(i -> {
            System.out.println(i);
            return i;
        })
        .then(Mono.just(2))
        .subscribe(System.out::println);
}
// 출력결과
// 1
// 2

리액터 에러처리

onErrorReturn

image14

onErrorResume, retryBackOff

image15

image14

retryBackOff 내부 구현을 보면 retryWhenRetry.backoff(long, Duration) 설정을 넘기는 것과 같음

그림을 보면 실패시 재구독 하는 텀이 두배로 늘어난 것을 볼 수 있다. 고정된 시간으로 변경하고 싶다면 retryWhenreactor.util.retry.Retry 클래스의 fixedDelay 사용

public static void main(String[] args) throws InterruptedException {
    Flux.just("user-1")
        .flatMap(userId -> recommendedBooks(userId)
            .doOnError(e -> System.err.println(e.getMessage())) // recommendedBooks 에서 발생하는 에러리스너
            .retryBackoff(5, Duration.ofMillis(100)) // 재시도 횟수, 지연시간 지정
            .timeout(Duration.ofMillis(500)) // 재시도 후에도 0.5 초 안에 결과가 없다면 java.util.concurrent.TimeoutException 발생
            .onErrorResume(e -> Flux.just("The Martian")))
        .subscribe(
            d -> System.out.println("onNext " + d),
            e -> System.err.println("onError " + e),
            () -> System.out.println("complete"));
    Thread.sleep(1000);
}

public static Flux<String> recommendedBooks(String userId) {
    return Flux.defer(() -> {
        if (rnd.nextInt(10) < 7) // 70% 확률로 실패
            return Flux.<String>error(new RuntimeException("Err")).delaySequence(Duration.ofMillis(100));
        else
            return Flux.just("Blue Mars", "The Expanse").delaySequence(Duration.ofMillis(50));
    }).doOnSubscribe(s -> System.out.println("Request from " + userId));
}

timeout 안에 재시도마저 실패한다면 onErrorResume 에 지정해둔 The Martian 데이터가 발행된다.

리액터 수명주기

Flux.fromArray(new Integer[]{1, 2, 30, 40})
    .map(String::valueOf)
    .filter(s -> s.length() > 1);

조립 단계

위와 같은 Flux 가 생성될 경우 사실 내부코드 안으로 들어가 실행되는 코드를 조합하면 아래와 같다.

FluxFilter(
    FluxMap(
        FluxArray(1, 2, 30, 40), 
        ...), // map Function 
...) // filter Function

각 연사자 호출마다 새로운 Flux 클래스가 생성되며
가장 밑에서 호출한 연산자가 가장 위의 블록으로 자리잡는다

만약 모니터링, 로깅, 디버깅 등을 위한 Flux 연산자 호출시 감시하고 싶은 Flux 클래스 아래부분에 추가해야 한다.

이런점 때문에 Publisher 를 합치는 concat 메서도 단일 Publisher 를 사용해 여러번 호출하는 것 보다 Iterable 형식의 Publisher 배열을 전달하는 것이 성능 효율적이다.

구독 단계

subscribe() 메서드 호출하는 순간 해당 체인(블록구조) 에 대한 구독 프로세스가 실행된다.
대부분의 Flux 클래스 내부에 Subscriber 클래스도 정의되어 있다.

ArraySubscriber(
    MapSubscriber(
        FilterSubscriber(subscriber) // 정의한 구독메서드 
    )
)

구독단계에선 작성된 Flux 연산자 순서대로 데이터가 발행되고 처리되어야 하기 때문에 정방향이다.

구독단계에서도 시간이 많이 필요한 연산의 경우 스케줄러를 통해 멀티 스레드로 동작시켜 성능 향상 가능

런타임

실해 발행자와 구독자 사이 신호가 교환되는 상태(onSubscriber, request)

구독단계에서 구독자는 스트림을 거슬러 올라가 Subscriber 객체를 생성하고 onSubscribe 메서드를 호출하게 된다.
당연히 배압을 위해 Subscription 생성시에는 구독단계에서 생성된 Subscriber 가 연결된다.

MapSubscriber.onSubscribe(new ArraySubscription(ArraySubscriber, ...)) {
    FilterSubscriber.onSubscribe(new MapSubscription(MapSubscriber, ...)) {
        Subscriber.onSubscribe(new FilterSubscription(FilterSubscriber, ...)) {
            // onSubscribe 시 실행할 코드 
            FilterSubscription.request(N);
        }
    }
}

Reactor 에 구현된 Subscriber 클래스는 대부분 구독자 겸 발행자 역할을 하는 Processor 역할을 수행하기에 자신이 데이터를 발행받아 처리하고 하위 구독자에게 다시 발행한다.

FilterSubscription.request 가 호출되면 FilterSubscriber.request 가 호출되고 FilterSubscriber.request 는 저장된 상위 MapSubscription.request 를 호출한다.

MapSubscription.request 가 호출되면 MapSubscriber.request 가 호출되고 MapSubscriber.request 는 저장된 상위 ArraySubscription.request 를 호출한다.

ArraySubscription.request 는 저장된 ArraySubscriber.onNext 를 사용해 데이터 발행을 시작한다.

filter 에서 요청한 requestmap, array 에게도 전달되고 array 에서 발행한 onNextmap, filter 에게 전달된다.

데이터 발행이 시작되면 저장된 Subscriber 체인에 따라 순서대로 사용자가 작성한 Subscriber 까지 데이터가 내려오게 된다.

// 의사 코드 
FilterSubscription.request -> 
FilterSubscriber.request -> 
    MapSubscription.request -> 
    MapSubscriber.request -> 
        ArraySubscription.request ->
        ArraySubscriber.request -> 
...
        ArraySubscriber.onNext -> 
    MapSubscriber.onNext -> 
FilterSubscriber.onNext -> 

Subscriber.onNext (정의한 구독자)

런타임에선 데이터 소스로부터 request(N) 메서드를 통해 신호 교환량을 조절하여 성능 향상이 가능

리액터 스케줄러

publishOn, subscribeOn

RxJava 의 observeOn, subscribeOn 과 같다.

image18

그림처럼 발행 데이터 생성 후 실행(런타임-구독자에게 도달하는 과정) 일부를 지정된 워커(스레드)로 이동하여 실행시킨다

주황색 부분이 별도의 워커(스레드)에서 동작하는 부분이다.

public static void main(String[] args) {
    Flux.range(0, 100)
        .map(String::valueOf)
        .filter(s -> s.length() > 1)

        .publishOn(schduler)

        .map(s->calculateHash(s))
        .map(s->doBusinessLogic(s))
        .subscribe();
}

public static String calculateHash(String s) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return s;
}

public static String doBusinessLogic(String s) {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return s;
}

실질적으로는 publishOn 호출 이전의 연산 결과를 큐에 집어넣고
호출 이후의 연산자들은 큐에서 발행 데이터를 하나씩 빼어 최종적으로 구독자에게 전달한다.

image21

subscribeOn 의 경우 데이터 발행자체를 워커 풀에서 여러 워커들이 발행하도록 하는 방법이다.

subscribeOn 데이터 발행 스레드는 첫 지정시 고정되며 다시호출해도 무시되지만
publishOn 은 발행된 데이터를 조작하는 스레드로 계속 추가할 수 있다.

스케줄러 종류

public interface Scheduler extends Disposable {
    Disposable schedule(Runnable task);
    Worker createWorker();
    ...
    ...
    interface Worker extends Disposable {
        Disposable schedule(Runnable task);
        ...
    }
}

Scheduler 클래스에는 대표적인 2가지 메서드 schedule, createWorker 가 있다. schedule 메서드를 사용하면 Runnable 작업 예약, createWorker 메서드를 사용하면 Runnable 작업 예약이 가능한 Worker 인스턴스를 생성

Scheduler, Worker 둘다 Runnable 클래스를 실행시키는 메서드를 가지고 있고
Scheduler 는 스레드풀 Worker 는 스레드로 보면 된다.

Scheduler 의 대표적인 구현클래스는 아래 3종류

  1. SingleScheduler - Scheduler.single() 메서드로 호출, 한개의 전용 워커를 예약하여 사용
  2. ParallelScheduler - Scheduler.parallel() 메서드로 호출, CPU코어 수 만큼의 워커를 예약하여 사용
  3. ElasticScheduler - Scheduler.elastic() 메서드로 호출, 동적으로 워커 생성 및 예약하여 사용, I/O, DB 쿼리 등에 적합함.

리액터 컨텍스트

일반적인 자바 프레임워크를 사용하면 사용자 접근시 스레드가 하나 생성되고 반환값을 전달 할 때 까지 해당 스레드가 유지된다.

해당 스레드가 유지되는 동한 ThreadLocal 이라는 저장공간에 공유하여 사용해야할 변수(컨텍스트)들을 저장해 두고 사용한다. 다른 스레드로 스위칭 될 때에도 ThreadLocal 을 이동 시킬 수 있다.

대다수의 프레임워크가 로그인 정보, 요청 정보 등을 ThreadLocal 에 저장해두고 수시로 사용하는데
비동기 처리방식을 사용하면 ThreadLocal 를 사용할 수 있는 구간이 한정적이다.

Flux, Mono 의 경우 데이터 발행, 처리, 구독 단계 에서 수많은 스레드가 같이 동작함으로 ThreadLocal 대신 리액터 Context 를 사용해 리액터 수명주기 안에서 생성되는 스레드에게 저장공간을 토스한다.

public static void main(String[] args) {
    Object value = Flux.range(0, 10)
        .flatMap(k ->
            // Mono.subscriberContext 리액터 Context 접근 가능
            Mono.subscriberContext().doOnNext(context -> {
                Map map = context.get("randoms");
                map.put(k, new Random(k).nextGaussian());
            }).thenReturn(k))
        .publishOn(Schedulers.parallel())
        .flatMap(k ->
            Mono.subscriberContext()
                .map(context -> {
                    Map map = context.get("randoms");
                    return map.get(k);
                }))
        // 리액터 Context 에 데이터 저장
        .subscriberContext(context -> context.put("randoms", new HashMap<>())) 
        .blockLast(); // 마지막 값만 흭득
    System.out.println("value:" + value); // value:0.7743029489485066
}

Flux, Mono 수명주기-조립단계 에서 보았듯이 가장 밑에 있는 리액터 연산자가 최상위 블록 객체가 되기 때문에 context 를 저장하는 subscriberContext 메서드는 아래 부분에 작성되야 한다.

public interface CoreSubscriber<T> extends Subscriber<T> {
    default Context currentContext(){
        return Context.empty(); // return new Context instance
    }
    @Override
    void onSubscribe(Subscription s);
}

리액터에는 CoreSubscriber 라는 중요한 인터페이스가 있는데 대부분의 Subscriber 들이 이 인터페이스를 구현하며
Subscriber 생성자에서 상위 SubscribecurrentContext 메서드를 호출해 자신의 필드에 저장한다.

이런 방식으로 Subscriber 간에 컨텍스트를 연동한다.

또한 구현되어 사용되는 Contextimmutable 객체로 변경될때 마다 새로운 객체로 변경되기 때문에 동시에 여러 데이터를 추가,삭제,변경하고 싶다면 위처럼 데이터 저장용(random map) 객체를 Context 에 넣어 사용해야 한다.

또한 Mono.subscriberContext 메서드를 통해 컨텍스트를 가져오는데 이는 스트림의 많은 연결중 하나의 연결에 대한 컨텍스트를 뜻한다.

리액터 최적화

리액터 수명주기, 연산자 조합에 따라 성능 효율이 달라지는데 리액터 프로젝트 버전 3이 되면서 최적화 방안이 추가됨

매크로 퓨전: 조립단계에서 비효율적인 연산자를 효율적인 연산자로 교체 하는 방법

public static void main(String[] args) throws InterruptedException {
    Flux.just(1)
        .publishOn(Schedulers.parallel())
        .map(i -> i + 1)
        .subscribe(i -> System.out.println("data:" + i)); // data:2
    Thread.sleep(100);
}

단순히 데이터 한건 발행 후 +1 해서 출력하는 간단한 리액터 코드이다.

하지만 publishOn 이 설정되어 별도의 큐 시스템이 형성되어 멀티스레드에 안전한 코드까지 추가되는 오버헤드가 큰 작업으로 변하는데
매크로 퓨전은 이를 방지한다.

just 연산자는 FluxJust 를 생성하는데 아래처럼 생겼다.

final class FluxJust<T> extends Flux<T> 
        implements ScalarCallable<T>, Fuseable, SourceProducer<T> {
    ...
}

interface ScalarCallable<T> extends Callable<T> { }

리액터 생명주기 조립단계에서 publishOnScalarCallable 를 구현한 Flux 객체를 만나게 되면 아래 코드처럼 실행된다.
ScalarCallable 구현 클래스로는 FluxJust, FluxError, FluxEmpty 등이 있다.

final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
    if (this instanceof Callable) {
        if (this instanceof Fuseable.ScalarCallable) {
            Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this;
            return onAssembly(new FluxSubscribeOnValue<>(s.call(), scheduler));
        }
        ...
    }
    return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
}

큐와 멀티스레드 안전 코드를 사용하는 FluxPublishOn 가 아닌 FluxSubscribeOnValue 를 사용한다.

FluxSubscribeOnValue 을 사용하면 스트림을 큐처럼 간주해 데이터를 땡겨올 수 있다.
ver 3 에선 위 방식으로 개발자 실수로 인한 오버헤드가 일어나지 않도록 리액터 개발진들이 최적화 코드를 작성해 두었다.

카테고리:

업데이트: