Skip to content

[spring webflux] webClient, reactive streaming 데이터 처리 #58

@backtony

Description

@backtony

webClient

spring 5부터 지원하는 논블로킹 http request를 위한 리액티브 웹 클라이언트로서 함수형 기반의 향상된 API를 제공한다. 이는 내부적으로 HTTP 클라이언트 라이브러리에게 HTTP request를 위임하며, 기본 HTTP 클라이언트 라이브러리는 Reactor Netty이다. mvc의 restTemplate을 대체한다고 보면 된다.

public class WebClientExample01 {

    private void exampleWebClient01() {
        BookDto.Post requestBody = new BookDto.Post("Java 중급",
                "Intermediate Java",
                "Java 중급 프로그래밍 마스터",
                "Kevin1", "222-22-2222-222-2",
                "2022-03-22");
        // 클라이언트 생성
        WebClient webClient = WebClient.create();
        Mono<ResponseEntity<Void>> response =
                webClient
                        .post() // 메서드 지정
                        .uri("http://localhost:8080/v10/books") // 경로 지정
                        .bodyValue(requestBody) // body 지정
                        .retrieve() // response를 어떻게 받을지에 대한 프로세스 시작에 대한 선언
                        .toEntity(Void.class); // 파라미터로 주어진 클래스의 형태로 변환환 response body가 포함된 responseEntity 객체를 리턴

        // 응답으로 받은 Mono<ResponseEntity<Void>>를 구독
        response.subscribe(res -> {
           log.info("response status: {}", res.getStatusCode());
           log.info("Header Location: {}", res.getHeaders().get("Location"));
        });
    }

    private void exampleWebClient02() {
        BookDto.Patch requestBody =
                new BookDto.Patch.PatchBuilder().titleKorean("Java 고급")
                .titleEnglish("Advanced Java")
                .description("Java 고급 프로그래밍 마스터")
                .author("Tom")
                .build();

        WebClient webClient = WebClient.create("http://localhost:8080");
        Mono<BookDto.Response> response =
                webClient
                        .patch()
                        .uri("http://localhost:8080/v10/books/{book-id}", 20)
                        .bodyValue(requestBody)
                        .retrieve()
                        .bodyToMono(BookDto.Response.class); // responseBody를 파라미터로 전달된 타입의 객체로 디코딩

        response.subscribe(book -> {
            log.info("bookId: {}", book.getBookId());
            log.info("titleKorean: {}", book.getTitleKorean());
            log.info("titleEnglish: {}", book.getTitleEnglish());
            log.info("description: {}", book.getDescription());
            log.info("author: {}", book.getAuthor());
        });
    }

    private void exampleWebClient03() {
        Mono<BookDto.Response> response =
                WebClient
                        .create("http://localhost:8080")
                        .get()
                        // uri 빌더를 사용해 path variable의 값을 포함한 uri 생성
                        .uri(uriBuilder -> uriBuilder
                                .path("/v10/books/{book-id}")
                                .build(21))// build 메서드의 파라미터로 전달한 값은 path variable의 값으로 설정됨
                                .retrieve()
                                .bodyToMono(BookDto.Response.class);

        response.subscribe(book -> {
            log.info("bookId: {}", book.getBookId());
            log.info("titleKorean: {}", book.getTitleKorean());
            log.info("titleEnglish: {}", book.getTitleEnglish());
            log.info("description: {}", book.getDescription());
            log.info("author: {}", book.getAuthor());
        });
    }

    private void exampleWebClient04() {
        Flux<BookDto.Response> response =
                WebClient
                        .create("http://localhost:8080")
                        .get()
                        // uri builder로 쿼리 파라미터 지정
                        .uri(uriBuilder -> uriBuilder
                                .path("/v10/books")
                                .queryParam("page", "1")
                                .queryParam("size", "10")
                                .build())
                        .retrieve()
                        .bodyToFlux(BookDto.Response.class); // bodyToMon와 달리 java Collection 타입의 response body를 수신

        response
                .map(book -> book.getTitleKorean())
                .subscribe(bookName -> log.info("book name: {}", bookName));
    }
}

connection timeout 설정

private void exampleWebClient01() {
    // webClient에 HTTP client connector를 설정하기 위해 서버 엔진에서 제공하는 HTTP client 객체를 생성
    HttpClient httpClient =
            HttpClient
                    .create()
                    // connection timeout 지정
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500)
                    // 응답 timeout 지정
                    .responseTimeout(Duration.ofMillis(500))
                    // connection이 연결된 후 수행할 동작 정의
                    .doOnConnected(connection ->
                        connection
                                // 특정 시간동안 읽을 수 있는 데이터가 없을 경우 예외 발생
                                .addHandlerLast(
                                        new ReadTimeoutHandler(500,
                                                            TimeUnit.MILLISECONDS))
                                // 특정 시간동안 쓰기 작업을 종료할 수 없을 경우 예외 발생
                                .addHandlerLast(
                                        new WriteTimeoutHandler(500,
                                                            TimeUnit.MILLISECONDS)));

    Flux<BookDto.Response> response =
            // builder 패턴을 사용해 webClient 생성
            WebClient
                    .builder()
                    .baseUrl("http://localhost:8080")
                    // HTTP client 세팅
                    .clientConnector(new ReactorClientHttpConnector(httpClient))
                    .build()
                    .get()
                    .uri(uriBuilder -> uriBuilder
                            .path("/v10/books")
                            .queryParam("page", "1")
                            .queryParam("size", "10")
                            .build())
                    .retrieve()
                    .bodyToFlux(BookDto.Response.class);

    response
            .map(book -> book.getTitleKorean())
            .subscribe(bookName -> log.info("book name2: {}", bookName));
}

exchangeToMono를 사용한 응답 디코딩

retrieve 대신 exchangeToMono나 exchangeToFlux를 사용하면 response를 사용자의 요구 조건에 맞게 제어할 수 있다.

private void exampleWebClient02() {
    BookDto.Post post = new BookDto.Post("Java 중급",
            "Intermediate Java",
            "Java 중급 프로그래밍 마스터",
            "Kevin1", "333-33-3333-333-3",
            "2022-03-22");
    WebClient webClient = WebClient.create();
    webClient
            .post()
            .uri("http://localhost:8080/v10/books")
            .bodyValue(post)
            // exchangeToMono를 통해 response를 수신하고, http status가 created이면 responseEntity를 리턴하고 그 외에는 exception을 throw
            .exchangeToMono(response -> {

                if(response.statusCode().equals(HttpStatus.CREATED))
                    return response.toEntity(Void.class);
                else
                    return response
                            // request/response 정보를 포함한 webCleintResponseException을 생성한다.
                            .createException()
                            .flatMap(throwable -> Mono.error(throwable));
            })
            .subscribe(res -> {
                log.info("response status2: {}", res.getStatusCode());
                log.info("Header Location2: {}", res.getHeaders().get("Location"));
                },
                error -> log.error("Error happened: ", error));
}

reactive streaming 데이터 처리

webflux는 SSE(server sent events)를 이용해 데이터를 스트리밍할 수 있따. SSE는 클라이언트가 HTTP 연결을 통해 서버로부터 전송되는 업데이트 데이터를 지속적으로 수신할 수 있는 단방향 서버 푸시 기술이다. SSE는 주로 클라이언트 측에서 서버로부터 전송되는 이벤트 스트림을 자동으로 수신하기 위해 사용된다.

// 스트리밍 방식으로 데이터를 전송하기 위한 body 타입은 Flux여야 한다.
public Flux<Book> streamingBooks() {
    return template
            .select(Book.class)
            .all() // 전부 조회
            // 2초에 한번씩 데이터를 emit
            .delayElements(Duration.ofSeconds(2L));
}
// router
@Bean
public RouterFunction<?> routeStreamingBook(BookService bookService,
                                            BookMapper mapper) {
    return route(RequestPredicates.GET("/v11/streaming-books"),
            request -> ServerResponse
                    .ok()
                    // event stream 콘텐트 타입
                    .contentType(MediaType.TEXT_EVENT_STREAM)
                    // response body 지정
                    .body(bookService.streamingBooks().map(book -> mapper.bookToResponse(book)),
                            BookDto.Response.class));
}
// 데이터 수신자
@Slf4j
@Configuration
public class BookWebClient {
    @Bean
    public ApplicationRunner streamingBooks() {
        return (ApplicationArguments arguments) -> {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<BookDto.Response> response =
                    webClient
                            .get()
                            .uri("http://localhost:8080/v11/streaming-books")
                            .retrieve()
                            .bodyToFlux(BookDto.Response.class);

            response.subscribe(book -> {
                        log.info("bookId: {}", book.getBookId());
                        log.info("titleKorean: {}", book.getTitleKorean());
                        log.info("titleEnglish: {}", book.getTitleEnglish());
                        log.info("description: {}", book.getDescription());
                        log.info("author: {}", book.getAuthor());
                        log.info("isbn: {}", book.getIsbn());
                        log.info("publishDate: {}", book.getPublishDate());
                        log.info("=======================================");
                    },
                    error -> log.error("# error happened: ", error));
        };
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions