| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 | 20 | 21 |
| 22 | 23 | 24 | 25 | 26 | 27 | 28 |
- JPA
- DevOps
- 트랜잭션
- Kotlin
- 백엔드개발
- monitoring
- 성능 최적화
- 동시성제어
- Kubernetes
- RDBMS
- 성능최적화
- 백엔드
- NIO
- kafka
- SpringBoot
- Java
- redis
- prometheus
- CloudNative
- docker
- GitOps
- webflux
- netty
- selector
- grafana
- 데이터베이스
- mysql
- spring boot
- helm
- jvm
- Today
- Total
유성
[Java/NIO] Selector와 Pipe로 구현한 WebFlux 본문
Spring WebFlux를 사용하다 보면 flatMap이 어떻게 쓰레드를 차단하지 않고 데이터를 흘려보내는지 궁금할 때가 있다.
흔히 "이벤트 루프 기반이다", "논블로킹이다"라고 말하지만, 이를 로우 레벨에서 직접 구현해 보면 그 본질을 더 명확히 이해할 수 있다.
이번 글에서는 Java NIO의 Selector와 Pipe를 이용해, WebFlux의 Assembly-Subscription 구조를 간단하게 직접 코드로 구현해본다.
1. 핵심 개념: 왜 Selector인가?
WebFlux의 기반인 Netty는 내부적으로 자바의 Selector를 사용한다.
Selector는 하나의 쓰레드가 여러 채널(소켓, 파이프 등)을 감시하게 해준다.
- Assembly (조립): 파이프라인을 구성하는 단계 (flatMap 호출 시점)
- Subscription (구독): 실제 데이터가 흘러가기 시작하는 단계 (subscribe 호출 시점)
- Event Loop: Selector가 이벤트를 감지하고 다음 단계의 함수를 실행하는 과정
2. 코드 구현
채팅엔진을 구현할때 처럼 ServerSocketChannel을 사용하면 좋겠지만 편의성과 간결성을 목적으로 Pipe를 사용해보자.
Pipe는 메모리 내에서 만드는 비동기 통로이며, 각 단계가 끝날 때마다 다음 단계로 데이터를 "전달"하는 구조로 구성을 한다.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.function.Function;
public class MyReactiveStream {
private final List<Function<Object, Object>> operators = new ArrayList<>();
private Selector selector;
public static void main(String[] args) throws IOException {
System.out.println("=== 커스텀 리액티브 스트림 조립 시작 ===");
// 1. 스트림 생성 및 파이프라인 조립 (Assembly)
MyReactiveStream stream = MyReactiveStream.create();
stream.flatMap(data -> {
System.out.println("[Step 1] 대문자 변환 처리 중...");
return data.toString().toUpperCase();
}).flatMap(data -> {
System.out.println("[Step 2] 느낌표 추가 처리 중...");
return data.toString() + "!!!";
});
// 2. 구독 시작 (Initial Data 던지기)
// 별도의 쓰레드에서 실행하거나 메인에서 실행 가능
new Thread(() -> {
try {
System.out.println("=== 커스텀 리액티브 스트림 구독 시작 ===");
stream.subscribe("hello world");
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
public static MyReactiveStream create() throws IOException {
MyReactiveStream stream = new MyReactiveStream();
stream.selector = Selector.open();
return stream;
}
@SuppressWarnings("unchecked")
public MyReactiveStream flatMap(Function<?, ?> func) {
operators.add((Function<Object, Object>) func);
return this;
}
public void subscribe(Object initialData) throws IOException {
dispatchNext(initialData, 0);
while (!Thread.currentThread().isInterrupted()) {
// 1000ms 동안 이벤트가 없으면 루프 확인 (예제 종료를 위해)
if (selector.select(1000) == 0) {
// 더 이상 처리할 이벤트가 없으면 종료 로직 가능
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (key.isReadable()) {
processEvent(key);
}
}
}
}
private void processEvent(SelectionKey key) throws IOException {
SourceContext ctx = (SourceContext) key.attachment();
Pipe.SourceChannel source = (Pipe.SourceChannel) key.channel();
// 데이터 읽기
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = source.read(buffer);
if (bytesRead <= 0) return;
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String input = new String(bytes);
source.close(); // 사용한 채널 닫기
// 로직 실행
Object result = operators.get(ctx.index).apply(input);
// 다음 단계 진행
if (ctx.index + 1 < operators.size()) {
dispatchNext(result, ctx.index + 1);
} else {
System.out.println("\n[최종 결과 수신]: " + result);
System.out.println("모든 파이프라인 처리가 완료되었습니다.");
System.exit(0); // 예제 종료
}
}
private void dispatchNext(Object data, int nextIndex) throws IOException {
Pipe pipe = Pipe.open();
Pipe.SinkChannel sink = pipe.sink();
ByteBuffer buf = ByteBuffer.wrap(data.toString().getBytes());
while (buf.hasRemaining()) {
sink.write(buf);
}
sink.close(); // 전송 완료 후 sink 닫기
pipe.source().configureBlocking(false);
pipe.source().register(selector, SelectionKey.OP_READ, new SourceContext(nextIndex));
}
static class SourceContext {
int index;
SourceContext(int index) { this.index = index; }
}
}
// 처리 결과
=== 커스텀 리액티브 스트림 조립 시작 ===
=== 커스텀 리액티브 스트림 구독 시작 ===
[Step 1] 대문자 변환 처리 중...
[Step 2] 느낌표 추가 처리 중...
[최종 결과 수신]: HELLO WORLD!!!
모든 파이프라인 처리가 완료되었습니다.
종료 코드 0(으)로 완료된 프로세스
처리 결과 조립이 먼저 수행되고, 구독이 수행된다.
3. 코드 분석
단순히 데이터가 흐르는 것 이상의 의미가 이 코드에 담겨 있다. WebFlux의 핵심 메커니즘과 대조하며 분석해보자.
1) Attachment를 이용한 상태 유지
비동기 환경에서 가장 어려운 것은 '내가 어디까지 했는가'를 기억하는 것이다.
동기 방식은 스레드 스택(Stack)이 상태를 관리하지만, 비동기는 스레드가 중간에 해제되기 때문에 별도의 저장소가 필요하다.
- 코드의 역할: SelectionKey.attachment()에 SourceContext를 저장한다.
- WebFlux는: Webflux에서는 파이프라인의 각 단계 정보를 Context나 Subscription 객체에 담아 관리한다. 덕분에 이벤트 루프 스레드가 다른 일을 하다가 돌아와도, attachment에서 이정표를 꺼내 다음 실행할 함수 (operators.get(index)를 정확히 찾아낼 수 있다.
2) Selector & Pipe: 스레드 점유를 최소화하는 '전달' 메커니즘
이 코드에서 가장 중요한 부분은 operators.get(ctx.index).apply(input)가 실행된 직후이다.
결과를 즉시 리턴하지 않고 다시 dispatchNext를 통해 Pipe로 던져진다.
- Non-blocking의 실체: 함수 실행이 끝나면 스레드는 즉시 selector.select()로 돌아가 다른 요청의 처리를 가증하게 한다. 데이터가 다음 Pipe에 도달하기 전까지 스레드는 아무것도 기다리지 않는다.
- 이벤트 루프의 효율성: 마치 이어달리기 주자가 다음 주자에게 바통(Data)을 넘겨주고 쉬러 가는 것과 같다. 이러한 메커니즘 덕분에 단 몇 개의 스레드(이벤트 루프)만으로 수만 개의 동시 요청을 처리할 수 있는 구조적 기반이 마련된다.
3) Assembly vs Subscription: Lazy Execution의 구현
코드를 실행해 보면 "구독 시작" 로그가 먼저 찍히고 그 뒤에 "Step 1"이 실행된다. 이것이 리액티브 프로그래밍의 지연 실행이다.
- 조립(Assembly): flatMap을 호출할 때는 오직 operators 리스트에 함수 포인터를 저장할 뿐, CPU는 비즈니스 로직을 단 1ms도 쓰지 않는다.
- 구독(Subscription): subscribe()가 호출되어 첫 번째 Pipe에 데이터가 기록되는 순간, 잠자던 Selector가 깨어나며 실행된다. 구독하기 전까지는 아무 일도 일어나지 않는다
4. 마치며
작성한 코드는 로직이 순서대로 실행되는 것이 아니라, '동작이 끝나면 다음 동작을 Selector에게 위임하는' 구조이다.
이러한 메커니즘을 통해 하나의 파이프라인이 여러 스레드 위를 옮겨 다니며 실행될 수 있는 구조적 유연성을 갖게 된다.
Spring WebFlux가 고성능을 내는 비결은 단순히 빠른 언어를 써서가 아니라, 이처럼 자바 NIO의 Selector를 극한으로 활용해 스레드가 노는 시간(Idle)을 최소화했기 때문이다.
'Java & Kotlin' 카테고리의 다른 글
| 락 없는 동시성은 정말 안전할까? CAS, ABA 문제와 한계 (0) | 2025.12.10 |
|---|---|
| 클래스로더는 .class 파일을 어떻게 실행시키나? (4) | 2025.08.02 |
| NIO(New I/O)와 Webflux 비동기 프로그래밍 (1) | 2025.07.07 |
| Java Stream API의 핵심 개념과 병렬 처리의 함정 (0) | 2025.06.15 |
| Socket 통신 (1) | 2024.12.15 |