Skip to content

[spring webflux] spring webflux #57

@backtony

Description

@backtony

spring webflux

들어가기 앞서 spring webflux를 공부하다보면 subscribe 호출이 없는 것을 볼수 있다. 이는 스프링에서 subscribe 메서드를 호출해주기 때문이다.

아래는 HttpServer의 inner static class인 HttpServerHandler의 onStateChange 메서드다.

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onStateChange(Connection connection, State newState) {
    if (newState == HttpServerState.REQUEST_RECEIVED) {
        try {
            if (log.isDebugEnabled()) {
                log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
            }
            HttpServerOperations ops = (HttpServerOperations) connection;
            Publisher<Void> publisher = handler.apply(ops, ops);
            Mono<Void> mono = Mono.deferContextual(ctx -> {
                ops.currentContext = Context.of(ctx);
                return Mono.fromDirect(publisher);
            });
            if (ops.mapHandle != null) {
                mono = ops.mapHandle.apply(mono, connection);
            }
            mono.subscribe(ops.disposeSubscriber());
        }
        catch (Throwable t) {
            ...
        }
    }
}

여기서 subscribe 메서드는 Mono 또는 Flux 형태의 Http 응답으로 클라이언트에 전송될 데이터 스트림을 실제 응답 데이터로 클라이언트에 보내는 역할을 하게 되는 것이고, disposeSubscriber 메서드는 응답이 완전히 전송된 후 응답과 관련된 모든 리소스(네트워크 연결 해제, 버퍼 리소스 해제)를 하는 역할을 수행한다.

즉, webFlux에서 개발자는 특별한 일이 아닌 이상 subscribe 메서드를 명시적으로 호출하지 않아야 한다.

자세한 내용은 여기의 subscribe 명시적 호출 파트를 읽어보자.

그리고 웹플럭스는 디버깅이 쉽지 않은데 application 시작 지점에 onOperatorDebug를 추가하면 좀 더 편하게 디버깅할 수 있다. 운영환경에서는 주의가 필요하다.

@SpringBootApplication
class ServerApplication

fun main(args: Array<String>) {
    // 추가
    Hooks.onOperatorDebug()
    runApplication<ServerApplication>(*args)
}

개요

기술 스택

1
  • 서버
    • mvc는 서블릿 기반으로 아파치 톰캣 같은 서블릿 컨테이너에서 Blocking I/O 방식으로 동작
    • webFlux는 Non-Blocking I/O 방식으로 동작하는 Netty 등의 서버 엔진에서 동작
  • 서버 API
    • mvc는 서블릿 기반으로 서블릿 api 사용
    • webflux는 기본 엔진이 netty이지만 jetty나 undertow같은 서버에서 지원하는 리액티브 스트림즈 어댑터를 통해 리액티브 스트림즈를 지원
  • 보안
    • mvc는 표준 서블릿 필터를 사용하는 spring security가 서블릿 컨테이너와 통합
    • webflux는 webfilter를 이용해 spring security를 사용
  • 데이터 액세스
    • mvc는 블로킹 방식으로 jpa, jdbc 등의 기술 사용
    • webflux는 non-blocking 방식인 r2dbc 등을 사용

요청 흐름

2

  1. 최초의 클라이언트로부터 요청이 들어오면 Netty 등의 서버 엔진을 거쳐 HttpHanlder가 들어오는 요청을 받는다. HttpHanlder는 Netty 이외의 다양한 서버 엔진에서 지원하는 서버 API를 사용할 수 있도록 서버 API를 추상화해주는 역할을 한다. 따라서 각 서버 엔진마다 주어지는 serverHttpRequest와 serverHttpResponse를 포함하는 serverWebExchange를 생성한 후, WebFliter 체인으로 전달한다.
  2. ServerWebExchange는 WebFilter 체인에서 전처리 과정을 거친 후, WebHandler 인터페이스 구현체인 DispatcherHandler에게 전달된다.
  3. Spring MVC의 DispatcherServlet과 유사한 역할을 하는 DispatcherHandler에서는 HandlerMapping List를 원본 Flux의 소스로 전달받는다.
  4. ServerWebExchange를 처리할 핸들러를 호출한다.
  5. 조회한 핸들러의 호출을 HandlerAdapter에게 위임한다.
  6. HandlerAdapter는 ServerWebExchange를 처리할 핸들러를 호출한다.
  7. Controller 또는 HandlerFunction 형태의 핸들러에서 요청을 처리한 후, 응답 데이터를 리턴한다.
  8. 핸들러로부터 리턴받은 응답 데이터를 처리할 HandlerResultHandler를 조회한다.
  9. 조회한 HandlerResultHandler가 응답 데이터를 적절하게 처리한 후, response로 리턴한다.

편의상 핸들러에서 응답 데이터가 리턴된다고 표현했다. 하지만 실제 핸들러에서 리턴되는 것은 응답 데이터를 포함하고 있는 Flux 또는 Mono이기 때문에 메서드 호출을 통해 리턴된 reactor sequence가 즉시 어떤 작업을 수행한다는 의미가 아니라는 사실을 기억하자.

핵심 컴포넌트

HttpHandler

HttpHandler는 다른 유형의 HTTP 서버 API로 request와 reponse를 처리하기 위해 추상화된 단 하나의 메서드만을 가진다.

public interface HttpHandler {
    public abstract void handle (HttpExchange exchange) throws IOException;
}
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
    // ...

    @Override
	public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
        // ...
    }

    // ..
}

HttpHandler의 구현체인 HttpWebHandlerAdapter는 handler 메서드의 파라미터로 전달받은 serverHttpRequest와 ServerHttpResponse로 ServerWebExchange를 생성한 후에 WebHandler를 호출하는 역할을 한다.

WebFilter

mvc의 서블릿 필터처럼 핸들러가 요청을 처리하기 전에 전처리 작업을 수행할 수 있도록 해준다. 주로 보안이나 세션 타임아웃 처리 등 애플리케이션에서 공통으로 필요한 전처리에 사용된다. WebFilter의 구현체는 빈으로 등록하지 않아도 자동으로 빈으로 등록된다.

public interface WebFilter {
	Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain);
}

위 와 같이 메서드 하나만 정의되어 있으며, 파라미터로 전달받은 WebFilterChain을 통해 필터 체인을 형성하여 원하는 만큼의 WebFilter를 추가할 수 있다.

// 예시
@Component
public class BookLogFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String path = exchange.getRequest().getURI().getPath();
        return chain.filter(exchange)
            // 종료 이벤트 (onComplete, onError) 발생 시 요청 URI에 books가 포함되어 있으면 로그 출력
            .doAfterTerminate(() -> {
            if (path.contains("books")) {
                System.out.println("path: " + path + ", status: " +
                        exchange.getResponse().getStatusCode());
            }
        });
    }
}

HandlerFilterFunction

함수형 기반의 요청 핸들러에 적용할 수 있는 Filter이다. 애너테이션 기반이 아닌 함수형 기반의 요청 핸들러에서 함수 형태로 사용되기 때문에 webFilter처럼 spring bean으로 자동으로 등록되진 않는다.

@FunctionalInterface
public interface HandlerFilterFunction<T extends ServerResponse, R extends ServerResponse> {
	Mono<R> filter(ServerRequest request, HandlerFunction<T> next);
    // ...
}

파라미터로 전달받은 HandlerFunction에 연결된다.

public class BookRouterFunctionFilter implements HandlerFilterFunction {
    @Override
    public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction next) {
        String path = request.requestPath().value();

        return next.handle(request).doAfterTerminate(() -> {
            System.out.println("path: " + path + ", status: " +
                    request.exchange().getResponse().getStatusCode());
        });
    }
}

@Configuration
public class BookRouterFunction {
    @Bean
    public RouterFunction routerFunction() {
        return RouterFunctions
                .route(GET("/v1/router/books/{book-id}"),
                        (ServerRequest request) -> this.getBook(request))
                // 특정 path에 대한 FunctionFilter 등록
                .filter(new BookRouterFunctionFilter());
    }

    public Mono<ServerResponse> getBook(ServerRequest request) {
        return ServerResponse
                .ok()
                .body(Mono.just(BookDto.Response.builder()
                        .bookId(Long.parseLong(request.pathVariable("book-id")))
                        .bookName("Advanced Reactor")
                        .author("Tom")
                        .isbn("222-22-2222-222-2").build()), BookDto.Response.class);
    }
}

cf) webFilter와 handlerFilterFunction의 차이점
WebFilter는 모든 핸들러에 공통으로 동작한다. 반면에 HandlerFilterFunction은 함수형 기반의 핸들러에서만 동작하기 때문에 함수형 기반의 핸들러에서만 제한적으로 필터링 작업을 수행하고 싶다면 HandlerFilterFunction을 구현해서 사용해야 한다.

DispatcherHandler

WebHandler 인터페이스의 구현체로 mvc에서 front controller 패턴이 적용된 DispatcherServlet처럼 중앙에서 먼저 요청을 전달받은 후에 다른 컴포넌트에 요청 처리를 위임하는 역할을 한다. DispatcherHandler 자체가 Spring bean으로 등록되도록 설계되었으며, ApplicationContext HandlerMapping, HandlerAdapter, Handler ResultHandler 등의 요청 처리를 위한 위임 컴포넌트를 검색한다.

public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {
    // ...
    protected void initStrategies(ApplicationContext context) {...}
    public Mono<Void> handle(ServerWebExchange exchange) {...}
    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {...}
    private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {...}
    private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {...}
    // ...
}
  1. initStrategies
    • beanFactoryUtils를 이용해 applicationContext로부터 HandlerMapping Bean, HandlerAdapter Bean, HAndlerResultHandler Bean을 검색한 후에 각각 List, List, List 객체를 생성한다.
  2. handle
    • handle는 List을 Flux.fromIterable() operator의 원본 데이터 소스로 입력받은 후에 getHandler 메서드를 통해 매치되는 Handler 중에서 첫 번째 핸들러를 사용한다.
    • invokeHandler를 통해 핸들러 호출을 위임한다.
      • 실제 핸들러 호출은 invokeHandler 내부에서 Handler 객체와 매핑되는 HandlerAdapter를 통해서 이뤄진다.
    • handleResult를 통해 응답 처리를 위임한다.
      • 실제 응답 처리는 handleResult 내부에서 호출한 getResultHandler에서 HandlerResult 객체와 매핑되는 HandlerResultHandler를 통해서 이뤄진다.

HandlerMapping

mvc와 마찬가지로 request와 handler object에 대한 매핑을 정의하는 인터페이스이며, HandlerMapping 인터페이스를 구현하는 구현 클래스로는 RequestMappingHandlerMapping, RouterFunctionMapping 등이 있다.

public interface HandlerMapping {
    // ...
    Mono<Object> getHandler(ServerWebExchange exchange);
}

파라미터로 입력받은 serverWebExchange에 매치되는 handler object를 리턴한다.

HandlerAdapter

HAndlerMapping을 통해 얻은 핸들러를 직접적으로 호출하는 역할을 하며, 응답 결과로 Mono를 리턴받는다.
Spring 5.0 Reactive 스택에서 지원하는 HandlerAdapter 구현 클래스로 RequestMappingHandlerAdapter, HandlerFunctionAdapter, SimpleHandlerAdapter, WebSocketHandlerAdapter가 있다.

public interface HandlerAdapter {
	boolean supports(Object handler);
	Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
}
  • supports : 파라미터로 전달받은 handler object를 지원하는지 체크
  • handle : 파라미터로 전달받은 handler object를 통해 핸들러 메서드를 호출

Spring WebFlux의 Non-Blocking 프로세스 구조

mvc는 블로킹 방식으로 기본적으로 대용량의 스레드 풀을 사용해서 하나의 요청을 하나의 스레드가 처리한다. 반면에 논블로킹 방식인 webFlux는 스레드가 차단되지 않기 때문에 적은 수의 고정된 스레드 풀을 사용해서 더 많은 요청을 처리한다. webFlux가 스레드 차단 없이 더 많은 요청을 처리할 수ㅡ 있는 이유는 요청 처리 방식을 이벤트 루프 방식을 사용하기 때문이다.

3

클라리언트 -> 요청 핸들러 -> 요청 이벤트 푸시 -> 이벤트 루프 -> 콜백 등록 -> 비용이 드는 작업(db, 네트워크, 연산 등) -> 작업 완료 이벤트 푸시 -> 이벤트 루프 -> 콜백 호출 -> 요청 핸들러 -> 클라이언트

  1. 클라이언트로부터 들어오는 요청을 요청 핸들러가 전달 받는다.
  2. 전달받은 요청을 이벤트 루프에 푸시한다.
  3. 이벤트 루프는 네트워크, db 연결 작업 등 비용이 드는 작업에 대한 콜백을 등록한다.
  4. 작업이 완료되면 완료 이벤트를 이벤트 루프에 푸시한다.
  5. 등록한 콜백을 호출해 처리 결과를 전달한다.

이벤트 루프는 단일 스레드에서 계속 실행되며, 클라이언트의 요청이나 db, 네트워크 등 모든 작업들이 이벤트로 처리되기 때문에 이벤트 발생 시 해당 이벤트에 대한 콜백을 등록함과 동시에 다음 이벤트 처리로 넘어간다. 결과적으로 webflux가 이벤트 루프 방식을 도입함으로써 적은 수의 스레드로 많은 수의 요청을 논블로킹 프로세스로 처리할 수 있는 것이다.

webFlux 스레드 모델

mvc는 요청이 들어올때마다 서블릿 컨테이너의 스레드 풀에 미리 생성되어 있는 스레드가 요청을 처리하고 완료되면 반납하는 스레드 풀 모델을 사용한다. 클라이언트 요청당 하나의 스레드를 사용하기 때문에 많은 수의 스레드가 필요하다.

반면에 webflux는 논블로킹 i/o를 지원하는 netty 등의 서버 엔진에서 적은 수의 고정된 크기의 스레드(일반적으로는 cpu 코어 개수만큼의 스레드를 생성)를 생성해서 대량의 요청을 처리한다.

public interface LoopResources extends Disposable {

	/**
	 * Default worker thread count, fallback to available processor
	 * (but with a minimum value of 4)
	 */
	int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
			ReactorNetty.IO_WORKER_COUNT,
			"" + Math.max(Runtime.getRuntime().availableProcessors(), 4)));
}

만약 코어 개수가4개보다 더 적은 경우 최소 4개의 워커 스레드를 생성하고 4보다 더 많다면 코어 개수만큼 스레드를 생성한다.

webflux는 reactor netty 같은 서버 엔진에서 지원하는 워커 스레드 풀을 통해 적은 수의 스레드로 대량의 요청을 효과적으로 처리할 수 있다. 하지만 복잡한 연산 처리 등의 cpu 집약적인 작업을 하거나, 클라이언트의 요청부터 응답 처리 전 과정에 블로킹작업이 존재한다면 오히려 성능 저하가 발생한다.

이러한 성능 저하를 보완하고자 클라이언트의 요청을 처리하기 위해 서버 엔진에서 제공되는 스레드 풀이 아닌 다른 스레드 풀을 사용할 수 있는 메커니즘을 제공하는데 그것이 reactor의 scheduler인 것이다.

컨트롤러

webflux에는 애너테이션 기반 모델과 함수형 기반 모델을 제공한다.

애노테이션 기반

// controller
@RestController
@RequestMapping("/v1/books")
public class BookController {
    private final BookService bookService;
    private final BookMapper mapper;

    public BookController(BookService bookService, BookMapper mapper) {
        this.bookService = bookService;
        this.mapper = mapper;
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono postBook(@RequestBody Mono<BookDto.Post> requestBody) {
        Mono<Book> result = bookService.createBook(requestBody);

        return result.flatMap(book -> Mono.just(mapper.bookToResponse(book)));
    }

    @PatchMapping("/{book-id}")
    public Mono patchBook(@PathVariable("book-id") long bookId,
                                    @RequestBody Mono<BookDto.Patch> requestBody) {
        Mono<Book> result = bookService.updateBook(bookId, requestBody);
        return result.flatMap(book -> Mono.just(mapper.bookToResponse(book)));
    }

    @GetMapping("/{book-id}")
    public Mono getBook(@PathVariable("book-id") long bookId) {
        return bookService.findBook(bookId)
                .flatMap(book -> Mono.just(mapper.bookToResponse(book)));
    }
}

애노테이션 기반의 컨트롤러는 기존 mvc에서 사용하던 방식에서 request 타입과 return 타입이 reactive가 사용되었다는 것 이외에는 변경된 부분이 없다.

@Service
public class BookService {
    private final BookMapper mapper;

    public BookService(BookMapper mapper) {
        this.mapper = mapper;
    }

    public Mono<Book> createBook(Mono<BookDto.Post> book) {
        // not implement business logic;
        return book.flatMap(post -> Mono.just(mapper.bookPostToBook(post)));
    }

    public Mono<Book> updateBook(final long bookId, Mono<BookDto.Patch> book) {
        // not implement business logic;
        return book.flatMap(patch -> {
            patch.setBookId(bookId);
            return Mono.just(mapper.bookPatchToBook(patch));
        });
    }

    public Mono<Book> findBook(long bookId) {
        return Mono.just(
                    new Book(bookId,
                            "Java 고급",
                            "Advanced Java",
                            "Kevin",
                            "111-11-1111-111-1",
                            "Java 중급 프로그래밍 마스터",
                            "2022-03-22",
                            LocalDateTime.now(),
                            LocalDateTime.now())
        );
    }
}

service 파트에서는 Reactive 타입으로 넘어온 파라미터를 사용하기 위해 flatMap을 사용하는 것을 볼 수 있다. 기본적으로 reactor java에서 map은 각 요소에 함수를 적용하여 반환하는데 사용되고 flatMap은 각 요소에 함수를 적용하고 반환된 것을 평탄화하는 기능을 제공한다. 즉, request로 요청받은 reactive 타입에서 map을 사용한 뒤 return을 reactive 타입으로 진행하게 된다면 Mono<Mono<>> 형식이 되기 때문에 체인에서 reactive 타입을 리턴하려면 flatMap으로 평탄화를 진행해줘야 한다. 이와 더불어 map은 동기로 동작하고 flatMap은 비동기로 동작한다. 참고에서 확인할 수 있다.

함수형

함수형 엔드포인트에서는 들어오는 요청을 라우팅하고, 라우팅된 요청을 처리하며 결과 값을 응답으로 리턴하는 등의 모든 작업을 하나의 함수 체인에서 처리한다.

HandlerFunction을 사용한 request 처리

함수형 엔드포인트는 들어오는 요청을 처리하기 위해 HandlerFunction이라는 함수형 기반의 핸들러를 사용한다.

@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
	Mono<T> handle(ServerRequest request);
}

서블릿 기반의 요청 처리는 Servlet 인터페이스의 service 메서드의 파라미터로 전달받는 ServletRequest, ServletResponse에 의해 이뤄진다. 반면에 HandlerFunction은 요청 처리를 위한 ServerRequest 하나만 handle 메서드의 파라미터로 전달받으며 요청 처리에 대한 응답은 Mono 형태로 리턴된다.

  • ServerRequest
    • handlerFunction에 의해 처리되는 HTTP request를 표현
    • 객체를 통해 HTTP headers, method, URI, query parameters에 접근할 수 있는 메서드를 제공
    • HTTP body 정보에 접근하기 위해 body, bodyToMono, bodyToFlux같은 메서드 제공
  • ServerResponse
    • HandlerFUnction, HandlerFilterFunction에 리턴되는 HTTP response를 표현
    • ServerResponse는 BodyBuilder와 HeadersBuilder를 통해 HTTP response body와 header 정보 추가 가능

request 라우팅을 위한 RouterFunction

RouterFunction은 들어오는 요청을 해당 HandlerFunction으로 라우팅해주는 역할을 한다. mvc의 @RequestMapping과 동일한 기능을 한다고 보면 된다. 이는 요청을 위한 데이터 뿐만 아니라 요청 처리를 위한 동작(HandlerFunction)까지 RouterFunction의 파라미터로 제공한다는 차이가 있다.

@FunctionalInterface
public interface RouterFunction<T extends ServerResponse> {
	Mono<HandlerFunction<T>> route(ServerRequest request);
    // ...
}

RouterFunction은 함수형 인터페이스로 route 메서드 하나만 정의되어 있다. 메서드는 파라미터로 전달받은 request에 매치되는 handlerFunction을 리턴한다.

쉽게 생각하면 mvc의 @RequestMapping = webflux router, mvc controller = webflux handler가 되는 것이다.

@Configuration
public class BookRouter {
    // webflux router == mvc @requestMapping
    @Bean
    public RouterFunction<?> routeBookV1(BookHandler handler) {
        return route()
                .POST("/v1/books", handler::createBook)
                .PATCH("/v1/books/{book-id}", handler::updateBook)
                .GET("/v1/books", handler::getBooks)
                .GET("/v1/books/{book-id}", handler::getBook)
                .build();
    }
}
// webflux Handler == mvc controller
@Component
public class BookHandler {
    private final BookMapper mapper;

    public BookHandler(BookMapper mapper) {
        this.mapper = mapper;
    }

    public Mono<ServerResponse> createBook(ServerRequest request) {
        // request body Mono로 변환
        return request.bodyToMono(BookDto.Post.class)
                .map(post -> mapper.bookPostToBook(post))
                .flatMap(book ->
                        // serverResponse 객체 리턴
                        ServerResponse
                                // 헤더에 url 포함 -> created는 상태코드
                                .created(URI.create("/v1/books/" + book.getBookId()))
                                .build());
    }

    public Mono<ServerResponse> getBook(ServerRequest request) {
        // pathVariable 추출
        long bookId = Long.valueOf(request.pathVariable("book-id"));
        Book book =
                new Book(bookId,
                        "Java 고급",
                        "Advanced Java",
                        "Kevin",
                        "111-11-1111-111-1",
                        "Java 중급 프로그래밍 마스터",
                        "2022-03-22",
                        LocalDateTime.now(),
                        LocalDateTime.now());
        return ServerResponse
                            .ok()
                            // body는 reactive 타입으로 감싸지 않아도 된다. serverResponse가 reactive type이기 때문
                            .bodyValue(mapper.bookToResponse(book))
                            // bodyValue가 비어있으면 교체
                            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> updateBook(ServerRequest request) {
        final long bookId = Long.valueOf(request.pathVariable("book-id"));
        return request
                .bodyToMono(BookDto.Patch.class)
                .map(patch -> {
                    patch.setBookId(bookId);
                    return mapper.bookPatchToBook(patch);
                })
                .flatMap(book -> ServerResponse.ok()
                        .bodyValue(mapper.bookToResponse(book)));
    }

    public Mono<ServerResponse> getBooks(ServerRequest request) {
        List<Book> books = List.of(
                new Book(1L,
                        "Java 고급",
                        "Advanced Java",
                        "Kevin",
                        "111-11-1111-111-1",
                        "Java 중급 프로그래밍 마스터",
                        "2022-03-22",
                        LocalDateTime.now(),
                        LocalDateTime.now()),
                new Book(2L,
                        "Kotlin 고급",
                        "Advanced Kotlin",
                        "Kevin",
                        "222-22-2222-222-2",
                        "Kotlin 중급 프로그래밍 마스터",
                        "2022-05-22",
                        LocalDateTime.now(),
                        LocalDateTime.now())
        );
        return ServerResponse
                .ok()
                .bodyValue(mapper.booksToResponse(books));
    }
}

request body 검증

함수형 엔드포인트에서는 spring의 validator 인터페이스를 구현해서 request body의 유효성 검증을 진행할 수 있다.

@Component
public class BookValidator implements Validator {
    @Override
    public boolean supports(Class<?> clazz) {
        return BookDto.Post.class.isAssignableFrom(clazz);
    }

    @Override
    public void validate(Object target, Errors errors) {
        BookDto.Post post = (BookDto.Post) target;

        // validationUtils는 spring validation 패키지에서 제공된다.
        ValidationUtils.rejectIfEmptyOrWhitespace(
                errors, "titleKorean", "field.required");

        ValidationUtils.rejectIfEmptyOrWhitespace(
                errors, "titleEnglish", "field.required");
    }
}
@Slf4j
@Component
public class BookHandler {
    private final BookMapper mapper;
    private final BookValidator validator;

    public BookHandler(BookMapper mapper, BookValidator validator) {
        this.mapper = mapper;
        this.validator = validator;
    }

    public Mono<ServerResponse> createBook(ServerRequest request) {
        return request.bodyToMono(BookDto.Post.class)
                // 인자 검증
                .doOnNext(post -> this.validate(post))
                .map(post -> mapper.bookPostToBook(post))
                .flatMap(book ->
                        ServerResponse
                                .created(URI.create("/v2/books/" + book.getBookId()))
                                .build());
    }
}

위 반식은 custom Validator를 이용한 유효성 검증 방식이다.

간단한 validation 같은 경우라면 다른 핸들러가 추가되었을 때, 비슷한 validator가 또 생겨야 하는 문제가 있기 때문에 표준 bean validation을 사용할 수도 있다.

// bean validation 사용
public class BookDto {
    @Getter
    public static class Post {
        @NotBlank
        private String titleKorean;

        @NotBlank
        private String titleEnglish;

        @NotBlank
        private String description;

        @NotBlank
        private String author;

        @NotBlank
        private String isbn;

        @NotBlank
        private String publishDate;
    }
}
@Configuration
public class BookRouter {

    // ...

    // spring에서 제공하는 표준 beanvalidation 등록
    @Bean
    public Validator springValidator() {
        return new LocalValidatorFactoryBean();
    }
}
// 모든 타입을 검증할 수 있도록 제네릭 사용
@Slf4j
@Component
public class RequestValidator<T> {
    private final Validator validator;

    public RequestValidator(Validator validator) {
        // spring validator 주입
        this.validator = validator;
    }

    public void validate(T body) {
        Errors errors =
                new BeanPropertyBindingResult(body, body.getClass().getName());

        // annotation에 의한 validate
        this.validator.validate(body, errors);

        if (!errors.getAllErrors().isEmpty()) {
            onValidationErrors(errors);
        }
    }

    private void onValidationErrors(Errors errors) {
        log.error(errors.getAllErrors().toString());
        throw new ResponseStatusException(HttpStatus.BAD_REQUEST, errors.getAllErrors()
                .toString());
    }
}
@Component
public class BookHandler {
    private final BookMapper mapper;
    private final RequestValidator validator;

    public BookHandler(BookMapper mapper, RequestValidator validator) {
        this.mapper = mapper;
        this.validator = validator;
    }

    public Mono<ServerResponse> createBook(ServerRequest request) {
        return request.bodyToMono(BookDto.Post.class)
                // validate 
                .doOnNext(post -> validator.validate(post))
                .map(post -> mapper.bookPostToBook(post))
                .flatMap(book -> ServerResponse
                        .created(URI.create("/v3/books/" + book.getBookId()))
                        .build());
    }
}

Spring Data R2DBC

R2DBC는 관계형 데이터베이스에 리액티브 프로그래밍 API를 제공하기 위한 개방형 사양이면서 드라이브 벤더가 구현하고 클라이언트가 사용하기 위한 SPI이다.

과거에는 리액티브 애플리케이션에서 관계형 데이터베이스를 사용할 경우, JDBC API 자체가 블로킹이기 때문에 완전한 논블로킹을 지원하는 것은 불가능했다. 하지만 R2DBC의 등장으로 관계형 DB도 완전한 논블로킹 구성이 가능해졌다.
Spring Data R2DBC는 R2DBC 기반의 Repository를 좀 더 쉽게 구현해주는 프로젝트다. JPA 같은 ORM 프레임워크에서 제공하는 캐싱, 지연 로딩, 기타 ORM 프레임워크에서 가지고 있는 특징들이 제거되어 단순하고 심플한 방법으로 사용할 수 있다.

설정

지원하는 driver는 공식 홈페이지에서 확인할 수 있다.
추가 참고 azure

implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'com.github.jasync-sql:jasync-mysql:2.1.16'

r2dbc는 jpa처럼 스키마를 자동 생성해주는 기능이 없기 때문에 직접 테이블 스크립트를 작성해야 한다.

@Configuration
@EnableR2dbcRepositories
@EnableR2dbcAuditing
public class R2dbcConfig {
}

r2dbc용 repository와 auditing을 활성화해준다.

도메인 매핑

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class Book {
    @Id
    private long bookId;
    private String titleKorean;
    private String titleEnglish;
    private String description;
    private String author;
    private String isbn;
    private String publishDate;

    @CreatedDate
    private LocalDateTime createdAt;

    @LastModifiedDate
    @Column("last_modified_at")
    private LocalDateTime modifiedAt;
}

repository

jpa와 마찬가지로 추상화된 데이터 액세스 기술 repository를 지원한다.

public interface BookRepository extends ReactiveCrudRepository<Book, Long> {
    Mono<Book> findByIsbn(String isbn);
}

ReactiveCrudRepository를 상속한다는 것과 반환 타입이 Mono 또는 Flux 타입이라는 것만 다르다.

service

@Slf4j
@Service
@RequiredArgsConstructor
public class BookService {
    private final @NonNull BookRepository bookRepository;
    private final @NonNull CustomBeanUtils<Book> beanUtils;

    public Mono<Book> saveBook(Book book) {
        return verifyExistIsbn(book.getIsbn())
                .then(bookRepository.save(book));
    }

    public Mono<Book> updateBook(Book book) {
        return findVerifiedBook(book.getBookId())
                .map(findBook -> beanUtils.copyNonNullProperties(book, findBook))
                .flatMap(updatingBook -> bookRepository.save(updatingBook));
    }

    public Mono<Book> findBook(long bookId) {
        return findVerifiedBook(bookId);
    }

    public Mono<List<Book>> findBooks() {
        return bookRepository.findAll().collectList();
    }

    private Mono<Void> verifyExistIsbn(String isbn) {
        return bookRepository.findByIsbn(isbn)
                .flatMap(findBook -> {
                    if (findBook != null) {
                        return Mono.error(new BusinessLogicException(
                                                    ExceptionCode.BOOK_EXISTS));
                    }
                    // 실제로 호출될일은 없음. -> return 값이 필요해 명시
                    return Mono.empty();
                });
    }

    private Mono<Book> findVerifiedBook(long bookId) {
        return bookRepository
                .findById(bookId)
                .switchIfEmpty(Mono.error(new BusinessLogicException(
                                                    ExceptionCode.BOOK_NOT_FOUND)));
    }
}
  • saveBook
    • verifyExistIsbn 를 통해 이미 등록된 isbn인지 확인한다. 존재한다면 Mono.error을 이용해서 Exception을 throw한다.
    • verifyExistIsbn 메서드에 보면 finidByIsbn을 호출하는데 여기서 찾지 못하면 Mono.empty를 반환하게 된다. Mono.empty의 경우 flatMap 체인을 붙여도 flatMap이 실행되지 않는다. 즉, 바로 Mono.empty를 반환한다고 보면 된다.
    • then operator는 현재 진행되는 Mono를 종료하고 파라미터로 입력한 새로운 Mono sequence를 시작한다. 여기서는 save 리턴값으로 받은 새로운 mono book sequence를 시작하게 된다.
  • updateBook
    • findVerifiedBook 메서드로 도서가 조회하는지 확인하고 없으면 Mono.error을 이용해 Exception을 throw한다.
  • findBooks
    • Flux로 리턴되는 반환값을 collectList로 하나의 Mono 형태의 리스트로 만들어 반환한다.

handler

@Slf4j
@Component("BookHandlerV5")
public class BookHandler {
    private final BookMapper mapper;
    private final BookValidator validator;
    private final BookService bookService;

    public BookHandler(BookMapper mapper, BookValidator validator, BookService bookService) {
        this.mapper = mapper;
        this.validator = validator;
        this.bookService = bookService;
    }

    public Mono<ServerResponse> createBook(ServerRequest request) {
        return request.bodyToMono(BookDto.Post.class)
                .doOnNext(post -> validator.validate(post))
                .flatMap(post -> bookService.saveBook(mapper.bookPostToBook(post)))
                .flatMap(book -> ServerResponse
                        .created(URI.create("/v5/books/" + book.getBookId()))
                        .build());
    }

    public Mono<ServerResponse> updateBook(ServerRequest request) {
        final long bookId = Long.valueOf(request.pathVariable("book-id"));
        return request
                .bodyToMono(BookDto.Patch.class)
                .doOnNext(patch -> validator.validate(patch))
                .flatMap(patch -> {
                    patch.setBookId(bookId);
                    return bookService.updateBook(mapper.bookPatchToBook(patch));
                })
                .flatMap(book -> ServerResponse.ok()
                                        .bodyValue(mapper.bookToResponse(book)));
    }

    public Mono<ServerResponse> getBook(ServerRequest request) {
        long bookId = Long.valueOf(request.pathVariable("book-id"));

        return bookService.findBook(bookId)
                        .flatMap(book -> ServerResponse
                                .ok()
                                .bodyValue(mapper.bookToResponse(book)));
    }

    public Mono<ServerResponse> getBooks(ServerRequest request) {
        return bookService.findBooks()
                .flatMap(books -> ServerResponse
                        .ok()
                        .bodyValue(mapper.booksToResponse(books)));
    }
}

ServerResponse의 체이닝은 Mono로 감싼 ServerResponse를 반환하기 때문에 평탄화를 위해서 flatMap을 사용해야 한다.

R2dbcEntityTemplate을 이용한 데이터 엑세스

repository를 사용해 db 접근하는 것은 널리 알려진 방식이다. 그런데 r2dbc는 repository를 사용한 데이터 엑세스 뿐만 아니라 가독성 좋은 SQL 쿼리문을 작성하는 것과 같은 자연스러운 방식으로 메서드를 조합하여 데이터베이스와 인터렉션할 수 있는 R2dbcEntityTemplate을 제공한다.

repository를 사용한 service에서 R2dbcEntityTemplate을 사용한 service로 수정해보자.

@Slf4j
@Service
@RequiredArgsConstructor
public class BookService {
    private final R2dbcEntityTemplate template;
    private final CustomBeanUtils<Book> beanUtils;

    public Mono<Book> saveBook(Book book) {
        return verifyExistIsbn(book.getIsbn())
                .then(template.insert(book));
    }

    public Mono<Book> updateBook(Book book) {
        return findVerifiedBook(book.getBookId())
                .map(findBook -> beanUtils.copyNonNullProperties(book, findBook))
                // update 메서드는 수정 ㅈ어보를 포함하는 엔티티 클래스 객체를 파라미터로 갖는다.
                .flatMap(updatingBook -> template.update(updatingBook));
    }

    public Mono<Book> findBook(long bookId) {
        return findVerifiedBook(bookId);
    }

    public Mono<List<Book>> findBooks() {
        // select와 all을 이용해 테이블에 저장된 모든 도서 정보를 조회한다.
        // select 메서드는 from, as, one, all 등의 종료 메서드와 함께 사용된다.
        return template.select(Book.class).all().collectList();
    }

    private Mono<Void> verifyExistIsbn(String isbn) {
        return template.selectOne(query(where("ISBN").is(isbn)), Book.class)
                .flatMap(findBook -> {
                    if (findBook != null) {
                        return Mono.error(new BusinessLogicException(
                                ExceptionCode.BOOK_EXISTS));
                    }
                    return Mono.empty();
                });
    }

    private Mono<Book> findVerifiedBook(long bookId) {
        // selectOne는 한 건의 데이터를 조회하는데 사용된다.
        // query 객체(criteria 포함)와 엔티티 클래스 객체를 파라미터로 갖는다.
        // is는 equal를 표현한다.
        return template.selectOne(query(where("BOOK_ID").is(bookId)), Book.class)
                .switchIfEmpty(Mono.error(new BusinessLogicException(
                                                ExceptionCode.BOOK_NOT_FOUND)));
    }
}

R2dbcEntityTemplate는 SQL 쿼리문의 시작 구문인 SELECT, INSERT, UPDATE, DELETE 등에 해당하는 select(), insert(), update(), delete() 메서드를 Entrypoint method라고 부르며, all(), count(), one() 등의 메서드처럼 SQL문을 생성하고 최종적으로 SQL문을 실행하는 메서드를 terminating method라고 부른다.

terminating method

select 메서드와 함께 사용할 수 있는 terminating methos는 다음과 같다.

  • first()
    • 조건에 일치하는 result row 중에서 first row를 얻고자 할 경우 사용할 수 있다. 만약 조건에 일치하는 row가 없다면 Mono를 리턴한다.
  • one()
    • 조건에 일치하는 result row가 단 하나일 경우 사용할 수 있다.
    • 만약 조건에 일치하는 row가 없다면 Mono를 리턴하며, 한건보다 많다면 exception이 발생한다.
  • all()
    • 조건에 일치하는 모든 result row를 얻고자 사용한다.
  • count()
    • 조건에 일치하는 데이터의 건수만 조회할 경우 사용한다.
    • 반환 타입은 Mono
  • exists()
    • 조건에 일치하는 result row가 존재하는지 여부를 확인한다.
    • 리턴 타입은 Mono

criteria method

제공되는 메서드 모두 criteria를 추가한 새로운 criteria를 리턴한다.

  • and(String column)
  • or(String column)
  • greaterThan(Object o)
  • GraterThanOrEquals(Object o)
  • in(Object... o)
  • in(Collection<?> collection)
  • is(Object o)
    • =에 해당
  • isNull()
  • isNotNull()
  • lessThan(Object o)
  • lessThanOrEquals(Object o)
  • like(Object o)
  • not(Object o)
  • notIn(Object... o)
  • notIn(Collection<?> collection)

페이지네이션

repository

@Repository
public interface BookRepository extends ReactiveCrudRepository<Book, Long> {
    Flux<Book> findAllBy(Pageable pageable);
}
@Service("bookServiceV7")
@RequiredArgsConstructor
public class BookService {
    private final @NonNull BookRepository bookRepository;
    private final @NonNull CustomBeanUtils<Book> beanUtils;

    public Mono<List<Book>> findBooks(@Positive int page,
                                      @Positive int size) {
        return bookRepository
                .findAllBy(PageRequest.of(page - 1, size,
                                                    Sort.by("memberId").descending()))
                .collectList();
    }

}

jpa를 사용할 때와 다르지 않다. 여기서는 간단히 Mono 형태로 반환했지만 클라이언트에서 페이지네이션 정보가 필요하다면 pageImpl 객체를 리턴하도록 하면 도니다.

R2dbcEntityTemplate

limit(), offset(), sort()등의 쿼리 빌드 메서드를 조합하여 적용할 수 있다.
아래에서는 제공되는 쿼리 빌드 메서드를 사용하지 않고 operator 체인을 이용해서 직접 페이지네이션을 적용한 코드다.

@Slf4j
@Validated
@Service
@RequiredArgsConstructor
public class BookService {
    private final @NonNull R2dbcEntityTemplate template;
    private final @NonNull CustomBeanUtils<Book> beanUtils;

    public Mono<List<Book>> findBooks(@Positive long page, @Positive long size) {

        return template
                .select(Book.class)
                .count() // count로 총 도서 개수 추출
                .flatMap(total -> {
                    Tuple2<Long, Long> skipAndTake = getSkipAndTake(total, page, size);
                    return template
                            .select(Book.class)
                            .all()
                            .skip(skipAndTake.getT1())
                            .take(skipAndTake.getT2())
                            .collectSortedList((Book b1, Book b2) ->
                                    (int) (b2.getBookId() - b1.getBookId()));
                });
    }

    private Tuple2<Long, Long> getSkipAndTake(long total, long movePage, long size) {
        long totalPages = (long) Math.ceil((double) total / size);
        long page = movePage > totalPages ? totalPages : movePage;
        // 파라미터로 전달받은 페이지의 시작 지점으로 이행하기 위해 그 지점만큼 skip
        long skip = total - (page * size) < 0 ? 0 : total - (page * size);
        // 데이터 개수
        long take = total - (page * size) < 0 ? total - ((page - 1) * size) : size;

        return Tuples.of(skip, take);
    }
}

매핑관계

https://binux.tistory.com/156
https://www.sipios.com/blog-tech/handle-the-new-r2dbc-specification-in-java
https://anomie7.tistory.com/95

위 링크를 참고하자

  • bufferUntilChanged
    • 일대다의 경우 데이터가 뻥튀기 되서 나오는데 이를 사용하면 해당 값이 일치하는 것들을 Flux로 묶어준다.
    • 원래는 fetch all을 사용하면 Flux<(mutable)Map<String!, Any!>!> 형태의 타입으로 반환된다. 여기서 bufferUntilChanged() 사용해서 특정 컬럼으로 묶어주면, Flux<(mutable)List<(mutable)Map<String!, Any!>!>!> 형태가 된다. 즉, 특정 컬럼 값이 일치하는 row들끼리 하나의 list로 묶어준다는 의미다.
  • oneToMany 관계에서는 sort도 반드시 필요하다 위에 언급한 첫번째 링크를 참고하자.

reactiveCRUDRepository와 R2dbcEntityTemplate를 사용하는 것은 연관관계가 없는 경우 사용에 적합하다. 연관관계 매핑이 들어가는 경우 databaseclient를 사용해서 쿼리를 직접 작성해야 한다.

databaseclient

databaseclient는 보통 다음과 같은 api들을 사용하여 체이닝한다.

  • sql
    • sql문을 입력받는다.
  • bind
    • sql문에 필요한 인자를 바인딩한다.
  • fetch
    • 쿼리 결과를 가져오기 위한 단계로 쿼리가 실행된 후에 호출되며 다음 단계는 어떤 결과를 반환할지 결정한다.
  • all
    • fetch 이후 호출 가능한 것중 하나로 쿼리의 모든 행을 반환한다.
    • flux 타입으로 반환 , asflow로 코루틴 변경 가능
  • one
    • fetch 이후 호출 가능한 것중 하나로 쿼리 결과의 하나의 행만 반환한다.
    • 만약 두 개의 행이 존재하면 IncorrectResultSizeDataAccessException 발생
  • then
    • 쿼리가 성공적으로 완료된 후에만 시그널을 보내는 Mono를 반환한다.
    • 보통 업데이트와 같은 상황에서 결과로 생성된 행이 없는 경우에 유용하다.

예외 처리

mvc 기반의 @ExceptionHandler나 @controllerAdvice 등은 webflux에서도 사용할 수 있다. 이외의 webflux 전용 예외 처리 기법을 살펴보자.(개인적으로는 controllerAdvice를 사용하는 것이 나아보인다.)

onErrorResume

onErrorResume는 에러 이벤트가 발생했을 때 에러 이벤트를 downstream으로 전파하지 않고, 대체 publisher를 통해 에러 이벤트에 대한 대체 값을 emit하거나 발생한 에러 이벤트를 래핑한 후에 다시 에러 이벤트를 발생시키는 역할을 한다.

@Component("BookHandlerV9")
public class BookHandler {
    private final BookMapper mapper;
    private final BookValidator validator;
    private final BookService bookService;

    public BookHandler(BookMapper mapper, BookValidator validator, BookService bookService) {
        this.mapper = mapper;
        this.validator = validator;
        this.bookService = bookService;
    }

    public Mono<ServerResponse> createBook(ServerRequest request) {
        return request.bodyToMono(BookDto.Post.class)
                .doOnNext(post -> validator.validate(post))
                .flatMap(post -> bookService.createBook(mapper.bookPostToBook(post)))
                .flatMap(book -> ServerResponse
                        .created(URI.create("/v9/books/" + book.getBookId()))
                        .build())
                // 파라미터(처리할 exception, 대체할 publisher sequence)
                .onErrorResume(BusinessLogicException.class, error -> ServerResponse
                            .badRequest()
                            .bodyValue(new ErrorResponse(HttpStatus.BAD_REQUEST,
                                                            error.getMessage())))
                .onErrorResume(Exception.class, error ->
                        ServerResponse
                                .unprocessableEntity()
                                .bodyValue(
                                    new ErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR,
                                                        error.getMessage())));
    }

    public Mono<ServerResponse> updateBook(ServerRequest request) {
        final long bookId = Long.valueOf(request.pathVariable("book-id"));
        return request
                .bodyToMono(BookDto.Patch.class)
                .doOnNext(patch -> validator.validate(patch))
                .flatMap(patch -> {
                    patch.setBookId(bookId);
                    return bookService.updateBook(mapper.bookPatchToBook(patch));
                })
                .flatMap(book -> ServerResponse.ok()
                                        .bodyValue(mapper.bookToResponse(book)))
                // 에러 타입에 관계없이 처리
                .onErrorResume(error -> ServerResponse
                        .badRequest()
                        .bodyValue(new ErrorResponse(HttpStatus.BAD_REQUEST,
                                error.getMessage())));
    }
}

ErrorWebExceptionHandler 이용한 글로벌 예외 처리

클래스 내 여러 개의 sequence에 대해 일일이 onErrorResume를 할 순 없다. 이를 보환하기 위해 GlobalExceptionHandler를 추가로 작성할 수 있다.

@Getter
@AllArgsConstructor
public class ErrorResponse {
    private HttpStatus status;
    private String message;
}

// ErrorWebFluxAutoConfiguration을 통해 등록된 DafaultErrorWebExceptionHandler 빈의 우선순위보다 높은 순위인 -2로 지정
@Order(-2)
@Configuration
// ErrorWebExceptionHandler를 구현하여 GlobalExceptionHandler로 동작
public class GlobalWebExceptionHandler implements ErrorWebExceptionHandler {
    private final ObjectMapper objectMapper;

    public GlobalWebExceptionHandler(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    // 첫번째 파라미터로 response를 설정할 수 있고 두 번째 파라미터로 발생한 예외를 처리한다.
    @Override
    public Mono<Void> handle(ServerWebExchange serverWebExchange,
                             Throwable throwable) {
        return handleException(serverWebExchange, throwable);
    }

    private Mono<Void> handleException(ServerWebExchange serverWebExchange,
                                       Throwable throwable) {
        ErrorResponse errorResponse = null;
        DataBuffer dataBuffer = null;

        // bufferFactory 인터페이스의 구현체 생성, bufferFactory는 DataBuffer를 위한 팩토리로서 response body를 write하는데 사용
        DataBufferFactory bufferFactory =
                                serverWebExchange.getResponse().bufferFactory();            
        serverWebExchange.getResponse().getHeaders()
                                        .setContentType(MediaType.APPLICATION_JSON);

        if (throwable instanceof BusinessLogicException) {
            BusinessLogicException ex = (BusinessLogicException) throwable;
            ExceptionCode exceptionCode = ex.getExceptionCode();
            errorResponse = ErrorResponse.of(exceptionCode.getStatus(),
                                                exceptionCode.getMessage());
            serverWebExchange.getResponse()
                        .setStatusCode(HttpStatus.valueOf(exceptionCode.getStatus()));
        } else if (throwable instanceof ResponseStatusException) {
            ResponseStatusException ex = (ResponseStatusException) throwable;
            errorResponse = ErrorResponse.of(ex.getStatus().value(), ex.getMessage());
            serverWebExchange.getResponse().setStatusCode(ex.getStatus());
        } else {
            errorResponse = ErrorResponse.of(HttpStatus.INTERNAL_SERVER_ERROR.value(),
                                                            throwable.getMessage());
            serverWebExchange.getResponse()
                                    .setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        }

        try {
            // DataBuffer로 래핑하여 response body를 구성            
            dataBuffer =
                    bufferFactory.wrap(objectMapper.writeValueAsBytes(errorResponse));
        } catch (JsonProcessingException e) {
            bufferFactory.wrap("".getBytes());
        }
        // response body write
        return serverWebExchange.getResponse().writeWith(Mono.just(dataBuffer));
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions