본문 바로가기

Java & Kotlin

Java ThreadPoolExecutor 예외 처리와 Exception Handler

ThreadPoolExecutor의 예외처리와 Exception을 어떤식으로 핸들링 해야 하는지, 그리고 JDK에서는 어떤 설정을 기본으로 가져가는지 알아보겠습니다.

 

알아보기에 앞서 ThreadPoolExecutor에 대한 사용법부터 간단하게 알아보겠습니다.

1. 기본 사용 방법

ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 60, 60, TimeUnit.SECONDS
	, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.AbortPolicy());
    
executor.execute(runnable);
executor.submit(callable);

 

각 파라미터에 대한 설명

  • 1(core) : 코어 쓰레드는 30개이며, 초기 쓰레드 개수는 0개 (작업을 실행하거나, 수동으로 prestart 시 코어 쓰레드 생성)
  • 2(max) : 최대 쓰레드는 60개이며, Queue의 크기 1000개가 모두 초과된 후 작업이 들어오면 쓰레드가 생성
  • 3(time) : 코어 쓰레드 이상으로 추가 생성된 쓰레드는 idle 상태로 해당 시간만큼 유지 후 자동으로 제거
  • 4(timeunit) : time을 지정하기 위한 time unit
  • 5(blocking queue) : 작업을 담아두기 위한 저장소, BlockingQueue의 하위 타입 사용 가능
  • 6(RejectedExecutionHandler) : 작업 실행 불가 핸들러, 밑에서 좀 더 자세하게 설명하겠습니다.

사용 방법에 대해서는 새로운 글로 올리도록 하겠습니다.

 

여기서는 각 상황별 에러 케이스는 어떤게 있고, 어떻게 조치해야 하는지 알아보겠습니다.

 

 

2. 큐 용량이 초과된 경우

위에서 큐 용량이 초과된 경우 최대로 max size까지 신규 쓰레드를 생성한다고 했습니다.

여기까지는 문제가 발생하지 않으나, max size만큼 쓰레드가 생성된 후 큐가 가득 차고 작업을 큐에 더 넣을려고 하면 문제가 발생합니다.

 

이 경우 작업을 수행할 수 없기에 ThreadPoolExecutor 인스턴스는 reject 메서드를 호출하게 되고,

reject 메서드는 아래와 같이 rejectedExcution메서드를 호출하게 됩니다.

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

 

그러면 어떤 rejectedExeution 메서드들이 있는지 알아보겠습니다.

(1) AbortPolicy - jdk 21 기본 설정

듣기로는 ThreadPoolExecutor가 등장한 초기부터 해당 설정을 기본 설정으로 되었다고 알고있습니다.

아마 jdk 버전 24+ 이 올라가도 기본 설정으로 사용할 것 같습니다.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

 

내용은 꽤나 단순합니다. java.util.consurrent 패키지의 RejectedExecutionException을 던집니다.

아마 ThreadPoolExecutor로 테스트를 하다보면 종종 보게 되는 Exception입니다.

 

(2) DiscardPolicy

Discard, '무시한다' 라는 rejectedExecutionHandler입니다.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

 

보시는 것과 같이 아무런 코드가 없습니다. 작업이 초과되어 들어오면 작업을 아무런 로그를 남기지 않고 버립니다.

일부 데이터가 누락되더라도 괜찮다면 사용할 법 한데, 그렇다고 성능이 월등히 높아지진 않아서 쓸일은 없을 것 같습니다.

추가로 큐에 잘 들어가있던 작업을 버리고 현재 작업이 들어가는 DiscardOldestPolicy 도 있네요.

 

(3) CallerRunsPolicy - 백프레셔

이 handler를 설명하기 위해서 글을 작성하였습니다. 꽤나 참신한 처리로 코드는 아래와 같습니다.

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

'main 쓰레드가 작업을 실행하겠다' 라고 메서드가 적혀있는데, 단순히 작업만을 실행시키는 코드가 아닙니다.

 

ThreadPoolExecutor는 소비자(executor thread) 만 담당하는 클래스인데,

작업을 요청하는 생산자(main thread) 에게 일을 시킴으로써 그 일을 하는동안 생산을 하지못하게 막는 역할을 하게됩니다.

 

작업을 생산하지 못하면 생산하지 못하는동안 소비자는 밀려있던 작업을 함으로써 Queue에 여유 공간을 만들어 놓습니다.

이렇게 되면 큐가 가득차도 Exception 발생 없이 모든 일을 처리할 수 있게 됩니다.

 

여기에 지수 백오프를 추가하여 과부하에 대한 안정화 처리도 가능합니다.

private static final RejectedExecutionHandler customHandler = (runnable, executor) -> {
    long delay = 100L;
    int attempt = 0;

    while (attempt < 5) {
        try {
            if (!executor.isShutdown() && executor.getQueue().offer(runnable, delay, TimeUnit.MILLISECONDS)) {
                return;
            }
            attempt++;
            delay = Math.min(1000, delay * 2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Task interrupted while retrying with backoff", e);
        }
    }
    // 에러처리 또는 outboxing 처리
    throw new RejectedExecutionException("Task rejected after retries");
};

 

 

3. 작업 실행 중 에러가 발생한 경우 

멀티 스레드 환경에서는 각 스레드가 독립적인 콜 스택을 가지기 때문에, 한 스레드에서 발생한 예외는 다른 스레드로 직접 전파되지 않습니다.

 

그러나 ThreadPoolExecutor는 이를 간접적으로 처리할 수 있는 방법을 제공합니다.

바로 submit() 메서드를 통해 Future 객체를 반환받는 방식입니다.

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

 

submit()은 내부적으로 RunnableRunnableFuture로 감싼 후 실행하고, 결과를 받을 수 있는 Future를 반환합니다.

Future는 작업의 성공/실패 여부, 결과값, 예외를 담고 있는 일종의 참조 객체입니다.

 

만약 실행 중 예외가 발생하고, 이후에 future.get()을 호출하면,

이때 예외는 ExecutionException으로 감싸져서 호출한 스레드(main 등)에서 던져지게 됩니다.

 

즉, 스레드 풀 내부에서 발생한 예외는 사라지지 않고,

Future.get()을 통해 원래 예외(Inner Exception) 를 감싼 ExecutionException 형태로

호출한 쪽(예: 메인 스레드)으로 전파되는 것입니다.

 

추가로, CheckedException을 던질려면 Runnable 대신 Callable을 사용하면 됩니다.

 

참고 코드 (jdk 21 - 표준 라이브러리)

Future 인터페이스 Docs
     /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
 ->  * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

 

Exception 발생 로그

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Occurred Exception. threadName: pool-1-thread-1
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.shop.shoppingevent.Main.main(Main.java:16)
Caused by: java.lang.RuntimeException: Occurred Exception. threadName: pool-1-thread-1
	at com.shop.shoppingevent.Main.lambda$main$0(Main.java:13)

 

Checked Exception을 수용하는 Callable Functional Interface

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception; <<<- Checked Exception도 수용합니다.
}

 

이를 이용하면 빠르고 안정적인 처리를 할 수 있습니다.

 

단, 꼭 멀티스레딩을 사용하기 위해서는 내부 로직/예외처리를 잘 작성하고 성능테스트와 모니터링을 병행해야 합니다.