Java & Kotlin

Java Thread로 직접 구현하는 커스텀 쓰레드 풀: 기본 원리부터 동작까지

백엔드 유성 2024. 10. 6. 21:20

오늘은 Java의 Thread 객체만을 이용해 직접 Executors 클래스의 쓰레드 풀을 구현하는 방법에 대해 소개하겠습니다. Java에서는 Executors 클래스를 사용해 쉽게 쓰레드 풀을 만들 수 있지만, 내부적으로 어떻게 작동하는지 이해하면 더 유연하고 자신만의 커스텀 쓰레드 풀을 설계할 수 있습니다. 이번 글에서는 ThreadBlockingQueue를 이용해 최소한의 쓰레드 풀을 구현하고, 이를 통해 기본적인 원리를 설명해보겠습니다.

 

코드는 GitHub에서 확인하실 수 있습니다: https://github.com/youseonghyeon/threadpool

 

요구 사항

구현하려는 Thread Pool의 주요 기능은 다음과 같습니다:

 

  1. 최소 쓰레드 개수 설정: 항상 일정한 최소 개수의 쓰레드가 살아있어야 합니다.
  2. 최대 쓰레드 개수 설정: 필요시 최대 쓰레드 개수까지 동적으로 쓰레드를 생성해야 합니다.
  3. 유휴 상태 관리: 쓰레드 사용량이 없을 경우 최소 쓰레드 개수로 줄어들어야 합니다.

 

이를 구현하기 위해 작업 큐(BlockingQueue)Worker Thread를 이용해 비동기 작업을 처리하는 방식으로 설계할 예정입니다.

 

기본 설계

우선, 간단한 설계를 먼저 설명드리겠습니다. MyThreadPoolService 클래스는 사용자가 정의한 최소/최대 쓰레드 수를 기준으로, 새로운 작업이 추가될 때마다 적절한 개수의 쓰레드를 생성합니다. 동시에, 주기적으로 WatchDog 쓰레드가 동작해 유휴 상태의 쓰레드를 정리하고, 최소 쓰레드 개수를 유지합니다.

 

주요 클래스 및 역할

  • MyThreadPoolService: 쓰레드 풀을 관리하고, 작업을 할당하며, 동작을 종료하는 등의 주요 기능을 제공합니다.
  • WorkerThread: 각각의 작업을 실행하는 쓰레드입니다. Runnable 작업을 받아 처리하고, 유휴 상태를 관리합니다.

 

코드 설명

 

1. MyThreadPoolService 클래스

 

이 클래스는 기본적으로 최소/최대 쓰레드 수와 작업 큐를 관리합니다. 생성자를 통해 풀의 크기를 설정하며, 작업이 추가될 때마다 쓰레드를 생성하거나 기존 쓰레드에서 작업을 처리하게 됩니다.

package thread;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class MyThreadPoolService {
    // minThreads와 maxThreads는 최소 스레드 개수와 최대 스레드 개수를 의미합니다.
    private final int minThreads;
    private final int maxThreads;
    // taskQueue는 작업을 담는 큐입니다.
    private final BlockingQueue<Runnable> taskQueue;
    // activeThreadList는 현재 활성화된 스레드를 관리하는 리스트입니다.
    private final List<WorkerThread> activeThreadList;
    // isStopped는 스레드 풀이 종료되었는지 여부를 나타내는 플래그입니다.
    private final AtomicBoolean isStopped = new AtomicBoolean(false);

    // 스레드의 타임아웃 시간을 나타내는 상수입니다.
    private final int threadTimeout = 10000;

    /**
     * MyThreadPoolService 생성자입니다.
     * 생성자 실행 시 WatchDog 쓰레드가 시작됩니다.
     */
    public MyThreadPoolService(int minThreads, int maxThreads) {
        if (minThreads < 1 || maxThreads < 1 || minThreads > maxThreads) {
            throw new IllegalArgumentException("Invalid thread pool size");
        }
        this.minThreads = minThreads;
        this.maxThreads = maxThreads;
        this.taskQueue = new LinkedBlockingQueue<>();
        this.activeThreadList = Collections.synchronizedList(new ArrayList<>());
        startWatchDogThread();
    }

    /**
     * 스레드 풀에 작업을 추가합니다.
     * 스레드 풀이 종료된 경우 IllegalStateException을 발생시킵니다.
     */
    public void execute(Runnable task) {
        if (isStopped.get()) {
            throw new IllegalStateException("Thread pool is stopped");
        }
        if (!taskQueue.isEmpty() && activeThreadList.size() < maxThreads) {
            generateWorkerThread();
        }
        taskQueue.offer(task);
    }

    /**
     * 스레드 풀을 종료합니다.
     */
    public void shutdown() {
        isStopped.set(true);
        for (WorkerThread worker : activeThreadList) {
            worker.stopWorker();
        }
    }

    /**
     * 새로운 쓰레드를 생성하고 activeThreadList에 추가합니다.
     */
    private void generateWorkerThread() {
        WorkerThread worker = new WorkerThread(taskQueue);
        activeThreadList.add(worker);
        worker.start();
    }

    /**
     * 의도하지 않게 종료된 스레드를 activeThreadList 에서 제거합니다.
     */
    private void removeDeadThreads() {
        activeThreadList.removeIf(workerThread -> !workerThread.isAlive());
    }

    /**
     * 타임아웃된 스레드를 한개씩 activeThreadList에서 제거합니다.
     */
    private void removeTimedOutThread() {
        if (activeThreadList.size() > minThreads) {
            Optional<WorkerThread> targetThread = activeThreadList.stream()
                    .filter(this::timeOver)
                    .findFirst();
            targetThread.ifPresent(workerThread -> {
                workerThread.stopWorker();
                activeThreadList.remove(workerThread);
            });
        }
    }

    /**
     * 스레드가 타임아웃되었는지 확인합니다.
     */
    private boolean timeOver(WorkerThread workerThread) {
        long lastStartTime = workerThread.getLastStartTime();
        return System.currentTimeMillis() - lastStartTime > threadTimeout;
    }


    /**
     * 최소 스레드 개수를 유지합니다.
     * <p>
     *     taskQueue에 작업이 있으면서, 쓰레드가 모두 종료된 경우 작업을 실행하지 못하는데 이를 방지합니다.
     * </p>
     */
    private void maintainMinThreads() {
        if (activeThreadList.size() < minThreads) {
            int currentThreadSize = activeThreadList.size();
            for (int i = currentThreadSize; i < minThreads; i++) {
                generateWorkerThread();
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    
    /**
     * WatchDog 쓰레드를 시작합니다.
     */
    private void startWatchDogThread() {
        new Thread(() -> {
            while (!isStopped.get()) {
                removeDeadThreads();
                removeTimedOutThread();
                maintainMinThreads();
                sleep(5000);
            }
        }).start();
    }

}

 

  • taskQueueBlockingQueue를 사용하여 스레드가 대기 중에 작업이 들어오면 바로 실행할 수 있도록 설계되었습니다.
  • activeThreadList는 현재 활성화된 쓰레드들의 리스트를 관리하며, 각 쓰레드는 WorkerThread 인스턴스로 관리됩니다.
  • WatchDog는 주기적으로 쓰레드 풀 상태를 점검하며, 필요 시 유휴 상태인 쓰레드를 정리하거나 부족한 쓰레드를 생성합니다.

 

2. WorkerThread 클래스

 

WorkerThread는 큐에 쌓인 작업을 처리하고, 유휴 상태가 되면 스레드 풀에서 제거될 수 있습니다.

 

package thread;

import java.util.concurrent.BlockingQueue;

class WorkerThread extends Thread {
    // taskQueue는 작업을 담는 큐입니다. 부모 클래스인 MyThreadPoolService에서 초기화하며 공유합니다.
    private final BlockingQueue<Runnable> taskQueue;
    // isStopped는 스레드가 종료되었는지 여부를 나타내는 플래그입니다.
    private boolean isStopped = false;
    // lastStartTime은 마지막으로 작업을 수행한 시간을 나타냅니다.
    private long lastStartTime;

    protected WorkerThread(BlockingQueue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        while (!isStopped) {
            try {
                lastStartTime = System.currentTimeMillis();
                // 블로킹 큐에서 작업을 실행하므로 따로 time sleep을 사용하지 않습니다.
                taskQueue.take().run();
            } catch (InterruptedException e) {
                System.out.println("[WARN] Thread interrupted " + Thread.currentThread().getName());
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    protected long getLastStartTime() {
        return lastStartTime;
    }

    protected void stopWorker() {
        isStopped = true;
        this.interrupt(); // 인터럽트를 통해 종료
    }
}

 

  • WorkerThreadBlockingQueue에서 작업을 가져와 실행하며, 인터럽트되거나 명시적으로 종료되지 않는 한 계속해서 동작합니다.
  • lastStartTime은 쓰레드가 마지막으로 작업을 시작한 시간을 기록하여, 일정 시간 유휴 상태인 경우 스레드를 종료할 수 있도록 도와줍니다.

 

3. WatchDog Thread

 

WatchDog 쓰레드는 MyThreadPoolService에서 주기적으로 작동하며, 스레드 풀의 상태를 관리합니다. 특정 주기마다 유휴 상태인 쓰레드를 정리하고, 최소한의 쓰레드가 항상 유지되도록 합니다.

 

private void startWatchDogThread() {
    new Thread(() -> {
        while (!isStopped.get()) {
            removeDeadThreads();
            removeTimedOutThread();
            maintainMinThreads();
            sleep(5000);
        }
    }).start();
}

 

결론

 

이번 포스트에서는 Java의 기본 Thread 객체와 BlockingQueue를 이용해 직접 쓰레드 풀을 구현해보았습니다. 이렇게 직접 구현해보면, 기존의 Executors가 제공하는 쓰레드 풀 구현 방식과 유사하면서도 더 유연하게 커스터마이징할 수 있는 방법을 배울 수 있습니다. 쓰레드 관리나 최소/최대 쓰레드 수 조정 같은 기능을 자신만의 방식으로 구현할 수 있다는 점이 큰 장점입니다.

 

감사합니다.