포스트

스레드 풀과 Executor 프레임워크

스레드 풀과 Executor 프레임워크

스레드를 직접 사용할 때 문제점

  • 스레드 생성 비용으로 인한 성능 문제
    • 메모리 할당: 스레드를 생성할 때는 호출 스택을 위한 메모리를 할당
    • 운영체제 자원 사용
      • 스레드를 생성하는 작업은 운영체제 커널 수준에서 이루어짐
      • 시스템 콜(system call)을 통해 처리
      • 이는 CPU와 메모리 리소스를 소모하는 작업
    • 운영체제 스케줄러 설정
      • 새로운 스레드가 생성되면 운영체제의 스케줄러는 이 스레드를 관리하고 실행 순서를 조정
      • 스레드 하나는 보통 1MB 이상의 메모리를 사용
  • 스레드 관리 문제
    • CPU, 메모리 자원은 한정되어 있기 때문에, 스레드를 무한하게 만들수 없음
  • Runnable 인터페이스의 불편함
    • 반환 값이 없음
    • 예외 처리: 체크 예외를 던질 수 없음

해결

1, 2번 문제를 해결하려면 스레드를 생성하고 관리하는 풀(Pool)이 필요합니다.

스레드를 매번 만드는 것이 아니라 만든 스레드를 사용하고 다시 풀에서 관리하는 식으로 스레드를 재사용 하는 것 입니다.

Executor

자바의 Executor 프레임워크는 멀티스레딩 및 병렬 처리를 쉽게 사용할 수 있도록 돕는 기능의 모음입니다.

1
2
3
public interface Executor {
		void execute(Runnable command);
}

ExecutorService

  • Executor 인터페이스를 확장해서 작업 제출과 제어 기능으 추가로 제공
  • Executor 프레임워크를 사용할 때는 대부분 이 인터페이스를 사용
1
2
3
4
5
6
7
8
9
10
public interface ExecutorService extends Executor, AutoCloseable {

		<T> Future<T> submit(Callable<T> task);
		
		@Override
		default void close(){...}
		
		...
}

ThreadPoolExecutor

ExecutorService의 가장 대표적인 구현체입니다.

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

  • corePoolSize: 스레드 풀에서 관리되는 기본 스레드 수
  • maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
  • keepAliveTime: 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간
  • workQueue: 작업을 보관할 블로킹 큐 (차단 큐)

corePoolSize + 차단 큐 사이즈까지 다 차야 maximumPoolSize에 맞게 스레드를 늘립니다.

Future

FutureCallable을 이용하여 Runnable의 불편함을 없앨 수 있습니다.

Callable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CallableMainV1 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<Integer> future = es.submit(new MyCallable());
        Integer result = future.get();
        log("result value = " + result);
        es.close();

    }

    static class MyCallable implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            log("Callable 시작");
            sleep(2000);
            int value = new Random().nextInt(10);
            log("Callable 완료");
            return value;
        }
    }
}
  • es.submit(): Callablecall()을 실행 하고 Future 반환
  • Futureget() 메서드를 통해 결과 값 조회
  • Runnablestart() = Callablesubmit()
  • Runnablejoin() = Callableget() (단, 결과를 반환 받는다는 차이가 있음)

Future 동작 방식

Future 취소

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class FutureCancelMain {

    private static boolean mayInterruptIfRunning = true;
    //    private static boolean mayInterruptIfRunning = false;

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<String> future = es.submit(new MyFutureTask());
        log("Future.state: " + future.state());

        // 일정 시간 후 취소 시도
        sleep(3000);

        // cancel() 호출
        log("future.cancel(" + mayInterruptIfRunning + ") 호출");
        boolean cancelResult = future.cancel(mayInterruptIfRunning);
        log("cancel(" + mayInterruptIfRunning + ") result: " + cancelResult);

        // 결과 확인
        try {
            log("future result " + future.get());

        } catch (CancellationException e) {
            log("Future는 이미 취소 되었습니다.");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        es.close();
    }

    static class MyFutureTask implements Callable<String> {

        @Override
        public String call() {
            for (int i = 0; i < 10; i++) {
                try {
                    log("작업 중: " + i);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log("인터럽트 발생");
                    return "Interrupted";
                }
            }
            return "Completed";
        }
    }
}
  • cancel(true)
    • Future를 취소 상태로 변경
    • 작업이 실행 중이라면 Thread.interrupte()를 호출해서 작업을 중단
  • cancel(false)
    • Future를 취소 상태로 변경
    • 작업이 실행 중이라면 작업을 중단하지는 않음

Future 예외

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class FutureExceptionMain {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        log("작업 전달");
        Future<Integer> future = es.submit(new ExCallable());
        sleep(1000); // 잠시 대기

        try {
            log("future.get() 호출 시도, future.state(): " + future.state());
            Integer result = future.get();
            log("result value = " + result);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            log("e = " + e);
            Throwable cause = e.getCause(); // 원본 예외
            log("cause = " + cause);
        }
        es.close();
    }

    static class ExCallable implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            log("Callable 실행, 예외 발생");
            throw new IllegalStateException("ex!");
        }
    }
}

예외가 발생하면 ExecutionException 예외로 한 번 감싸져서 나옵니다.

ExecutorService 작업 컬렉션

invokeAll()

여러 Callable을 한번에 실행 시키고, 모두 완료 되었을 때 전체를 반환합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class InvokeAllMain {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService es = Executors.newFixedThreadPool(10);
        CallableTask task1 = new CallableTask("task1", 1000);
        CallableTask task2 = new CallableTask("task2", 1000);
        CallableTask task3 = new CallableTask("task3", 1000);

        List<CallableTask> tasks = List.of(task1, task2, task3);
        List<Future<Integer>> futures = es.invokeAll(tasks);
        for (Future<Integer> future : futures) {

            Integer value = future.get();
            log("value = " + value);
        }
        es.close();

    }
}

invokeAny()

여러 Callable을 한번에 실행 시키고, 가장 먼저 실행 완료 된 것을 반환합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class InvokeAnyMain {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService es = Executors.newFixedThreadPool(10);
        CallableTask task1 = new CallableTask("task1", 1000);
        CallableTask task2 = new CallableTask("task2", 1000);
        CallableTask task3 = new CallableTask("task3", 1000);

        List<CallableTask> tasks = List.of(task1, task2, task3);
        Integer value = es.invokeAny(tasks);
        log("value = " + value);
        es.close();

    }
}

ExecutorService 종료

  • void shutdown()
    • 새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료
    • non-blocking
  • List<Runnable> shutdownNow()
    • 실행 중인 작업을 중단하고, 대기 중인 작업을 반환하며 즉시 종료
    • 실행 중인 작업을 중단하기 위해 인터럽트를 발생
    • non-blocking
  • boolean awaitTermination(long timeout, TimeUnit unit) throw InterruptedException
    • 서비스 종료 시 모든 작업이 완료될 때까지 지정된 시간까지 대기
    • non-blocking
  • void close
  • shutdown()을 호출하고, 작업이 완료되거나 인터럽트가 발생할 때 까지 무한정 반복 대기
  • 호출한 스레드에 인터럽트가 발생해도 shutdownNow()를 호출

ExecutorService 스레드 풀 관리

  • 기본 스레드 수: 2
  • 최대 스레드 수: 4
  • 최대 스레드 생존 시간: 3초
  • 차단 큐: 2

위 조건으로 예시를 들어보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadPoolExecutor es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, workQueue);
        printState(es);

        es.execute(new RunnableTask("task1"));
        printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        printState(es, "task2");

        es.execute(new RunnableTask("task3"));
        printState(es, "task3");

        es.execute(new RunnableTask("task4"));
        printState(es, "task4");

        es.execute(new RunnableTask("task5"));
        printState(es, "task5");

        es.execute(new RunnableTask("task6"));
        printState(es, "task6");

        try {
            es.execute(new RunnableTask("task7"));
            printState(es, "task7");
        } catch (RejectedExecutionException e) {
            log("task7 실행 거절 예외 발생: " + e);
        }

        sleep(3000);
        log("== 작업 수행 완료 ==");
        printState(es);

        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과");
        printState(es);

        es.close();
        log("== shutdown 완료");
        printState(es);
    }
}

기본 스레드 미리 생성

기본 스레드는 첫 요청이 들어올 때 생성 됩니다. 하지만 이것도 ThreadPoolExecutor.prestartAllCoreThreads()를 이용하면 미리 생성하여 첫 사용자도 빠르게 응답을 받아보게 할 수 있습니다.

1
2
3
4
5
6
7
8
public class PrestartPoolMain {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1000);
        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) es;
        poolExecutor.prestartAllCoreThreads();
    }
}

Executor 전략

고정 풀 전략

기본 스레드와 최대 스레드를 같은 값으로 정하고 큐(LinkedBlockingQueue) 사이즈에 제한이 없도록 설정하여 사용하는 방식입니다.

newSingleThreadPool() 단일 스레드 풀 전략

  • 스레드 풀에 기본 스레드를 1개만 사용
  • 큐 사이즈에 제한이 없음
  • 주로 간단히 사용하거나, 테스트 용도로 사용
1
2
Executors.newSingleThreadExecutor()
// new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

newFixedThreadPool(n)

  • 스레드 풀에 n개 만큼의 기본 스레드를 생성
  • 초과 스레드도 n개 만큼만 설정
  • 스레드 수가 고정되어 있기 때문에 CPU, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식
  • 큐 사이즈에 제한이 없음
1
2
Executors.newFixedThreadPool(n)
// new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

주의

사용자가 계속 증가하거나 일시적으로 폭증 하게 되면 요청 보다 처리 속도가 느리게 됩니다. 그럼 큐에 계속 쌓이는 현상이 발생할 수 있습니다. 즉, 서버 자원은 많은데 스레드 풀의 부족으로 처리 속도만 느리게 됩니다.

캐시 풀 전략

  • 기본 스레드를 사용하지 않고, 60초 생존 주기를 가진 초과 스레드만 사용
  • 초과 스레드의 수는 제한이 없음
  • 큐에 작업을 저장하지 않음(SynchronousQueue)
    • 생산자의 요청을 스레드 풀의 소비자 스레드가 직접 받아서 바로 처리
  • 모든 요청이 대기하지 않고 스레드가 바로 처리
1
2
Executors.newCachedThreadPool()
// new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

주의

사용자가 많아지거나 일시적으로 폭증하게 되면 스레드를 들어오는 대로 무한으로 스레드를 늘리기 때문에 CPU, 메모리 사용량이 지나치게 높아 지며 서버가 다운 될 수 있습니다.

사용자 정의 풀 전략

  • 평소에는 안정적으로 운영하다가, 사용자의, 요청이 갑자기 증가하면 긴급하게 스레드를 더 투입해서 급한 불을 끄는 방법
  • 시스템이 감당할 수 없을 정도로 사용자의 요청이 폭증하면, 처리가능한 수준의 사용자 요청만 처리하고 나머지 요청은 거절 함으로써 시스템 다운 상황을 피함
1
ExecutorService es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));

차단 큐 설정 주의

1
new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue());

위 처럼 설정 시, 차단 큐가 무한이므로 초과 스레드가 생성되지 않습니다.

정리

  • 고정 스레드 풀 전략: 트래픽이 일정하고, 시스템 안전성이 가장 중요
  • 캐시 스레드 풀 전략: 일반적인 성장하는 서비스
  • 사용자 정의 풀 전략: 다양한 상황에 대응

Executor 예외 정책

  • ThreadPoolExecutor.AbortPolicy
    • 기본 정책
    • 새로운 작업을 제출할 때 RejectedExecutionException을 발생
  • ThreadPoolExecutor.DiscardPolicy: 새로운 작업을 조용히 버림
  • ThreadPoolExecutor.CallerRunsPolicy: 새로운 작업을 제출한 스레드가 대신해서 직접 작업을 실행
  • 사용자 정의: RejectedExecutionHandler를 상속 받아 구현
1
ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());

참고

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.