Skip to content

[spring webflux] cold, hot, backpressure, sinks, scheduler, context #54

@backtony

Description

@backtony

Reactor 개요

Reactor란?

Reactor는 스프링팀 주도하에 개발된 리액티브 스트림즈의 구현체로 webflux의 지반의 리액티브 애플리케이션을 제작하기 위한 핵심 역할을 한다. 리액티브 스트림즈 구현체인 Ractor는 리액티브 프로그래밍을 위한 라이브러리라고 정의할 수 있다.

Reactor는 다음과 같은 특징을 갖는다.

  • Reactive Streams
    • 리액티브 스트림즈 사양을 구현한 리액티브 라이브러리
  • non-blocking
  • java functional api
  • Flux, Mono
    • 둘다 publisher 타입
    • mono : 0 ~ 1개 emit
    • flux : 0 ~ N개 emit
  • well-suited for microservices
  • backpressure-ready network
    • publisher로부터 전달 받은 데이터를 처리하는 데 있어서 과부하가 걸리지 않도록 제어하는 backpressure 지원

Reactor 구성요소 익히기

public static void main(String[] args) {
    Flux<String> sequence = Flux.just("Hello", "Reactor");
    sequence.map(data -> data.toLowerCase())
            .subscribe(data -> System.out.println(data));
}
  • Flux는 publisher 역할
    • 2건을 반환하므로 Flux를 사용하고 Flux의 just operator를 사용하여 Flux 를 생성한다.
  • Reactor의 Operator는 Flux 또는 Mono를 반환하기 때문에 Operator 체인을 형성하여 또 다른 Operator를 연속적으로 호출할 수 있다.

public static void main(String[] args) {
    Mono
        .empty()
        .subscribe(
                // onNext
                none -> System.out.println("# emitted onNext signal"),
                // onError
                error -> {},
                // onComplete
                () -> System.out.println("# emitted onComplete signal")
        );
}

Mono publisher가 빈 데이터 소스를 반환했기 때문에 onComplete만 호출된다.


public static void main(String[] args) {
    Flux<String> flux =
            Mono.justOrEmpty("Steve")
                    .concatWith(Mono.justOrEmpty("Jobs"));
    flux.subscribe(System.out::println);
}

concatWith는 2개의 데이터 소스를 연결해준다. Mono publisher 두개를 붙였기 때문에 하나의 데이터 소스로 만들어진다. 하나의 데이터 소스로 만들어진다고 해서 하나의 문자열이 되는 것이 아니고 Flux.just("steve", "jobs") 로 되는 것이다. 하나의 데이터 소스가 2개의 요소를 가지기 때문에 Flux가 되는 것이다.


public static void main(String[] args) {
    Flux.concat(
                    Flux.just("Mercury", "Venus", "Earth"),
                    Flux.just("Mars", "Jupiter", "Saturn"),
                    Flux.just("Uranus", "Neptune", "Pluto"))
            .collectList()
            .subscribe(planets -> System.out.println(planets));
}
// 출력
["Mercury", "Venus", "Earth", "Mars", ... 나머지들]

concat은 여러 개의 데이터 소스를 하나로 연결해준다. 따라서 Flux.just("Mercury", "venuse", "earth", "mars"....) 이런 형식의 Flux를 만들게 된다. collectList는 Flux 요소들을 하나의 List로 만들어버리므로 Mono를 반환하게 된다.

Cold Sequence, Hot Sequence

  • Cold : 무언가를 새로 시작한다.
  • Hot : 무언가를 새로 시작하지 않는다.

Cold Sequence

Cold Sequence는 Subscriber가 구독할 때마다 데이터 흐름이 처음부터 다시 시작되는 Sequence이다.

public static void main(String[] args) throws InterruptedException {

    Flux<String> coldFlux =
            Flux
                .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
                .map(String::toLowerCase);

    coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
    System.out.println("----------------------------------------------------------------------");
    Thread.sleep(2000L);
    coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
}
// 출력
23:07:49.763 [main] INFO - # Subscriber1: korea
23:07:49.765 [main] INFO - # Subscriber1: japan
23:07:49.766 [main] INFO - # Subscriber1: chinese
----------------------------------------------------------------------
23:07:51.776 [main] INFO - # Subscriber2: korea
23:07:51.777 [main] INFO - # Subscriber2: japan
23:07:51.777 [main] INFO - # Subscriber2: chinese

같은 flux를 구독을 여러 번 해도 다른 타임라인으로 구성되어 같은 내용을 emit한다.

Hot Sequence

Hot Sequence는 구독이 발생한 시점 이전에 Publisher로부터 emit 된 데이터는 subscriber가 전달받지 못하고 구독이 발생한 시점 이후에 emit된 데이터만 전달받을 수 있다.

public static void main(String[] args) throws InterruptedException {
    String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};

    log.info("# Begin concert:");
    // 5명의 가수가 1초에 한명씩 출현해서 노래를 부름
    Flux<String> concertFlux =
            Flux
                .fromArray(singers)
                // emit을 1초 간격으로 방출
                .delayElements(Duration.ofSeconds(1))
                // cold sequence에서 hot sequence로 전환 -> 여러 구독자(subscriber)가 하나의 flux를 공유한다.
                // -> 다른 구독자가 먼저 구독해버리면 데이터를 emit하게 되고 이후에 구독한 구독자는 이전 데이터를 받지 못함.
                .share();
    
    // 관객 1은 이미 들어와서 기다리고 있음
    concertFlux.subscribe(
            singer -> log.info("# Subscriber1 is watching {}'s song", singer)
    );

    Thread.sleep(2500);

    // 2번째 관객은 2번째 가수가 노래한 이후에 들어옴.
    concertFlux.subscribe(
            singer -> log.info("# Subscriber2 is watching {}'s song", singer)
    );

    Thread.sleep(3000);
}

// 출력
23:09:28.529 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
23:09:29.539 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
23:09:30.543 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
23:09:30.547 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
23:09:31.564 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
23:09:31.565 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
23:09:32.570 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
23:09:32.571 [parallel-5] INFO - # Subscriber2 is watching Singer E's song

실행 결과에 스레드가 다른 것을 볼 수 있다. 이는 delayElements operator의 디폴트 스레드 스케줄러가 parallel이기 때문이다. 스케줄러는 이후에 다시 설명한다.


public class Example {
    public static void main(String[] args) throws InterruptedException {
        URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("worldtimeapi.org")
                .port(80)
                .path("/api/timezone/Asia/Seoul")
                .build()
                .encode()
                .toUri();

        // cache를 사용하여 cold -> hot으로 변경
        Mono<String> mono = getWorldTime(worldTimeUri).cache();
        mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
        Thread.sleep(2000);
        mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

        Thread.sleep(2000);
    }

    private static Mono<String> getWorldTime(URI worldTimeUri) {
        return WebClient.create()
                .get()
                .uri(worldTimeUri)
                .retrieve()
                .bodyToMono(String.class)
                .map(response -> {
                    DocumentContext jsonContext = JsonPath.parse(response);
                    String dateTime = jsonContext.read("$.datetime");
                    return dateTime;
                });
    }
}

cache operator는 cold sequence에서 동작하는 mono를 hot sequence로 변경해주고 emit된 데이터를 캐시한 뒤, 구독이 발생할 때마다 캐시된 데이터를 전달한다. 결과적으로 캐시된 데이터를 전달하므로 구독이 발생할 때마다 subscriber는 동일한 데이터를 전달받게 된다.

cf) reactor에서 hot의 두가지 의미
최초 구독이 발생하기 전까지는 데이터 emit이 발생하지 않는 것과 구독 여부와 상관없이 데이터가 emit되는 것 두 가지로 구분할 수 있다. reactor에서 전자는 warm up이라고 표현하고 후자는 hot이라고 표현한다. share의 경우 최초 구독이 발생했을 때 데이터를 emit하는 warm up의 의미를 가지는 hot sequence라고 볼 수 있다.

정리

  • subscriber의 구독 시점이 달라도 구독을 할 때마다 publisher가 데이터를 처음부터 emit하는 과정을 cold sequence라고 한다.
  • cold sequence 흐름으로 동작하는 publisher를 cold publisher라고 한다.
  • publisher가 데이터를 emit하는 과정이 한 번만 일어나고, subscriber가 각각의 구독 시점 이후에 emit된 데이터만 전달받는 것을 Hot sequence라고 한다.
  • hot sequence 흐름으로 동작하는 publisher를 hot publisher라고 한다.
  • share, cache 등의 operator를 사용해서 cold sequence를 hot sequence로 변환할 수 있다.
  • hot sequence는 subscriber의 최초 구독이 발생해야 publisher가 데이터를 emit하는 warm up과 subscriber의 구독 여부와 상관 없이 데이터를 emit하는 hot으로 구분할 수 있다.

Backpressure

Backpressure는 publisher가 끊임없이 emit하는 무수히 많은 데이터를 적절하게 제어하여 데이터 처리에 과부하가 걸리지 않도록 제어하는 것이다.

subscriber의 데이터 처리 속도가 느려 publisher가 이미 N번째 데이터까지 emit했는데 subscriber는 아직도 1번째 데이터를 처리하고 있을 수 있다. 이럴 경우, emit된 데이터들은 subscriber가 data1 처리를 완료하기 전까지 대기하게 된다.

reactor의 Backpressure 처리 방식에는 여러 가지가 있다.

데이터 개수 제어

첫 번째 방식은 subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 publisher에게 요청하는 방식이 있다.

public static void main(String[] args) {
    Flux.range(1, 5)
        .doOnRequest(data -> log.info("# doOnRequest: {}", data)) // request -> 데이터를 몇개 요청하는지 확인하기 위해서 명시 -> request를 1씩 하므로 1만 계속 찍힌다., 첫번째 찍히는 것은 onSubscribe에 의해서 찍히는 것이고, 이후에 찍히는 것은 onNext에 의한 것이다.
        // subscriber가 데이터 요청 개수를 직접 제어하기 위해 subscriber 인터페이스 구현체인 baseSubscriber 사용
        .subscribe(new BaseSubscriber<Integer>() {
            // subscriber 인터페이스에 정의된 onSubscribe 메서드를 대신해 구독 시점에 request 메서드를 호출해서 최초 데이터 요청 개수를 제어하는 역할
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);
            }

            // onNext를 대신해 publisher가 emit한 데이터를 전달받아 처리한 후에 publisher에게 다시 데이터를 요청하는 역할
            // 이때 역시 request 메서드를 호출해서 데이터 요청 개수를 제어
            @SneakyThrows
            @Override
            protected void hookOnNext(Integer value) {
                Thread.sleep(2000L);
                log.info("# hookOnNext: {}", value);
                request(1);
            }
        });
}

전략 사용

두 번째 방식은 Reactor에서 제공하는 전략을 사용하는 것이다.

  • ignore 전략
    • backpressure 적용 X
  • error 전략
    • downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, exception 발생시키는 전략
  • drop 전략
    • downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 drop 시키는 전략
  • latest 전략
    • downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 데이터 중 버퍼가 비는 시점에 가장 최근에(나중에) emit된 데이터만 버퍼로 옮기고 나머지는 폐기하는 전략
  • buffer 전략
    • buffer drop latest
      • downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 가장 최근에 채워진 데이터를 폐기시키고 새로운 데이터로 채워넣는 전략
    • buffer drop oldest
      • downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 가장 오래전에 채워진 데이터를 폐기시키고 새로운 데이터로 채워넣는 전략
// error 전략
public static void main(String[] args) throws InterruptedException {
    Flux
        // 0부터 1씩 증가한 숫자를 1ms에 한번씩 emit
        .interval(Duration.ofMillis(1L))
        // error 전략, 이외에도 onBackpressureDrop, onBackpressureLatest, 
        .onBackpressureError()
        // publisher가 emit한 데이터를 확인하거나 추가적인 동작을 정의하는 용도로 사용되는데 주로 디버깅 용도로 사용된다.
        .doOnNext(data -> log.info("# doOnNext: {}", data))
        // react sequence 중 일부를 별도의 스레드에서 실행할 수 있도록 해주는 operator -> 실제로 돌려보면 2개의 스레드가 동작한다.
        .publishOn(Schedulers.parallel())
        .subscribe(data -> {
                    try {
                        // 데이터 처리에 5ms가 걸리도록 처리
                        Thread.sleep(5L);
                    } catch (InterruptedException e) {}
                    log.info("# onNext: {}", data);
                },
                error -> log.error("# onError", error));

    Thread.sleep(2000L);
}

데이터 처리 속도가 emit 속도를 따라가지 못해서 결국 illegalStateException이 발생한다.


// buffer 전략
public static void main(String[] args) throws InterruptedException {
    Flux
        .interval(Duration.ofMillis(300L))
        .doOnNext(data -> log.info("# emitted by original Flux: {}", data))
        .onBackpressureBuffer(2,
                dropped -> log.info("** Overflow & Dropped: {} **", dropped),
                // buffer drop latest 전략, BufferOverflowStrategy.DROP_OLDEST 도 있다.
                BufferOverflowStrategy.DROP_LATEST)
        .doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data))
        .publishOn(Schedulers.parallel(), false, 1)
        .subscribe(data -> {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {}
                    log.info("# onNext: {}", data);
                },
                error -> log.error("# onError", error));

    Thread.sleep(2500L);
}

정리

  • Backpressure는 Publisher가 끊임없이 emit하는 무수히 많든 데이터를 적절하게 제어하여 데이터 처리에 있어 과부하가 걸리지 않도록 제어하는 데이터 처리 방식이다.
  • Ractor에서 지원하는 Backpressure 처리 방식에는 데이터 요청 개수를 제어하는 방식, Backpressure 전략을 사용하는 방식 등이 있다.
  • Backpressure IGNORE 전략은 backpressure을 적용하지 않는 전략이다.
  • Backpressure ERROR 전략은 Downstream의 데이터 처리 속도가 느려서 Upstream의 emit 속도를 따라가지 못할 경우 에러를 발생시키는 전략이다.
  • Backpressure DROP 전략은 Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 먼저 emit된 데이터부터 Drop하는 전략이다.
  • Backpressure LATEST 전략은 Publisher가 Downstream으로 전달할 데이터가 버퍼에 갇그 찰 경우, 버퍼 밖에서 대기 중인 데이터 중에서 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략이다.
  • Backpressure BUFFER 전략은 버퍼의 데이터를 폐기하지 않고 버퍼링 하는 전략으로 버퍼가 가득 차면 버퍼 내의 데이터를 폐기하는 전략, 버퍼가 가득 차면 에러를 발생시키는 전략 등으로 구분할 수 있다.
    • Backpressure BUFFER DROP_LATEST 전략은 Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 가장 최근에(나중에) 버퍼 안에 채워진 데이터를 DROP 하는 전략이다.
    • Backpressure BUFFER DROP_OLDEST 전략은 Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 채워진 데이터 중에서 가장 가장 오래된 데이터를 Drop하는 전략이다.

Sinks

Processor는 subscriber와 publisher의 기능을 모두 지니고 있다. Reactor에서는 Processor 인터페이스를 구현한 FlxuProcessor, MonoProcessor, EmitterProcessor 등을 지원한다. 그런데 Processor의 기능을 개선한 Sinks가 reactor 3.4.0 버전부터 지원되기 시작했고 Processor 관련 api는 reactor 3.5.0부터 완전히 제거될 예정이다.

Sinks는 리액티브 스트림즈의 Signal을 프로그래밍 방식으로 푸시할 수 있는 구조이며 Flux 또는 Mono의 의미 체계를 갖는다. 지금까지 Flux, Mono가 onNext같은 Signal을 내부적으로 전송해주는 방식이었는데 Sinks를 사용하면 프로그래밍 코드를 통해 명시적으로 Signal을 전송할 수 있다.

Reactor에서 프로그래밍 방식으로 Signal을 전송하는 가장 일반적인 방법은 generate operator나 create operator 등을 사용하는 것인데 이는 sinks 지원 이전부터 사용하는 방식이다. 그럼 sinks를 사용하는 것과 operator을 사용하는 것과 차이가 무엇일까?

일반적으로 generate, create operator는 싱글스레드 기반에서 signal을 전송하는데 사용되는 반면, sinks는 멀티스레드 방식으로 signal을 전송해도 스레드 안전성을 보장하기 때문에 예기치 않은 문제를 방지해준다.

public static void main(String[] args) throws InterruptedException {
    int tasks = 6;
    // 아래에서 작업을 처리하는 단계, 처리 결과를 가공하는 단계, 가공된 결과를 subscriber에게 전달하는 단계 모두 별도의 스레드에서 실행되도록 구성되었다. -> 총 3개의 스레드가 동시 실행
    Flux
        // create operator가 처리해야할 작업의 개수만큼 doTask 메서드를 호출해서 결과를 리턴받는다.
        .create((FluxSink<String> sink) -> {
            IntStream
                    .range(1, tasks)
                    .forEach(n -> sink.next(doTask(n)));
        })
        // 작업을 처리하는 단계 스레드 지정
        .subscribeOn(Schedulers.boundedElastic())        
        .doOnNext(n -> log.info("# create(): {}", n))

        // 처리 결과를 가공하는 스레드
        .publishOn(Schedulers.parallel())
        // 처리 결과를 가공한 뒤 최종적으로 subscriber에게 전달
        .map(result -> result + " success!")
        .doOnNext(n -> log.info("# map(): {}", n))
        
        // 처리 결과를 subscriber에 전달하는 스레드
        .publishOn(Schedulers.parallel())
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(500L);
}

private static String doTask(int taskNumber) {
    // now tasking.
    // complete to task.
    return "task " + taskNumber + " result";
}

// 출력
01:33:11.642 [main] DEBUG- Using Slf4j logging framework
01:33:11.759 [boundedElastic-1] INFO - # create(): task 1 result
01:33:11.766 [boundedElastic-1] INFO - # create(): task 2 result
01:33:11.767 [boundedElastic-1] INFO - # create(): task 3 result
01:33:11.768 [boundedElastic-1] INFO - # create(): task 4 result
01:33:11.768 [boundedElastic-1] INFO - # create(): task 5 result
01:33:11.781 [parallel-2] INFO - # map(): task 1 result success!
01:33:11.782 [parallel-2] INFO - # map(): task 2 result success!
01:33:11.782 [parallel-1] INFO - # onNext: task 1 result success!
01:33:11.782 [parallel-2] INFO - # map(): task 3 result success!
01:33:11.782 [parallel-1] INFO - # onNext: task 2 result success!
01:33:11.782 [parallel-2] INFO - # map(): task 4 result success!
01:33:11.782 [parallel-1] INFO - # onNext: task 3 result success!
01:33:11.782 [parallel-1] INFO - # onNext: task 4 result success!
01:33:11.782 [parallel-2] INFO - # map(): task 5 result success!
01:33:11.782 [parallel-1] INFO - # onNext: task 5 result success!

이처럼 create operator를 사용해서 프로그래밍 방식으로 signal을 전송할 수 있으며, reactor sequence를 단계적으로 나눠서 여러 개의 스레드로 처리할 수도 있다.

그런데 위 코드에서 작업을 처리한 후, 그 결과 값을 반환하는 doTask 메서드가 싱글스레드가 아닌 여러 개의 스레드에서 각각의 전혀 다른 작업들을 처리한 후, 처리 결과를 반환하는 상황이 발생할 수도 있다. 이 상황에서는 sinks를 사용한다.

public static void main(String[] args) throws InterruptedException {
    int tasks = 6;

    Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
    Flux<String> fluxView = unicastSink.asFlux();
    IntStream
            .range(1, tasks)
            .forEach(n -> {
                try {
                    // doTask 메서드가 루프를 돌 때마다 새로운 스레드에서 실행
                    // doTask 메서드의 처리 결과를 sinks를 통해 downstream으로 emit
                    new Thread(() -> {
                        unicastSink.emitNext(doTask(n), Sinks.EmitFailureHandler.FAIL_FAST);
                        log.info("# emitted: {}", n);
                    }).start();
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    log.error(e.getMessage());
                }
            });

    fluxView
            .publishOn(Schedulers.parallel())
            .map(result -> result + " success!")
            .doOnNext(n -> log.info("# map(): {}", n))
            .publishOn(Schedulers.parallel())
            .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(200L);
}

private static String doTask(int taskNumber) {
    // now tasking.
    // complete to task.
    return "task " + taskNumber + " result";
}

이전 create과 다르게 여기서는 5+1+1으로 총 7개의 스레드가 사용된다. sinks는 프로그래밍 방식으로 signal을 전송할 수 있으며, 멀티스레드 환경에서 스레드 안전성을 보장받을 수 있다는 정도로 알아두자.

Processor의 경우에는 onNtext, onComplete, onError 메서드를 직접적으로 호출함으로써 스레드 안전성이 보장되지 않을 수 있지만, sinks는 동시접근을 감지하고 동시 접근 스레드 중 하나가 빠르게 실패함으로써 스레드 안전성을 보장한다.

Sinks 종류 및 특징

Sinks에서 signal을 전송할 수 있는 방법은 크게 두 가지다.

  • sinks.one
  • sinks.many

sinks.one

public static <T> Sinks.One<T> one() {
    return SinksSpecs.DEFAULT_SINKS.one();
}

sinks.one은 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세이다. 한 건의 데이터를 프로그래밍 방식으로 emit하는 역할을 하기도 하고, Mono 방식으로 subscriber가 데이터를 소비할 수 있도록 해주는 sinks 클래스 내부에 인터페이스로 정의된 sinks의 스펙 또는 사양으로 볼 수 있다.

즉, sinks.one 메서드를 호출하는 것은 한 건의 데이터를 프로그래밍 방식으로 emit하는 기능을 사용하고 싶으니 거기에 맞는 적당한 기능 명세를 달라고 요청하는 것과 같다.

public static void main(String[] args) throws InterruptedException {
    Sinks.One<String> sinkOne = Sinks.one();
    // emit한 데이터를 구독하여 전달받기 위해 asMono를 사용해서 Mono 객체로 변환
    Mono<String> mono = sinkOne.asMono();
    
    // 두번째 파라미터는 emit 도중에 에러가 발생할 경우 어떻게 처리할 것인지에 대한 핸들러이다.
    sinkOne.emitValue("Hello Reactor", FAIL_FAST);
    sinkOne.emitValue("Hi Reactor", FAIL_FAST);
    sinkOne.emitValue(null, FAIL_FAST);

    
    mono.subscribe(data -> log.info("# Subscriber1 {}", data));
    mono.subscribe(data -> log.info("# Subscriber2 {}", data));
}

EmitFailureHandler FAIL_FAST = (signalType, emission) -> false;

// 출력 결과
12:44:55.039 [main] DEBUG- onNextDropped: Hi Reactor
12:44:55.042 [main] INFO - # Subscriber1 Hello Reactor
12:44:55.043 [main] INFO - # Subscriber2 Hello Reactor

FAIL_FAST는 상수같지만 실제로 보면 람다 표현식으로 표현한 EmitFailureHandler 인터페이스의 구현 객체이다. 이 EmitFailureHandler 객체를 통해서 emit 도중 발생한 에러에 대해 빠르게 실패 처리합니다. 빠르게 실패 처리한다는 것은 에러가 발생했을 때 재시도를 하지 않고 즉시 실패 처리한다는 것이다.

sinkOne은 한 건의 데이터만 처리하기 때문에 처음 emit한 데이터는 정상적으로 emit되지만 나머지 데이터들은 drop된다.

Sinks.Many

public static ManySpec many() {
    return SinksSpecs.DEFAULT_SINKS.many();
}

Sinks.Many 메서드를 사용해서 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해둔 명세이다. sinks.one 메서드는 sinks.one을 리턴하는 반면에, sinks.many 메서드의 경우 sinks.many를 리턴하지 않고 manySpec이라는 인터페이스를 리턴한다.

sinks.one의 경우, 단순히 한 건의 데이터를 emit하는 한 가지 기능만 가지기 때문에 별도의 spec이 정의되는 게 아니라 Default Spec(SinksSpecs.DEFAULT_ROOT_SPEC)을 사용한다. 반면에 Sinks.Many의 경우, 데이터 emit을 위한 여러 가지 기능이 정의된 ManySpec을 리턴한다.

public final class Sinks {
    // ...

	public interface ManySpec {
		UnicastSpec unicast();
		MulticastSpec multicast();
		MulticastReplaySpec replay();
	}
}

ManySpec은 총 세 가지 기능을 정의하는데, 이 세 기능은 각각의 기능을 또다시 별도의 Spec(UnicastSpec, MulticastSpec, MutlicastReplaySpec) 정의해두고 있다.

cf) unicast, broadcast, multicast
네트워크 통신에서 broadcast는 네트워크에 연결된 모든 시스템이 정보를 전달받는 (ont to all)방식이다. 반면에 unicast는 하나의 특정 시스템만 정보를 전달받는(one to one)방식이고 multicast는 일부 시스템들만 정보를 전달받는(one to many)방식이다. 따라서 unicast의 의미를 가지는 unicastSpec의 기능은 단 하나의 subscriber에게만 데이터를 emit하는 것이다.

public static void main(String[] args) throws InterruptedException {
    // unicast 메서드를 호출하고 리턴 값으로 UnicastSpec을 리턴하고 최종적으로 UnicastSpec에 정의된 기능을 사용한다.
    // UnicastSpec에 정의된 기능은 onBackpressureBuffer 메서드를 호출함으로써 사용하게 된다.
    Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();

    // asFlux로 flux로 변환
    Flux<Integer> fluxView = unicastSink.asFlux();

    unicastSink.emitNext(1, FAIL_FAST);
    unicastSink.emitNext(2, FAIL_FAST);


    fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));

    unicastSink.emitNext(3, FAIL_FAST);

    // unicast는 단 하나의 subscriber에게만 전달하기 때문에 하나의 구독자가 더 생기면 IllegalStateException이 발생한다.
    // fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}

// 출력
13:00:41.644 [main] INFO - # Subscriber1: 1
13:00:41.646 [main] INFO - # Subscriber1: 2
13:00:41.646 [main] INFO - # Subscriber1: 3
public static void main(String[] args) {
    // multicast 방식
    Sinks.Many<Integer> multicastSink =
            Sinks.many().multicast().onBackpressureBuffer();
    Flux<Integer> fluxView = multicastSink.asFlux();

    multicastSink.emitNext(1, FAIL_FAST);
    multicastSink.emitNext(2, FAIL_FAST);

    fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
    fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));

    multicastSink.emitNext(3, FAIL_FAST);
}

// 출력
13:01:39.432 [main] INFO - # Subscriber1: 1
13:01:39.434 [main] INFO - # Subscriber1: 2
13:01:39.436 [main] INFO - # Subscriber1: 3
13:01:39.436 [main] INFO - # Subscriber2: 3

multicast는 하나 이상의 subscriber에게 데이터를 emit한다. 2번째 구독자는 3번 데이터만 전달 받았는데, sinks가 publisher의 역할을 할 경우에는 기본적으로 hot publisher로 동작하며, 특히 onBackpressureBuffer 메서드는 warm up의 특징을 가지는 hot sequence로 동작하기 때문이다.

public static void main(String[] args) {
    // replay 사용 -> MulticastReplaySpec을 리턴하고 구현체 메서드중 하나인 limit메서드 호출
    Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
    Flux<Integer> fluxView = replaySink.asFlux();

    replaySink.emitNext(1, FAIL_FAST);
    replaySink.emitNext(2, FAIL_FAST);
    replaySink.emitNext(3, FAIL_FAST);

    fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));

    replaySink.emitNext(4, FAIL_FAST);

    fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}
// 출력
13:04:33.769 [main] INFO - # Subscriber1: 2
13:04:33.771 [main] INFO - # Subscriber1: 3
13:04:33.771 [main] INFO - # Subscriber1: 4
13:04:33.772 [main] INFO - # Subscriber2: 3
13:04:33.772 [main] INFO - # Subscriber2: 4

mp3에서 replay버튼을 누르거나 replay모드로 설정하면 현재 듣고 있는 음악 또는 현재 재생 목록이 처음부터 다시 재생되는 것처럼 sinks의 replay도 같은 기능을 한다.

multicastReplaySpec에는 emit된 데이터를 다시 replay해서 구독 전에 이미 emit된 데이터라도 subscriber가 전달받을 수 있게 하는 다양한 메서드들이 정의되어 있다.

대표적으로 all 메서드가 있는데 이는 구독 전에 이미 emit된 데이터가 있더라도 처음 emit된 데이터부터 모든 데이터들이 subscriber에게 전달된다. limit은 emit된 데이터 중에서 파라미터로 입력한 개수만큼 가장 나중에 emit된 데이터부터 subscriber에게 전달하는 기능을 제공한다.

정리

  • sinks는 publisher와 subscriber의 기능을 모두 지닌 processor의 향상된 기능을 제공한다.
  • 데이터를 emit하는 sinks에는 크게 Sinks.One과 Sinks.Many가 있다.
  • Sinks.One은 한 건의 데이터를 프로그래밍 방식으로 emit한다.
  • Sinks.Many는 여러 건의 데이터를 프로그래밍 방식으로 emit한다.
  • Sinks.Many의 UnicastSpec은 단 하나의 Subscriber에게만 데이터를 emit한다.
  • Sinks.Many의 MultiCastSpec은 하나 이상의 Subscriber에게 데이터를 emit한다.
  • Sinks.Many의 MulticastReplaySpec은 emit된 데이터 중에서 특정 시점으로 되돌린 (replay) 데이터부터 emit한다.

scheduler

Reactor에서 사용되는 Scheduler는 Reactor Sequence에서 사용되는 스레드를 관리해주는 관리자 역할을 한다. 다시 말하면, scheduler는 어떤 스레드에서 무엇을 처리할지를 제어한다.

scheduler를 위한 operator

  • publishOn()
  • subscribeOn()

두 operator의 파라미터로 적절한 scheduler를 전달하면 해당 scheduler의 특성에 맞는 스레드가 reactor sequence에 할당된다. 일반적으로 ractor에서 스레드를 관리하기 위해 subscribeOn()과 publishOn()을 가장 많이 사용하지만, 그 외에 parallel() 이라는 operator도 있다.

subscribeOn

이름처럼 구독이 발생한 직후 실행될 스레드를 지정하는 operator다. 즉, publisher가 데이터를 emit하기 전에 실행 스레드를 변경한다.

public static void main(String[] args) throws InterruptedException {
    Flux.fromArray(new Integer[] {1, 3, 5, 7})
            // 구독이 발생한 직후에 원본 publisher의 동작 처리를 위한 스레드를 할당한다.
            .subscribeOn(Schedulers.boundedElastic())
            // emit 되는 데이터 출력
            .doOnNext(data -> log.info("# doOnNext: {}", data))
            // 구독이 발생한 시점에 추가적인 어떤 처리가 필요한 경우 해당 처리 동작을 수행한다. -> 구독이 발생한 시점 실행되는 스레드 출력
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
            // 구독
            .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(500L);
}
// 출력
21:20:19.099 [main] INFO - # doOnSubscribe
21:20:19.117 [boundedElastic-1] INFO - # doOnNext: 1
21:20:19.122 [boundedElastic-1] INFO - # onNext: 1
21:20:19.122 [boundedElastic-1] INFO - # doOnNext: 3
21:20:19.122 [boundedElastic-1] INFO - # onNext: 3
21:20:19.122 [boundedElastic-1] INFO - # doOnNext: 5
21:20:19.122 [boundedElastic-1] INFO - # onNext: 5
21:20:19.123 [boundedElastic-1] INFO - # doOnNext: 7
21:20:19.123 [boundedElastic-1] INFO - # onNext: 7

doOnSubscribe에서 동작은 main 스레드에서 실행되는데 예제 코드의 최초 실행 스레드가 main스레드이기 때문이다. subscribeOn에서 scheduler를 지정했기 때문에 구독이 발생한 직후부터는 원본 Flux의 동작을 처리하는 스레드가 바뀌게 된다. 따라서 doOnNext는 boundedElastic-1 스레드에서 실행되고 operator 체인 상에서 특별히 다른 scheduler를 지정하지 않았기 때문에 subscriber에서 데이터를 처리하는 동작 역시 같은 스레드에서 진행된다.

publisherOn

publisherOn는 코드상에서 publishOn을 기준으로 아래쪽인 downstream의 실행 스레드를 변경한다. subscribeOn과 마찬가지로 파라미터로 scheduler를 지정함으로써 해당 scheduler의 특성을 가진 스레드로 변경할 수 있다.

public static void main(String[] args) throws InterruptedException {
    Flux.fromArray(new Integer[] {1, 3, 5, 7})
            .doOnNext(data -> log.info("# doOnNext: {}", data))
            .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
            // downstream으로 데이터를 emit하는 스레드를 변경한다.
            .publishOn(Schedulers.parallel())
            .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(500L);
}

// 출력
21:28:58.319 [main] INFO - # doOnSubscribe
21:28:58.326 [main] INFO - # doOnNext: 1
21:28:58.330 [main] INFO - # doOnNext: 3
21:28:58.331 [main] INFO - # doOnNext: 5
21:28:58.331 [main] INFO - # doOnNext: 7
21:28:58.330 [parallel-1] INFO - # onNext: 1
21:28:58.332 [parallel-1] INFO - # onNext: 3
21:28:58.332 [parallel-1] INFO - # onNext: 5
21:28:58.332 [parallel-1] INFO - # onNext: 7

구독이 되면 최초에는 doOnSubscribe가 처리를 진행하고 emit된 데이터를 doOnNext -> subscribe 순으로 진행된다. doOnNext의 경우 subscribeOn operator가 사용되지 않았기 때문에 main 스레드가 그대로 사용된다. 하지만 onNext의 경우 publishOn이 추가되었기 때문에 publishOn을 기준으로 downstream의 실행 스레드가 변경된다.

parallel

subscribeOn, publishOn operator의 경우 동시성(동시에 수행하는 것처럼 보임)을 가지는 논리적인 스레드에 해당되지만, parallel operator는 병렬성(실제로 동시에 처리)을 가지는 물리적인 스레드에 해당한다.

parallel의 경우 라운드로빈 방식으로 cpu 코어 개수만큼의 스레드를 병렬로 실행한다. 여기서 말하는 cpu 코어 개수는 물리적인 코어 개수가 아니라 논리적인 코어 개수(듀얼 코어 4스레드 = 2개 코어가 있고 각 코어는 2개의 스레드를 갖는다 -> 논리적으로는 4개의 코어)를 의미한다.

public static void main(String[] args) throws InterruptedException {
    Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
            // emit되는 데이터를 cpu의 논리적인 코어(물리적인 스레드) 수에 맞게 사전에 골고루 분배하는 역할
            // 물리적 스레드를 전부 사용할 필요가 없는 경우 인자로 사용할 스레드 개수를 지정한다.
            .parallel(4)
            // 실제로 병렬 작업을 수행할 스레드 할당
            .runOn(Schedulers.parallel())
            .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(100L);
}

동작 이해

// main
public static void main(String[] args) {
    Flux
        .fromArray(new Integer[] {1, 3, 5, 7})
        .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
        .filter(data -> data > 3)
        .doOnNext(data -> log.info("# doOnNext filter: {}", data))
        .map(data -> data * 10)
        .doOnNext(data -> log.info("# doOnNext map: {}", data))
        .subscribe(data -> log.info("# onNext: {}", data));
}

scheduler operator를 사용하지 않았으므로 모두 main에서 수행된다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .fromArray(new Integer[]{1, 3, 5, 7})
        // main 스레드
        .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
        // main 스레드
        .filter(data -> data > 3)
        // main 스레드
        .doOnNext(data -> log.info("# doOnNext filter: {}", data))
        // 스레드 변화
        .publishOn(Schedulers.parallel())
        // parallel 스레드
        .map(data -> data * 10)
        // parallel 스레드
        .doOnNext(data -> log.info("# doOnNext map: {}", data))
        // parallel 스레드
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(500L);
}

// 출력
21:42:28.014 [main] INFO - # doOnNext fromArray: 1
21:42:28.021 [main] INFO - # doOnNext fromArray: 3
21:42:28.021 [main] INFO - # doOnNext fromArray: 5
21:42:28.021 [main] INFO - # doOnNext filter: 5
21:42:28.022 [main] INFO - # doOnNext fromArray: 7
21:42:28.022 [main] INFO - # doOnNext filter: 7
21:42:28.022 [parallel-1] INFO - # doOnNext map: 50
21:42:28.023 [parallel-1] INFO - # onNext: 50
21:42:28.024 [parallel-1] INFO - # doOnNext map: 70
21:42:28.024 [parallel-1] INFO - # onNext: 70

즉, publishOn 이후에 추가된 operator 체인은 모두 parallel 스레드에서 수행된 것을 확인할 수 있다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .fromArray(new Integer[] {1, 3, 5, 7})
        // main
        .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
        // 스레드 변화
        .publishOn(Schedulers.parallel())
        // parallel-2
        .filter(data -> data > 3)
        // parallel-2
        .doOnNext(data -> log.info("# doOnNext filter: {}", data))
        // 스레드 변화
        .publishOn(Schedulers.parallel())
        // parallel-1
        .map(data -> data * 10)
        // parallel-1
        .doOnNext(data -> log.info("# doOnNext map: {}", data))
        // parallel-1
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(500L);
}

// 출력
21:44:25.581 [main] INFO - # doOnNext fromArray: 1
21:44:25.589 [main] INFO - # doOnNext fromArray: 3
21:44:25.590 [main] INFO - # doOnNext fromArray: 5
21:44:25.591 [main] INFO - # doOnNext fromArray: 7
21:44:25.591 [parallel-2] INFO - # doOnNext filter: 5
21:44:25.593 [parallel-2] INFO - # doOnNext filter: 7
21:44:25.593 [parallel-1] INFO - # doOnNext map: 50
21:44:25.593 [parallel-1] INFO - # onNext: 50
21:44:25.594 [parallel-1] INFO - # doOnNext map: 70
21:44:25.594 [parallel-1] INFO - # onNext: 70

publishOn operator를 체인에서 여러번 사용할 수 있다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .fromArray(new Integer[] {1, 3, 5, 7})
        // 스레드 변화
        .subscribeOn(Schedulers.boundedElastic())
        // boundedElastic-1
        .doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
        // boundedElastic-1
        .filter(data -> data > 3)
        // boundedElastic-1
        .doOnNext(data -> log.info("# doOnNext filter: {}", data))
        // 스레드 변화
        .publishOn(Schedulers.parallel())
        // parallel-1
        .map(data -> data * 10)
        // parallel-1
        .doOnNext(data -> log.info("# doOnNext map: {}", data))
        // parallel-1
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(500L);
}

// 출력
21:46:39.632 [boundedElastic-1] INFO - # doOnNext fromArray: 1
21:46:39.640 [boundedElastic-1] INFO - # doOnNext fromArray: 3
21:46:39.641 [boundedElastic-1] INFO - # doOnNext fromArray: 5
21:46:39.641 [boundedElastic-1] INFO - # doOnNext filter: 5
21:46:39.642 [boundedElastic-1] INFO - # doOnNext fromArray: 7
21:46:39.642 [parallel-1] INFO - # doOnNext map: 50
21:46:39.643 [boundedElastic-1] INFO - # doOnNext filter: 7
21:46:39.643 [parallel-1] INFO - # onNext: 50
21:46:39.643 [parallel-1] INFO - # doOnNext map: 70
21:46:39.643 [parallel-1] INFO - # onNext: 70

subscribeOn은 체인의 어느 위치에 사용되던 간에 구독 시점 직후, 즉 publisher가 데이터를 emit하기 전에 실행 스레드를 변경한다.

scheduler 종류

Schedulers.immediate

별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업을 처리하고자 할때 사용한다.

public static void main(String[] args) throws InterruptedException {
    Flux
        .fromArray(new Integer[] {1, 3, 5, 7})
        // 스레드 변경
        .publishOn(Schedulers.parallel())
        // parallel-1
        .filter(data -> data > 3)
        // parallel-1
        .doOnNext(data -> log.info("# doOnNext filter: {}", data))
        // 스레드 유지
        .publishOn(Schedulers.immediate())
        // parallel-1
        .map(data -> data * 10)
        // parallel-1
        .doOnNext(data -> log.info("# doOnNext map: {}", data))
        // parallel-1
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(200L);
}

// 출력
21:53:00.001 [main] DEBUG- Using Slf4j logging framework
21:53:00.036 [parallel-1] INFO - # doOnNext filter: 5
21:53:00.053 [parallel-1] INFO - # doOnNext map: 50
21:53:00.054 [parallel-1] INFO - # onNext: 50
21:53:00.054 [parallel-1] INFO - # doOnNext filter: 7
21:53:00.054 [parallel-1] INFO - # doOnNext map: 70
21:53:00.054 [parallel-1] INFO - # onNext: 70

현재 스레드에서 작업을 처리하고 싶다면 굳이 publishOn operator를 추가할 필요가 있을까 라고 생각할 수 있다. 예를 들어 위 코드가 어떤 API들 중에서 공통의 역할을 하는 API이고, 해당 API의 파라미터로 scheduler를 전달할 수 있다고 가정해보자. 이 경우 API를 사용하는 입장에서 map 이후의 operator 체인 작업은 원래 실행되던 스레드에서 실행하게 하고 싶을때도 있을 것이다. 즉, scheduler가 필요한 API가 있긴 한데 별도의 스레드를 추가 할당하고 싶지 않을 경우 사용한다.

schedulers.single

스레드 하나만 생성해서 scheduler가 제거되기 전까지 재사용한다.

public static void main(String[] args) throws InterruptedException {
    doTask("task1")
            .subscribe(data -> log.info("# onNext: {}", data));

    doTask("task2")
            .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(200L);
}

private static Flux<Integer> doTask(String taskName) {
    return Flux.fromArray(new Integer[] {1, 3, 5, 7})
            .publishOn(Schedulers.single())
            .filter(data -> data > 3)
            .doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
            .map(data -> data * 10)
            .doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
}

// 출력
21:57:28.084 [single-1] INFO - # task1 doOnNext filter: 5
21:57:28.098 [single-1] INFO - # task1 doOnNext map: 50
21:57:28.098 [single-1] INFO - # onNext: 50
21:57:28.098 [single-1] INFO - # task1 doOnNext filter: 7
21:57:28.099 [single-1] INFO - # task1 doOnNext map: 70
21:57:28.099 [single-1] INFO - # onNext: 70
21:57:28.103 [single-1] INFO - # task2 doOnNext filter: 5
21:57:28.103 [single-1] INFO - # task2 doOnNext map: 50
21:57:28.103 [single-1] INFO - # onNext: 50
21:57:28.103 [single-1] INFO - # task2 doOnNext filter: 7
21:57:28.104 [single-1] INFO - # task2 doOnNext map: 70
21:57:28.104 [single-1] INFO - # onNext: 70

doTask를 각자 호출했지만 single을 사용했기 때문에 첫번째 호출에서 이미 생성된 스레드를 재사용한다. 하나의 스레드를 재사용하면서 다수의 작업을 처리할 수 있는데, 하나의 스레드로 다수의 작업을 처리해야 되므로 지연 시간이 짧은 작업을 처리하는 것이 효과적이다.

Schedulers.newSingle()

single은 하나를 재사용하는 반면에, 이는 매번 새로운 스레드 하나를 생성한다.

public static void main(String[] args) throws InterruptedException {
    doTask("task1")
            .subscribe(data -> log.info("# onNext: {}", data));

    doTask("task2")
            .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(200L);
}

private static Flux<Integer> doTask(String taskName) {
    return Flux.fromArray(new Integer[] {1, 3, 5, 7})
            .publishOn(Schedulers.newSingle("new-single", true))
            .filter(data -> data > 3)
            .doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
            .map(data -> data * 10)
            .doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
}

// 출력
21:59:43.950 [new-single-1] INFO - # task1 doOnNext filter: 5
21:59:43.950 [new-single-2] INFO - # task2 doOnNext filter: 5
21:59:43.976 [new-single-1] INFO - # task1 doOnNext map: 50
21:59:43.976 [new-single-2] INFO - # task2 doOnNext map: 50
21:59:43.976 [new-single-1] INFO - # onNext: 50
21:59:43.976 [new-single-2] INFO - # onNext: 50
21:59:43.976 [new-single-1] INFO - # task1 doOnNext filter: 7
21:59:43.976 [new-single-1] INFO - # task1 doOnNext map: 70
21:59:43.976 [new-single-2] INFO - # task2 doOnNext filter: 7
21:59:43.978 [new-single-1] INFO - # onNext: 70
21:59:43.978 [new-single-2] INFO - # task2 doOnNext map: 70
21:59:43.978 [new-single-2] INFO - # onNext: 70

첫번째 메서드 수행 스레드와 두번째 수행 스레드가 다른 것을 확인할 수 있다. 그리고 newSingle의 두번째 파라미터가 존재하는데 이 스레드를 데몬 스레드로 동작하게 할지 여부를 설정한다.

데몬 스레드는 보조 스레드라고 불리는데, 주 스레드가 종료되면 자동으로 종료되는 특징이 있다. 즉, 여기서는 true값을 넣어주면 main스레드가 종료되면 자동으로 죵료된다.

schedulers.boundedElastic

executorService 기반의 스레드 풀을 생성한 후, 그 안에서 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업이 종료된 스레드는 반납하여 재사용하는 방식이다.

기본적으로 cpu 코어 수 * 10만큼의 스레드를 생성하여, 풀에 있는 모든 스레드가 작업중이라면 이용 가능한 스레드가 생길 때까지 최대 10만개의 작업이 큐에서 대기할 수 있다.

예제 코드에서는 fromArray 같은 operator로 적은 데이터 소스를 수동으로 전달했다. 하지만 실제로는 http 요청 같은 blocking i/o 작업을 통해 전달받은 데이터를 데이터 소스로 사용하는 경우가 많다. boundedElastic 스케줄러가 이러한 blocking i/o 방식에 적합하다. 실행 시간이 긴 blocking i/o 작업이 포함된 경우, 다른 non-blocking 처리에 영향을 주지 않도록 전용 스레드를 할당해서 blocking i/o 작업을 처리하기 때문에 처리 시간을 효율적으로 사용할 수 있다.

schedulers.parallel

boundedElastic이 blocking i/o 작업에 최적화되어 있는 반면에, parallel은 non-blocking i/o에 최적화되어 있는 scheduler로 cpu 코어 수만큼의 스레드를 생성한다.

schedulers.fromExecutorService

기존에 이미 사용하고 있는 executorService가 있다면 해당 executorService로부터 scheduler를 생성하는 방식이다. ExecutorService로부터 직접 생성할 수도 있지만 이 방식은 권장되지 않는다.

schedulers.newXXX()

single, boundedElastic, parallel은 reactor에서 제공하는 디폴트 scheduler 인스턴스를 사용한다. 하지만 필요하다면 앞에 new를 붙인 메서드를 사용해서 새로운 scheduler 인스턴스를 생성할 수 있다. 즉, 스레드 이름, 생성 가능한 디폴트 스레드 개수, 스레드 유휴 시간, 데몬 스레드로의 도작 여부 등을 직접 지정해서 커스텀 스레드 풀을 새로 생성할 수 있다.

정리

  • schedulers.immediate : 별도의 스레드를 추가적으로 생성하지 않고 현재 스레드에서 작업
  • schedulers.single : 스레드 하나만 생성해서 scheduler가 제거되기 전까지 재사용
  • schedulers.boundedElastic : ExecutorService 기반의 스레드 풀을 생성한 후, 그 안에서 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업이 종료된 스레드는 반납하여 재사용한다. blocking i/o 작업에 최적화되어 있다.
  • schedulers.parallel : Non-block i/o 작업에 최적화되어 있고, cpu 코어 수만큼 스레드를 생성한다.
  • new를 붙이면 새로운 스케줄러 인스턴스를 생성할 수 있다.

Context

context는 어떠한 상황에서 그 상황을 처리하기 위해 필요한 정보이다.

몇가지 예를 들어보자.

  • servlet context는 servlet이 servlet container와 통신하기 위해서 필요한 정보를 제공하는 인터페이스다.
  • applicationcontext는 애플리케이션의 정보를 제공하는 인터페이스다. applicationContext가 제공하는 대표적 정보가 bean 정보다.
  • spring security에서 securityContextHolder는 securityContext를 관리하는 주체인데, securiyContext는 애플리케이션의 사용자 인증 정보를 제공하는 인터페이스

reactor의 context는 operator 같은 reactor 구성요소 간에 전파되는 key/value 형태의 저장소이다. 여기서 전파는 downstream에서 upstream으로 context가 전파되어 operator 체인상의 각 operator가 해당 context의 정보를 동일하게 이용할 수 있음을 의미한다.

reacor의 context는 threadLocal과 유사한 면이 있짐난 각각의 실행 스레드와 매핑되는 ThreadLocal과 달리 실행 스레드와 매핑되는 것이 아니라 subscriber와 매핑된다. 즉, 구독이 발생할 때마다 해당 구독과 연결된 하나의 Context가 생성된다고 보면 된다.

public static void main(String[] args) throws InterruptedException {
    Mono
        // 원본 소스 레벨 데이터 읽기
        // 파라미터는 context가 아니라 contextView 타입 객체
        // context에서 데이터를 읽을 때는 contextView 타입을 사용하고 쓸때는 Context 타입을 사용한다.
        .deferContextual(ctx ->
            Mono
                .just("Hello" + " " + ctx.get("firstName"))
                .doOnNext(data -> log.info("# just doOnNext : {}", data))
        )
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.parallel())
        // 체인 중간에서 읽기 위해서는 아래 operator를 사용한다.
        .transformDeferredContextual(
                (mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
        )
        // context에 데이터 쓰기
        // 람다식으로 파라미터는 context이고 context.put을 사용해서 데이터를 쓸 수 있다.
        .contextWrite(context -> context.put("lastName", "Jobs"))
        .contextWrite(context -> context.put("firstName", "Steve"))
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(100L);
}

// 출력
22:22:06.593 [boundedElastic-1] INFO - # just doOnNext : Hello Steve
22:22:06.620 [parallel-1] INFO - # onNext: Hello Steve Jobs

데이터를 읽는 방식은 크게 두 가지가 있다. 하나는 원본 데이터 소스 레벨에서 읽는 방식이고 다른 하나는 operator 체인의 중간에서 읽는 방식이다.

operator 체인상의 서로 다른 스레드들이 context의 저장된 데이터를 손쉽게 가져올 수 있는 것이 보인다. 그리고 context put을 통해 데이터를 쓴 후 매번 불변 객체를 다음 contextWrite operator로 전달함으로 써 스레드 안전성을 보장한다.

context 관련 api

  • put(key,value)
    • key/value 형태로 context에 쓰기
  • of(key1, value1, key2, value2)
    • key/value 형태로 context에 여러 개 값을 쓴다.
    • 최대 5개의 파라미터를 입력할 수 있고 그 이상은 putAll 사용
  • putAll(contextView)
    • 현재 Context와 파라미터로 입력된 ContextView를 merge한다.
  • delete(key)
    • Context에서 key에 해당하는 value를 삭제한다.
public static void main(String[] args) throws InterruptedException {
    final String key1 = "company";
    final String key2 = "firstName";
    final String key3 = "lastName";

    Mono
        .deferContextual(ctx ->
                Mono.just(ctx.get(key1) + ", " + ctx.get(key2) + " " + ctx.get(key3))
        )
        .publishOn(Schedulers.parallel())
        .contextWrite(context ->
                // of의 반환 타입은 Context인데 putAll의 파라미터 타입은 contextView다. readOnly api가 Context 타입을 읽기만 가능한 ContextView 타입으로 변환해준다.
                context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly())
        )
        .contextWrite(context -> context.put(key1, "Apple"))
        .subscribe(data -> log.info("# onNext: {}" , data));

    Thread.sleep(100L);
}

  • get(key)
    • contextView에서 key에 해당하는 value 반환
  • getOrEmpty(key)
    • ContextView에서 key에 해당하는 value를 optional로 래핑해서 반환
  • getOrDefault(key, default value)
    • ContextView에서 key에 해당하는 value를 가져온다. key에 해당하는 value가 없으면 default value를 가져온다.
  • hasKey(key)
    • ContextView에서 특정 key가 존재하는지 확인
  • isEmpty()
    • Context가 비어있는지 확인
  • size()
    • Context 내에 있는 key/value 개수 반환
public static void main(String[] args) throws InterruptedException {
    final String key1 = "company";
    final String key2 = "firstName";
    final String key3 = "lastName";

    Mono
        .deferContextual(ctx ->
                Mono.just(ctx.get(key1) + ", " +
                        // optional을 반환하기 때문에 orElse를 사용할 수 있지만 이렇게 하는 것보단 getOrDefault 사용이 낫다.
                        ctx.getOrEmpty(key2).orElse("no firstName") + " " +
                        ctx.getOrDefault(key3, "no lastName"))
        )
        .publishOn(Schedulers.parallel())
        .contextWrite(context -> context.put(key1, "Apple"))
        .subscribe(data -> log.info("# onNext: {}" , data));

    Thread.sleep(100L);
}

Context 특징

context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다.

public static void main(String[] args) throws InterruptedException {
    final String key1 = "company";

    Mono<String> mono = Mono.deferContextual(ctx ->
                    Mono.just("Company: " + " " + ctx.get(key1))
            )
            .publishOn(Schedulers.parallel());


    mono.contextWrite(context -> context.put(key1, "Apple"))
            .subscribe(data -> log.info("# subscribe1 onNext: {}", data));

    mono.contextWrite(context -> context.put(key1, "Microsoft"))
            .subscribe(data -> log.info("# subscribe2 onNext: {}", data));

    Thread.sleep(100L);
}

// 출력
22:32:10.440 [parallel-2] INFO - # subscribe2 onNext: Company:  Microsoft
22:32:10.440 [parallel-1] INFO - # subscribe1 onNext: Company:  Apple

  • context는 operator 체인의 아래서 위로 전파된다.
    • 따라서 일반적으로 모든 operator에서 context에 저장된 데이터를 읽을 수 있도록 contextWrite을 operator 체인 가장 마지막에 작성한다.
  • 동일 키에 대한 값을 중복해서 저장하면 operator 체인에서 가장 위쪽에 위치한 contextWrite이 저장한 값으로 덮어씌워진다.
public static void main(String[] args) throws InterruptedException {
    String key1 = "company";
    String key2 = "name";

    Mono
        .deferContextual(ctx ->
            Mono.just(ctx.get(key1))
        )
        .publishOn(Schedulers.parallel())
        // key2, bill 값은 아래로 전파되지 않는다.
        .contextWrite(context -> context.put(key2, "Bill"))
        // 이 시점에 key2에 대한 값은 존재하지 않는다.
        .transformDeferredContextual((mono, ctx) ->
                mono.map(data -> data + ", " + ctx.getOrDefault(key2, "Steve"))
        )
        // apple이 apple2를 덮어씌운다.
        .contextWrite(context -> context.put(key1, "Apple"))
        .contextWrite(context -> context.put(key1, "Apple2"))
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(100L);
}

  • inner sequence 내부에서는 외부 context에 저장된 데이터를 읽을 수 있다.
  • inner sequence 외부에서는 inner sequence 내부 context에 저장된 데이터를 읽을 수 없다.
public static void main(String[] args) throws InterruptedException {
    String key1 = "company";
    Mono
        .just("Steve")
        // role은 내부 context에서 put되므로 접근 불가능
        // .transformDeferredContextual((stringMono, ctx) ->
        //         ctx.get("role"))
        .flatMap(name ->            
            // inner context
            Mono.deferContextual(ctx ->
                Mono
                    // outter context의 값을 그대로 사용 가능
                    .just(ctx.get(key1) + ", " + name)
                    .transformDeferredContextual((mono, innerCtx) ->
                            mono.map(data -> data + ", " + innerCtx.get("role"))
                    )
                    // flatMap 안에서 쓰기
                    .contextWrite(context -> context.put("role", "CEO"))
            )
        )
        .publishOn(Schedulers.parallel())
        // 앞선 방식 기존 방식 그대로 쓰기
        .contextWrite(context -> context.put(key1, "Apple"))
        .subscribe(data -> log.info("# onNext: {}", data));

    Thread.sleep(100L);
}

// 출력
22:52:55.561 [parallel-1] INFO - # onNext: Apple, Steve, CEO

public class Example {
    public static final String HEADER_AUTH_TOKEN = "authToken";
    public static void main(String[] args) {
        Mono<String> mono =
                postBook(Mono.just(
                        new Book("abcd-1111-3533-2809"
                                , "Reactor's Bible"
                                ,"Kevin"))
                )
                // context 저장
                .contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGciOi"));

        mono.subscribe(data -> log.info("# onNext: {}", data));

    }

    private static Mono<String> postBook(Mono<Book> book) {
        return Mono
                // zip operator로 mono 두개를 하나로 합친다. -> Mono<Tuple2> 가 된다.
                .zip(book,
                        Mono
                            .deferContextual(ctx ->
                                    // 전파된 context 불러오기
                                    Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
                )
                .flatMap(tuple -> {
                    String response = "POST the book(" + tuple.getT1().getBookName() +
                            "," + tuple.getT1().getAuthor() + ") with token: " +
                            tuple.getT2();
                    return Mono.just(response); // HTTP POST 전송을 했다고 가정
                });
    }
}

@AllArgsConstructor
@Data
class Book {
    private String isbn;
    private String bookName;
    private String author;
}

정리

  • context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다.
  • Context는 operator 체인의 아래서 위로 전파된다.
  • 동일한 키에 대한 값을 중복해서 저장하면 operator 체인상에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어쓴다.
  • inner sequence 내부에서는 외부 Context에 저장된 데이터를 읽을 수 있다.
  • inner sequence 외부에서는 inner sequence 내부 context에 저장된 데이터를 읽을 수 없다.
  • context는 인증 정보 같은 직교성(독립성)을 가지는 정보를 전송하는데 적합하다.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions