ThreadPoolExecutor의 예외처리와 Exception을 어떤식으로 핸들링 해야 하는지, 그리고 JDK에서는 어떤 설정을 기본으로 가져가는지 알아보겠습니다.
알아보기에 앞서 ThreadPoolExecutor에 대한 사용법부터 간단하게 알아보겠습니다.
1. 기본 사용 방법
ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 60, 60, TimeUnit.SECONDS
, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.AbortPolicy());
executor.execute(runnable);
executor.submit(callable);
각 파라미터에 대한 설명
- 1(core) : 코어 쓰레드는 30개이며, 초기 쓰레드 개수는 0개 (작업을 실행하거나, 수동으로 prestart 시 코어 쓰레드 생성)
- 2(max) : 최대 쓰레드는 60개이며, Queue의 크기 1000개가 모두 초과된 후 작업이 들어오면 쓰레드가 생성됨
- 3(time) : 코어 쓰레드 이상으로 추가 생성된 쓰레드는 idle 상태로 해당 시간만큼 유지 후 자동으로 제거됨
- 4(timeunit) : time을 지정하기 위한 time unit
- 5(blocking queue) : 작업을 담아두기 위한 저장소, BlockingQueue의 하위 타입 사용 가능
- 6(RejectedExecutionHandler) : 작업 실행 불가 핸들러, 밑에서 좀 더 자세하게 설명하겠습니다.
사용 방법에 대해서는 새로운 글로 올리도록 하겠습니다.
여기서는 각 상황별 에러 케이스는 어떤게 있고, 어떻게 조치해야 하는지 알아보겠습니다.
2. 큐 용량이 초과된 경우
위에서 큐 용량이 초과된 경우 최대로 max size까지 신규 쓰레드를 생성한다고 했습니다.
여기까지는 문제가 발생하지 않으나, max size만큼 쓰레드가 생성된 후 큐가 가득 차고 작업을 큐에 더 넣을려고 하면 문제가 발생합니다.
이 경우 작업을 수행할 수 없기에 ThreadPoolExecutor 인스턴스는 reject 메서드를 호출하게 되고,
reject 메서드는 아래와 같이 rejectedExcution메서드를 호출하게 됩니다.
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
그러면 어떤 rejectedExeution 메서드들이 있는지 알아보겠습니다.
(1) AbortPolicy - jdk 21 기본 설정
듣기로는 ThreadPoolExecutor가 등장한 초기부터 해당 설정을 기본 설정으로 되었다고 알고있습니다.
아마 jdk 버전 24+ 이 올라가도 기본 설정으로 사용할 것 같습니다.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
내용은 꽤나 단순합니다. java.util.consurrent 패키지의 RejectedExecutionException을 던집니다.
아마 ThreadPoolExecutor로 테스트를 하다보면 종종 보게 되는 Exception입니다.
(2) DiscardPolicy
Discard, '무시한다' 라는 rejectedExecutionHandler입니다.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
보시는 것과 같이 아무런 코드가 없습니다. 작업이 초과되어 들어오면 작업을 아무런 로그를 남기지 않고 버립니다.
일부 데이터가 누락되더라도 괜찮다면 사용할 법 한데, 그렇다고 성능이 월등히 높아지진 않아서 쓸일은 없을 것 같습니다.
추가로 큐에 잘 들어가있던 작업을 버리고 현재 작업이 들어가는 DiscardOldestPolicy 도 있네요.
(3) CallerRunsPolicy - 백프레셔
이 handler를 설명하기 위해서 글을 작성하였습니다. 꽤나 참신한 처리로 코드는 아래와 같습니다.
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
'main 쓰레드가 작업을 실행하겠다' 라고 메서드가 적혀있는데, 단순히 작업만을 실행시키는 코드가 아닙니다.
ThreadPoolExecutor는 소비자(executor thread) 만 담당하는 클래스인데,
작업을 요청하는 생산자(main thread) 에게 일을 시킴으로써 그 일을 하는동안 생산을 하지못하게 막는 역할을 하게됩니다.
작업을 생산하지 못하면 생산하지 못하는동안 소비자는 밀려있던 작업을 함으로써 Queue에 여유 공간을 만들어 놓습니다.
이렇게 되면 큐가 가득차도 Exception 발생 없이 모든 일을 처리할 수 있게 됩니다.
여기에 지수 백오프를 추가하여 과부하에 대한 안정화 처리도 가능합니다.
private static final RejectedExecutionHandler customHandler = (runnable, executor) -> {
long delay = 100L;
int attempt = 0;
while (attempt < 5) {
try {
if (!executor.isShutdown() && executor.getQueue().offer(runnable, delay, TimeUnit.MILLISECONDS)) {
return;
}
attempt++;
delay = Math.min(1000, delay * 2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Task interrupted while retrying with backoff", e);
}
}
// 에러처리 또는 outboxing 처리
throw new RejectedExecutionException("Task rejected after retries");
};
3. 작업 실행 중 에러가 발생한 경우
멀티 스레드 환경에서는 각 스레드가 독립적인 콜 스택을 가지기 때문에, 한 스레드에서 발생한 예외는 다른 스레드로 직접 전파되지 않습니다.
그러나 ThreadPoolExecutor는 이를 간접적으로 처리할 수 있는 방법을 제공합니다.
바로 submit() 메서드를 통해 Future 객체를 반환받는 방식입니다.
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit()은 내부적으로 Runnable을 RunnableFuture로 감싼 후 실행하고, 결과를 받을 수 있는 Future를 반환합니다.
이 Future는 작업의 성공/실패 여부, 결과값, 예외를 담고 있는 일종의 참조 객체입니다.
만약 실행 중 예외가 발생하고, 이후에 future.get()을 호출하면,
이때 예외는 ExecutionException으로 감싸져서 호출한 스레드(main 등)에서 던져지게 됩니다.
즉, 스레드 풀 내부에서 발생한 예외는 사라지지 않고,
Future.get()을 통해 원래 예외(Inner Exception) 를 감싼 ExecutionException 형태로
호출한 쪽(예: 메인 스레드)으로 전파되는 것입니다.
추가로, CheckedException을 던질려면 Runnable 대신 Callable을 사용하면 됩니다.
참고 코드 (jdk 21 - 표준 라이브러리)
Future 인터페이스 Docs
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
-> * @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
Exception 발생 로그
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Occurred Exception. threadName: pool-1-thread-1
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.shop.shoppingevent.Main.main(Main.java:16)
Caused by: java.lang.RuntimeException: Occurred Exception. threadName: pool-1-thread-1
at com.shop.shoppingevent.Main.lambda$main$0(Main.java:13)
Checked Exception을 수용하는 Callable Functional Interface
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception; <<<- Checked Exception도 수용합니다.
}
이를 이용하면 빠르고 안정적인 처리를 할 수 있습니다.
단, 꼭 멀티스레딩을 사용하기 위해서는 내부 로직/예외처리를 잘 작성하고 성능테스트와 모니터링을 병행해야 합니다.
'Java & Kotlin' 카테고리의 다른 글
Java Stream API의 핵심 개념과 병렬 처리의 함정 (0) | 2025.06.15 |
---|---|
Socket 통신 (1) | 2024.12.15 |
Java Thread로 직접 구현하는 커스텀 쓰레드 풀: 기본 원리부터 동작까지 (0) | 2024.10.06 |
Kotlin으로 TTL 기반 Map 구현하기: Java 21 가상 쓰레드와 ConcurrentHashMap 최적화 (2) | 2024.10.04 |
Java Checked vs Unchecked Exception: 예외 처리의 원칙과 트랜잭션 롤백 (0) | 2024.03.03 |