WebFlux의 기본 개념 (Event Loop, Lazy Evaluation, publish-subscribe)
이 글은 Netty + WebFlux가 작동하는 방식에 대한 기본 개념들을 설명합니다.
Netty + WebFlux의 특징을 보면 적은 리소스로 많은 사용자의 동시 요청을 받을 수 있고,
I/O 작업이 많은 경우 더 효율적입니다.
Spring WebFlux는 "비동기", "논블로킹", "리액티브 스트림" 이라는 용어로 소개되지만, 처음 접할때는 다소 복잡하게 느껴질 수 있습니다.
그런데 WebFlux의 구조는 아래 개념만 알고있으면 사실 크게 복잡하지 않습니다.
- 이벤트 루프
- 지연 평가
- 구독-발행
이벤트 루프
이벤트 루프는 하나의 스레드가 반복적으로 이벤트를 감지하고, 이를 순차적으로 처리하는 실행 모델입니다
Java 개발자가 이해하기 쉽도록 코드로 표현하면 아래 코드와 같습니다.
void eventWorker() {
while (true) {
Event event = EventQueue.take(); //blocking (no busy wait)
event.evaluate();
}
}
EventQueue에 이벤트가 들어오면 worker가 이벤트를 평가하고 다시 올라가서 이벤트가 들어오기를 기다립니다.
여기서 worker는 이벤트가 도착하기 전까지는 대기하고, 이벤트가 발생하면 빠르게 처리하기 때문에 높은 효율을 가집니다.
물론 이벤트 루프는 상태를 저장하지 않기 때문에, 상태 관리가 필요한 경우 외부 저장소나 컨텍스트를 이용해야 합니다.
WebFlux는 Netty 위에서 동작하며, Netty는 이벤트 루프 기반의 네트워크 엔진입니다.
Netty는 내부적으로 이벤트 루프 를 병렬로 실행시키면서 요청을 처리합니다.
각 클라이언트 연결(Channel)은 특정 EventLoop에 바인딩되고, 해당 이벤트 루프가 그 채널의 모든 I/O를 처리합니다
그러기 위해서 Channel + Selector 기반의 NIO 구조를 사용하는데 이는 os에 묶여있는 network들을 처리하는 방식에 대한 것으로, 여기서 중요한 개념은 아니기에 추후에 다시 다루겠습니다.
지연 평가
Java 표준 라이브러리 중 FunctionalInterface로 사용하는 코드를 예로 들어보겠습니다.
Function<Integer, String> event = value -> String.valueOf(value);
event.apply(1); // evaluation
여기서 value -> String.valueOf(value) 라고 Integer값을 받아서 String으로 변환하는 코드는 첫번째 줄에서는 작동하지 않습니다.
event.apply() 라는 evauation 코드를 만나야 작동하게 됩니다.
물론 초기 값을 주지 않았기 때문에 당연할 수 있습니다.
자 그러면 코드를 좀 더 확장해서 Stream API를 사용해보겠습니다.
1| Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5)
2| .map(i -> i * 3)
3| .filter(i -> i % 2 == 0);
4|
5| List<Integer> list = stream.toList(); <-- terminal operator
코드가 실행되면 세번째 줄까지 아무런 작동을 하지 않고 terminal operation을 만났을 때 map과 filter 연산이 시작 되게 됩니다.
이것을 lazy evaluation이라고 하고 이게 publish-subscribe의 구조를 만들고 webflux를 이해하는데 가장 중요한 개념입니다.
다시 위 코드를 보면 세번째 줄까지가 'pipeline(publish)을 만들었다' 라고 볼 수 있고 5번째 줄은 'evaluation(subscribe) 했다' 로 볼 수 있습니다.
지연평가 코드는 대부분 람다식으로 작성하고, 때로는 클래스/익명 클래스로 작성하기도 합니다.
구독 발행
그럼 Webflux의 구문을 한번 보겠습니다.
@Test
void test() {
Mono<Integer> publisher = Mono.defer(() -> Mono.just(2))
.map(i -> i * 3)
.filter(i -> i % 2 == 0);
publisher.subscribe(System.out::println);
}
위에서 본 지연 평가 구조가 익숙해지면 대충 어떤 흐름인지 보이게 됩니다.
() -> Mono.just(2) 와 map, fileter가 pipeline이고,
subscribe() 가 terminal operation이 됩니다.
위 지연평가와 같이 map 과 filter는 동작하지 않고 있다가 subscribe를 만나는 시점에 위 코드(pipeline) 이 동작하게 되는거죠.
그럼 이제 실제로 사용할 수 있는 controller를 만들어보겠습니다.
@RestController
public class MonoTestController {
@GetMapping("/test")
public Mono<Integer> test() {
return Mono.defer(() -> Mono.just(2))
.map(i -> i * 3)
.filter(i -> i % 2 == 0);
}
}
만든 컨트롤러로 요청을 보내보면,
예상했던 6을 받게됩니다.
그런데 여기서 이상한점이 있죠..
Controller 를 보게되면 subscribe 코드가 없는데 값이 연산되어 요청에 대한 응답으로 처리가 되었습니다.
이것은 WebFlux가 Netty 위에서 subscribe를 수행했기 때문입니다.
그럼 이제 데이터 타입인 Mono와 Flux가 무엇인지 간단하게 보겠습니다.
Mono와 Flux
WebFlux에서 요청을 처리할 때 사용하는 데이터 타입입니다.
Mono와 Flux의 차이는 아래와 같습니다.
"Mono는 데이터를 한번 주고, Flux는 데이터를 여러번 준다."
(여기서 데이터는 Collection일수 있고 크고 작거나 여러개일 수 있습니다.)
Client입장에서 Mono는 우리가 흔히 접하는 HttpProtocol을 생각하면 됩니다.
검색창에 www.naver.com 을 검색하면 네이버 화면이 옵니다. 1번의 요청에 1번의 데이터가 들어왔다고 하겠습니다.
그럼 이제 한번의 요청에 값을 여러번 주는 것을 예시로 들겠습니다.
예를 들어서 naver 개발자가 웹 페이지를 '상단', '중단', '하단' 으로 3개의 영역으로 나눴다고 합니다.
각각을 데이터로 만들고 3번에 걸쳐 사용자한테 제공합니다.
그럼 사용자는 요청을 보내고, 그 응답으로 '상단' 을 받고 렌더링 하고, '중단'을 받고 렌더링 하고, '하단'을 받고 렌더링을 해서 하나의 페이지를 만들 수 있습니다. (서버에서도 순서대로 만들어서 보냅니다.)
이것을 Flux하고 하며, 기존에 사용하던 전통적인 Http Protocol과는 살짝 다른 느낌이 드실겁니다.
사용자가 HTTP 요청을 보내면 서버는 응답을 줄 때 connection 을 열어놓고, HTTP 응답을 보내주면서 connection을 close하지 않습니다.
'상단'을 body에 넣고 보냅니다. connection을 close하지 않습니다.
'중단'을 body에 넣고 보냅니다. connection을 close하지 않습니다.
'하단'을 body에 넣고 보냅니다. connection을 close합니다.
이렇게 1개의 요청에 여러번 응답을 (chunk 단위로) 밀어넣게 하는 것이 Flux입니다.
그래서 코드가 아래와 같이 작성됩니다.
@GetMapping("/flux")
public Flux<String> fluxTest() {
return Flux.create(sink -> {
timeSleep(1000); // '상단' 데이터 생성 시간
sink.next("상단"); // <-- '상단'
timeSleep(1000); // '중단' 데이터 생성 시간
sink.next("중단"); // <-- '중단'
timeSleep(1000); // '하단' 데이터 생성 시간
sink.next("하단"); // <-- '하단'
sink.complete(); // <-- connection close signal
});
}
Flux.create는 무시하셔도 무방하며, Flux의 구조를 파악하기 위해 동기 + blocking으로 처리했습니다.
sink를 보면 메서드를 2개 사용했는데 next 와 complete 입니다.
- sink.next() : 데이터를 전송합니다. 청크 단위로 이루어집니다.
- sink.complete() : 데이터들을 모두 보내주었다고 알려주는 코드로 해당 코드가 없으면 connection이 종료되지 않습니다.
Server-Sent Events(SSE) 방식일 땐 postman 요청을 보낼때 꼭 "Accept: text/event-stream" 헤더를 추가해야합니다.
응답을 next한 시점에 데이터를 1초마다 받는것을 확인하실 수 있습니다.