-
Notifications
You must be signed in to change notification settings - Fork 14
Description
Debugging
Reactor는 처리작업들이 비동기적으로 실행되고 선언형 프로그래밍 방식으로 구성되므로 디버깅이 쉽지 않다. 디버깅 어려움을 최소화하기 위해 몇 가지 방법을 제공하긴 한다.
debug mode
reactor에서 디버그 모드를 활성화해서 reactor sequence를 디버깅할 수 있다.
public static Map<String, String> fruits = new HashMap<>();
static {
fruits.put("banana", "바나나");
fruits.put("apple", "사과");
fruits.put("pear", "배");
fruits.put("grape", "포도");
}
public static void main(String[] args) throws InterruptedException {
// 디버그 모드 활성화
Hooks.onOperatorDebug();
Flux
.fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
.map(String::toLowerCase)
.map(fruit -> fruit.substring(0, fruit.length() - 1))
.map(fruits::get)
.map(translated -> "맛있는 " + translated)
.subscribe(
log::info,
error -> log.error("# onError:", error));
Thread.sleep(100L);
}
// 디버그 모드 주석 처리 했을 경우 출력 -> map에서 문제가 발생했다는 것은 보이나 map이 여러개라서 어떤 map에서 NPE이 발생했는지 파악이 어렵다.
00:10:53.469 [main] DEBUG- Using Slf4j logging framework
00:10:53.587 [parallel-1] INFO - 맛있는 바나나
00:10:53.592 [parallel-1] INFO - 맛있는 사과
00:10:53.592 [parallel-1] INFO - 맛있는 배
00:10:53.608 [parallel-1] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$57/0x000000080015dc40] returned a null value.
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
// 디버그 모드 출력
00:33:39.708 [parallel-1] INFO - 맛있는 바나나
00:33:39.713 [parallel-1] INFO - 맛있는 사과
00:33:39.714 [parallel-1] INFO - 맛있는 배
00:33:39.738 [parallel-1] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_1$$Lambda$65/0x000000080015cc40] returned a null value.
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
reactor.core.publisher.Flux.map(Flux.java:6271)
chapter12.Example12_1.main(Example12_1.java:35)
// 에러가 발생한 라인 정확히 명시
Error has been observed at the following site(s):
*__Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:35)
|_ Flux.map ⇢ at chapter12.Example12_1.main(Example12_1.java:36)
Original Stack Trace:
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)checkpoint 디버깅
디버그 모드를 활성화 하는 방법이 애플리케이션 내에 있는 모든 operator에서 스택트레이스를 캡처하는 반면에 checkpoint operator를 사용하면 특정 operator 테인 내의 스텍트레이스만 캡쳐한다.
public static void main(String[] args) {
Flux
.just(2, 4, 6, 8)
// 앞선 flux와 인자로 받은 flux를 합치고 두번째 인자 함수로 새로운 새로운 값 반환
.zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
.checkpoint()
.map(num -> num + 2)
.checkpoint()
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error)
);
}
// 출력
00:41:39.807 [main] INFO - # onNext: 4
00:41:39.809 [main] INFO - # onNext: 4
00:41:39.809 [main] INFO - # onNext: 4
00:41:39.830 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
at chapter12.Example12_3.lambda$main$0(Example12_3.java:15)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxZip] :
reactor.core.publisher.Flux.checkpoint(Flux.java:3352)
chapter12.Example12_3.main(Example12_3.java:16)
Error has been observed at the following site(s):
*__checkpoint() ⇢ at chapter12.Example12_3.main(Example12_3.java:16) // check point 둘다 에러가 전파되었으므로 zipWith나 just중에서 에러가 발생한 것으로 유추 가능
|_ checkpoint() ⇢ at chapter12.Example12_3.main(Example12_3.java:18)
Original Stack Trace:
at chapter12.Example12_3.lambda$main$0(Example12_3.java:15)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_3.main(Example12_3.java:19)public static void main(String[] args) {
Flux
.just(2, 4, 6, 8)
.zipWith(Flux.just(1, 2, 3, 0), (x, y) -> x/y)
// checkpoint에 인자를 줘서 checkpoint description 값을 명시할 수 있다. true를 주면 traceback도 같이 출력할 수 있다.
.checkpoint("Example12_4.zipWith.checkpoint", true)
.map(num -> num + 2)
.checkpoint("Example12_4.map.checkpoint", true)
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error)
);
}
// 출력
00:45:07.937 [main] INFO - # onNext: 4
00:45:07.939 [main] INFO - # onNext: 4
00:45:07.939 [main] INFO - # onNext: 4
00:45:07.962 [main] ERROR- # onError:
java.lang.ArithmeticException: / by zero
at chapter12.Example12_5.lambda$main$0(Example12_5.java:16)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxZip], described as [Example12_4.zipWith.checkpoint] :
reactor.core.publisher.Flux.checkpoint(Flux.java:3417)
chapter12.Example12_5.main(Example12_5.java:17)
Error has been observed at the following site(s):
*__checkpoint(Example12_4.zipWith.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:17)
|_ checkpoint(Example12_4.map.checkpoint) ⇢ at chapter12.Example12_5.main(Example12_5.java:19)
Original Stack Trace:
at chapter12.Example12_5.lambda$main$0(Example12_5.java:16)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:982)
at reactor.core.publisher.FluxZip$PairwiseZipper.apply(FluxZip.java:971)
at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:738)
at reactor.core.publisher.FluxZip$ZipInner.onSubscribe(FluxZip.java:888)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:595)
at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:332)
at reactor.core.publisher.FluxZip.handleArrayMode(FluxZip.java:273)
at reactor.core.publisher.FluxZip.subscribe(FluxZip.java:137)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_5.main(Example12_5.java:20)log 디버깅
log operator는 reactor sequence의 동작을 로그로 출력한다.
public static Map<String, String> fruits = new HashMap<>();
static {
fruits.put("banana", "바나나");
fruits.put("apple", "사과");
fruits.put("pear", "배");
fruits.put("grape", "포도");
}
public static void main(String[] args) {
Flux.fromArray(new String[]{"BANANAS", "APPLES", "PEARS", "MELONS"})
.map(String::toLowerCase)
.map(fruit -> fruit.substring(0, fruit.length() - 1))
.log()
// 인자로 description을 줄 수 있고 로그 레벨을 설정할 수 있는데 FINE은 자바에서 지원하는 로그 레벨로 DEBUG에 해당한다.
// .log("My Logging", Level.FINE)
.map(fruits::get)
.subscribe(
log::info,
error -> log.error("# onError:", error));
}
// 출력
00:48:50.857 [main] INFO - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
00:48:50.861 [main] INFO - | request(unbounded)
00:48:50.862 [main] INFO - | onNext(banana)
00:48:50.862 [main] INFO - 바나나
00:48:50.863 [main] INFO - | onNext(apple)
00:48:50.863 [main] INFO - 사과
00:48:50.863 [main] INFO - | onNext(pear)
00:48:50.863 [main] INFO - 배
00:48:50.863 [main] INFO - | onNext(melon) // melon 처리 과정에서 cancel이 났으므로 melon에서 에러가 발생했음을 알 수 있다.
00:48:50.875 [main] INFO - | cancel()
00:48:50.880 [main] ERROR- # onError:
java.lang.NullPointerException: The mapper [chapter12.Example12_7$$Lambda$37/0x0000000800158440] returned a null value.
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8639)
at reactor.core.publisher.Flux.subscribe(Flux.java:8436)
at reactor.core.publisher.Flux.subscribe(Flux.java:8360)
at reactor.core.publisher.Flux.subscribe(Flux.java:8330)
at chapter12.Example12_7.main(Example12_7.java:31)정리
- reactor에서는 Hooks.onOperatorDebug 메서드를 호출해 디버그 모드를 활성화할 수 있다.
- 디버그 모드를 활성화 하면 모든 스택트레이스를 캡쳐하므로 운영에서는 사용하지 않는다.
- reactor tools에서 지원하는 reactorDebugAgent를 사용하여 프로덕션 환경에서 디버그 모드를 대체할 수 있다.
- checkpoint operator를 사용하면 특정 operator 체인 내의 스택트레이스만 캡쳐한다.
- log operator를 추가하면 추가한 지점의 reactor signal을 출력한다.
- log operator 사용 개수에는 제한이 없어 1개 이상의 log operator로 reactor sequence 내부 동작을 상세히 분석하면서 디버깅할 수 있다.
Testing
testImplementation 'io.projectreactor:reactor-test'테스트를 위해서는 의존성 추가가 필요하다.
StepVerifier
reactor의 일반적인 테스트 방식은 Flux 또는 Mono를 reactor sequence로 정의한 후, 구독 시점에 해당 operator 체인이 시나리오대로 동작하는지 테스트하는 것이다. reactor에서는 operator 체인의 다양한 동작 방식을 테스트하기 위해 StepVerifier라는 API를 제공한다.
signal 이벤트 테스트
가장 기본적인 테스트는 reactor sequence에서 발생하는 signal 이벤트를 테스트하는 것이다.
@Test
public void sayHelloReactorTest() {
StepVerifier
.create(Mono.just("Hello Reactor")) // 테스트 대상 Sequence 생성
// expectXXX는 sequcne에서 예상되는 signal의 기댓값을 평가한다.
.expectNext("Hello Reactor") // emit 된 데이터 검증
.expectComplete() // onComplete Signal 검증 -> 기대한 대로 sequence가 종료되었는가
.verify(); // 검증 실행.
}- expectSubscription()
- 구독이 이뤄짐을 기대한다.
- expectNext(T t)
- onNext signal을 통해 전달되는 값이 파라미터로 전달된 값과 같음을 기대한다.
- expectComplete()
- onComplete signal이 전송되기를 기대한다.
- expectError()
- onError signal이 전송되기를 기대한다.
- expectNextCount(long count)
- 구독 시점 또는 이전 expectNext를 통해 기대값이 평가된 데이터 이후부터 emit된 수를 기대한다.
- expectNoEvent(Duration duration)
- 주어진 시간 동안 Signal 이벤트가 발생하지 않았음을 기대한다.
- expectAccessibleContext()
- 구독 시점 이후에 context가 전파되었음을 기대한다.
- expectNextSequence(iterable <? extends T> iterable)
- emit된 데이터들이 파라미터로 전달된 iterable의 요소와 매치됨을 기대한다.
verifyXXX 메서드는 내부적으로 테스트 대상 operator 체인에 대한 구독이 이뤄지고 기댓값을 평가하게 된다.
- verify()
- 검증을 트리거
- verifyComplete()
- 검증을 트리거하고 onComplete signal을 기대한다.
- verifyError
- 검증을 트리거하고 onError signal을 기대한다.
- verifyTimeout(Duration duration)
- 검증을 트리거하고, 주어진 시간이 초과되어도 publisher가 종료되지 않음을 기대한다.
// 테스트 대상
public static Flux<String> sayHello() {
return Flux
.just("Hello", "Reactor");
}
@Test
public void sayHelloTest() {
StepVerifier
.create(GeneralTestExample.sayHello())
.expectSubscription()
// as는 이전 기댓값 평가 단계에 대한 description을 추가하고 실패하면 실패한 단계에 로그로 출력된다.
.as("# expect subscription")
.expectNext("Hi")
.as("# expect Hi")
.expectNext("Reactor")
.as("# expect Reactor")
.verifyComplete();
}
// 출력
expectation "# expect Hi" failed (expected value: Hi; actual value: Hello)public static Flux<Integer> divideByTwo(Flux<Integer> source) {
return source
.zipWith(Flux.just(2, 2, 2, 2, 0), (x, y) -> x/y);
}
@Test
public void divideByTwoTest() {
Flux<Integer> source = Flux.just(2, 4, 6, 8, 10);
StepVerifier
.create(GeneralTestExample.divideByTwo(source))
.expectSubscription()
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
// .expectNext(1, 2, 3, 4)
.expectError()
.verify();
}public static Flux<Integer> takeNumber(Flux<Integer> source, long n) {
return source
.take(n);
}
@Test
public void takeNumberTest() {
Flux<Integer> source = Flux.range(0, 1000);
StepVerifier
.create(GeneralTestExample.takeNumber(source, 500),
// 실패시 인자로 입력한 시나리오 이름 출력
StepVerifierOptions.create().scenarioName("Verify from 0 to 499"))
.expectSubscription()
.expectNext(0) // 0이 emit 됨을 기대
.expectNextCount(498) // emit된 데이터가 498개임을 기대 -> 내부적으로 498개를 emit
.expectNext(500) // 500이 emit 됨을 기대
.expectComplete()
.verify();
}
// 출력
[Verify from 0 to 499] expectation "expectNext(500)" failed (expected value: 500; actual value: 499)시간 기반 테스트
가상 시간을 이용해 미래에 실행되는 reactor sequence의 시간을 앞당겨 테스트할 수 있는 기능을 지원한다.
public static Flux<Tuple2<String, Integer>> getCOVID19Count(Flux<Long> source) {
return source
.flatMap(notUse -> Flux.just(
Tuples.of("서울", 10),
Tuples.of("경기도", 5),
Tuples.of("강원도", 3),
Tuples.of("충청도", 6),
Tuples.of("경상도", 5),
Tuples.of("전라도", 8),
Tuples.of("인천", 2),
Tuples.of("대전", 1),
Tuples.of("대구", 2),
Tuples.of("부산", 3),
Tuples.of("제주도", 0)
)
);
}
@Test
public void getCOVID19CountTest() {
StepVerifier
.withVirtualTime(() -> TimeBasedTestExample.getCOVID19Count(
// 현재 시간으로부터 1시간 뒤 발생 확인
Flux.interval(Duration.ofHours(1)).take(1)
)
)
.expectSubscription()
.then(() -> VirtualTimeScheduler
.get()
// 1시간 앞당기기
.advanceTimeBy(Duration.ofHours(1)))
// emit한 데이터가 11개임을 기대
.expectNextCount(11)
.expectComplete()
.verify();
}
@Test
public void getCOVID19CountTest() {
StepVerifier
.create(TimeBasedTestExample.getCOVID19Count(
Flux.interval(Duration.ofMinutes(1)).take(1)
)
)
.expectSubscription()
.expectNextCount(11)
.expectComplete()
// 3초 이내 평가가 이뤄지도록 설정했지만 emit이 1분 뒤에 발생하므로 시간 초과
.verify(Duration.ofSeconds(3));
}public static Flux<Tuple2<String, Integer>> getVoteCount(Flux<Long> source) {
return source
.zipWith(Flux.just(
Tuples.of("중구", 15400),
Tuples.of("서초구", 20020),
Tuples.of("강서구", 32040),
Tuples.of("강동구", 14506),
Tuples.of("서대문구", 35650)
)
)
.map(Tuple2::getT2);
}
@Test
public void getVoteCountTest() {
StepVerifier
.withVirtualTime(() -> TimeBasedTestExample.getVoteCount(
// 1분에 하나씩 emit
Flux.interval(Duration.ofMinutes(1))
)
)
.expectSubscription()
// 지정 시간동안 어떤 signal 이벤트도 발생하지 않았음을 기대 -> 시간 지정시 지정한 시간동안 어떤 이벤트도 발생하지 않을 것이라고 기대하는 동시 지정한 시간만큼 앞당긴다.
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNoEvent(Duration.ofMinutes(1))
.expectNextCount(5)
.expectComplete()
.verify();
}Backpressure 테스트
public static Flux<Integer> generateNumber() {
return Flux
.create(emitter -> {
// 100개 emit
for (int i = 1; i <= 100; i++) {
emitter.next(i);
}
emitter.complete();
// backpressure 전략으로 오버플로가 발생하면 에러 발생
}, FluxSink.OverflowStrategy.ERROR);
}
@Test
public void generateNumberTest() {
StepVerifier
// generateNumber는 100개를 emit하지만 두번째 인자로 요청 개수가 1개이므로 오버플로가 발생한다.
.create(BackpressureTestExample.generateNumber(), 1L)
// emit되는 데이터 소비
.thenConsumeWhile(num -> num >= 1)
.verifyComplete();
}
@Test
public void generateNumberTest() {
StepVerifier
.create(BackpressureTestExample.generateNumber(), 1L)
.thenConsumeWhile(num -> num >= 1)
// error를 기대
.expectError()
// 검증을 trigger 후, 추가적인 assertion 가능
.verifyThenAssertThat()
// drop된 데이터가 있음을 assertion
.hasDroppedElements();
}context 테스트
public static Mono<String> getSecretMessage(Mono<String> keySource) {
return keySource
.zipWith(Mono.deferContextual(ctx ->
Mono.just((String)ctx.get("secretKey"))))
.filter(tp ->
tp.getT1().equals(
new String(Base64Utils.decodeFromString(tp.getT2())))
)
.transformDeferredContextual(
// secretMessage 저장
(mono, ctx) -> mono.map(notUse -> ctx.get("secretMessage"))
);
}
@Test
public void getSecretMessageTest() {
Mono<String> source = Mono.just("hello");
StepVerifier
.create(
ContextTestExample
.getSecretMessage(source)
// context에 데이터 삽입
.contextWrite(context ->
context.put("secretMessage", "Hello, Reactor"))
.contextWrite(context -> context.put("secretKey", "aGVsbG8="))
)
// 구독 발생을 기대 -> verify때문에 구독됨
.expectSubscription()
// 구독 이후 context가 전파됨을 기대
.expectAccessibleContext()
// 전파된 context에 키에 해당하는 값이 있음을 기대
.hasKey("secretKey")
.hasKey("secretMessage")
// then 메서드로 sequence의 다음 signal 이벤트 기댓값 평가가 가능
.then()
// hello reactor이 emit됨을 기대
.expectNext("Hello, Reactor")
.expectComplete()
.verify();
}Record 기반 테스트
reactor sequence를 테스트하다 보면 expectnext로 emit된 데이터의 단순 기댓값만 평가하는 것이 아니라 조금 더 구체적인 조건으로 assertion해야하는 경우가 많다. 이 경우 recordWith를 사용할 수 있다. recordWith는 파라미터로 전달한 java의 컬렉션에 emit된 데이터를 추가(기록)하는 세션을 시작한다.
public static Flux<String> getCapitalizedCountry(Flux<String> source) {
return source
.map(country -> country.substring(0, 1).toUpperCase() +
country.substring(1));
}
@Test
public void getCountryTest() {
StepVerifier
.create(RecordTestExample.getCapitalizedCountry(
Flux.just("korea", "england", "canada", "india")))
// 구독 발생을 기대
.expectSubscription()
// emit된 데이터에 대한 기록을 시작
.recordWith(ArrayList::new)
// 파라미터로 전달한 predicate과 일치하는 데이터는 다음 단계에서 소비될 수 있도록 한다.
.thenConsumeWhile(country -> !country.isEmpty())
// 컬렉션에 기록된 데이터를 소비한다.
.consumeRecordedWith(countries -> {
// 상세한 assertion 정의
assertThat(
countries
.stream()
.allMatch(country ->
Character.isUpperCase(country.charAt(0))),
is(true)
);
})
.expectComplete()
.verify();
}
@Test
public void getCountryTest() {
StepVerifier
.create(RecordTestExample.getCapitalizedCountry(
Flux.just("korea", "england", "canada", "india")))
.expectSubscription()
.recordWith(ArrayList::new)
.thenConsumeWhile(country -> !country.isEmpty())
// consumeRecordedWith 안에서 assertion이 아니라 expectRecordedMatches 메서드 내에서 predicate을 사용하여 컬레션에 기록된 모든 데이터의 첫 글자가 대문자임을 기대
// 하지만 consumeRecordedWith는 기록된 데이터를 소비하면서 더 다양한 조건으로 테스트를 진행할 수 있다.
.expectRecordedMatches(countries ->
countries
.stream()
.allMatch(country ->
Character.isUpperCase(country.charAt(0))))
.expectComplete()
.verify();
}TestPublisher를 사용한 테스팅
reactor-test 모듈에서 지원하는 테스트 전용 publisher이다. 이를 사용하면 개발자가 직접 프로그래밍 방식으로 signal을 발생시키면서 원하는 상황을 재현해서 테스트할 수 있다.
testPublisher는 다음과 같은 signal 유형을 발생시킬 수 있다.
- next(T) 또는 next(T, T...) : 1개 이상의 onNext signal을 발생시킨다.
- emit(T ...) : 1개 이상 onNext signal을 발생시킨 후, onComplete Signal을 발생시킨다.
- complete() : onComplete 시그널을 발생시킨다.
- error(Throwable) : onError 시그널을 발생시킨다.
정상 동작하는 TestPublisher
public static Flux<Integer> divideByTwo(Flux<Integer> source) {
return source
.zipWith(Flux.just(2, 2, 2, 2, 0), (x, y) -> x/y);
}
@Test
public void divideByTwoTest() {
// test publisher 생성
TestPublisher<Integer> source = TestPublisher.create();
StepVerifier
// flux로 전달하기 위해 flux 메서드로 flux로 전환
.create(GeneralTestExample.divideByTwo(source.flux()))
// 구독을 기대
.expectSubscription()
// 필요한 데이터를 emit
.then(() -> source.emit(2, 4, 6, 8, 10))
// 기대값
.expectNext(1, 2, 3, 4)
// 에러를 기대
.expectError()
.verify();
}오작동하는 TestPublisher
오작동은 리액티브 스트림즈 사양 위반 여부를 사전에 체크하지 않는다는 의미로 사양에 위반되더라도 TestPublisher는 데이터를 emit할 수 있다.
다음과 같은 옵션이 있다.
- ALLOW_NULL : 전송할 데이터가 null이어도 NPE을 발생시키지 않고 다음 호출을 진행 가능
- CLEANUP_ON_TERMINATE : onComplete, onError, emit 같은 Terminal Signal을 연달아 여러 번 보낼 수 있다.
- DEFER_CANCELLATION : cancel signal을 무시하고 계속해서 signal을 emit할 수 있도록 한다.
- REQUEST_OVERFLOW : 요청 개수보다 더 많은 signal이 발생하더라도 illegalStateException을 발생시키지 않고 다음 호출을 진행할 수 있도록 한다.
public static Flux<Integer> divideByTwo(Flux<Integer> source) {
return source
.zipWith(Flux.just(2, 2, 2, 2, 0), (x, y) -> x/y);
}
@Test
public void divideByTwoTest() {
// TestPublisher<Integer> source = TestPublisher.create();
TestPublisher<Integer> source =
// 오작동하는 publisher를 동작하도록 Violation.ALLOW_NULL 위반 조건을 지정하여 데이터 값이 null이더라도 정상 동작하도록 testPublisher를 생성
TestPublisher.createNoncompliant(TestPublisher.Violation.ALLOW_NULL);
StepVerifier
.create(GeneralTestExample.divideByTwo(source.flux()))
.expectSubscription()
.then(() -> {
getDataSource().stream()
.forEach(data -> source.next(data));
source.complete();
})
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
.verify();
}
private static List<Integer> getDataSource() {
return Arrays.asList(2, 4, 6, 8, null);
}
// 출력 -> 로그 라인을 보면 onNext에서 에러가 발생함을 알 수 있다.
java.lang.NullPointerException: e
at java.base/java.util.Objects.requireNonNull(Objects.java:246)
at reactor.util.concurrent.SpscArrayQueue.offer(SpscArrayQueue.java:51)
at reactor.core.publisher.FluxZip$ZipInner.onNext(FluxZip.java:909)
at reactor.test.publisher.DefaultTestPublisher$TestPublisherSubscription.onNext(DefaultTestPublisher.java:233)
at reactor.test.publisher.DefaultTestPublisher.next(DefaultTestPublisher.java:401)
at reactor.test.publisher.DefaultTestPublisher.next(DefaultTestPublisher.java:41)
at chapter13.ExampleTest13_19.lambda$divideByTwoTest$0(ExampleTest13_19.java:25)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
at chapter13.ExampleTest13_19.lambda$divideByTwoTest$1(ExampleTest13_19.java:25) -> onNext 라인
// 가장 처음 줄의 testPublisher의 주석을 풀고 바로 아래 source를 주석하고 실행해보면 emit에서부터 NPE가 발생한다.
emitted values must be non-null
java.lang.NullPointerException: emitted values must be non-nullpublisherProbe를 사용한 테스팅
reactor-test 모듈에서 publisherProbe를 이용해 sequence의 실행 경로를 테스트할 수 있다. 주로 조건에 따라 sequence가 분기되는 경우 정상적으로 실행되었는지 테스트할 때 사용된다.
public static Mono<String> processTask(Mono<String> main, Mono<String> standby) {
return main
.flatMap(massage -> Mono.just(massage))
// emit 데이터가 없는 경우 대체
.switchIfEmpty(standby);
}
public static Mono<String> supplyMainPower() {
return Mono.empty();
}
public static Mono supplyStandbyPower() {
return Mono.just("# supply Standby Power");
}
@Test
public void publisherProbeTest() {
// 실행 경로 테스트 대상 publisher를 래핑 -> processTask에서 보면 main을 기준으로 동작하므로 main 제공 mono를 감싸면 된다.
PublisherProbe<String> probe =
PublisherProbe.of(PublisherProbeTestExample.supplyStandbyPower());
StepVerifier
.create(PublisherProbeTestExample
.processTask(
PublisherProbeTestExample.supplyMainPower(),
probe.mono())
)
.expectNextCount(1)
.verifyComplete();
// 구독됐는지
probe.assertWasSubscribed();
// 요청이 됐는지
probe.assertWasRequested();
// 중간에 취소되지 않았는지
probe.assertWasNotCancelled();
}