스레드를 직접 사용할 때 문제점
- 스레드 생성 비용으로 인한 성능 문제
- 메모리 할당: 스레드를 생성할 때는 호출 스택을 위한 메모리를 할당
- 운영체제 자원 사용
- 스레드를 생성하는 작업은 운영체제 커널 수준에서 이루어짐
- 시스템 콜(system call)을 통해 처리
- 이는 CPU와 메모리 리소스를 소모하는 작업
- 운영체제 스케줄러 설정
- 새로운 스레드가 생성되면 운영체제의 스케줄러는 이 스레드를 관리하고 실행 순서를 조정
- 스레드 하나는 보통 1MB 이상의 메모리를 사용
- 스레드 관리 문제
- CPU, 메모리 자원은 한정되어 있기 때문에, 스레드를 무한하게 만들수 없음
Runnable 인터페이스의 불편함- 반환 값이 없음
- 예외 처리: 체크 예외를 던질 수 없음
해결
1, 2번 문제를 해결하려면 스레드를 생성하고 관리하는 풀(Pool)이 필요합니다.
스레드를 매번 만드는 것이 아니라 만든 스레드를 사용하고 다시 풀에서 관리하는 식으로 스레드를 재사용 하는 것 입니다.
Executor
자바의 Executor 프레임워크는 멀티스레딩 및 병렬 처리를 쉽게 사용할 수 있도록 돕는 기능의 모음입니다.
1
2
3
| public interface Executor {
void execute(Runnable command);
}
|
ExecutorService
Executor 인터페이스를 확장해서 작업 제출과 제어 기능으 추가로 제공Executor 프레임워크를 사용할 때는 대부분 이 인터페이스를 사용
1
2
3
4
5
6
7
8
9
10
|
public interface ExecutorService extends Executor, AutoCloseable {
<T> Future<T> submit(Callable<T> task);
@Override
default void close(){...}
...
}
|
ThreadPoolExecutor
ExecutorService의 가장 대표적인 구현체입니다.
1
2
3
4
5
6
7
8
9
| public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
|
corePoolSize: 스레드 풀에서 관리되는 기본 스레드 수maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수keepAliveTime: 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간workQueue: 작업을 보관할 블로킹 큐 (차단 큐)
corePoolSize + 차단 큐 사이즈까지 다 차야 maximumPoolSize에 맞게 스레드를 늘립니다.
Future
Future와 Callable을 이용하여 Runnable의 불편함을 없앨 수 있습니다.
Callable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("Callable 완료");
return value;
}
}
}
|
es.submit(): Callable의 call()을 실행 하고 Future 반환Future의 get() 메서드를 통해 결과 값 조회
Runnable의 start() = Callable의 submit()Runnable의 join() = Callable의 get() (단, 결과를 반환 받는다는 차이가 있음)
Future 동작 방식
Future 취소
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| public class FutureCancelMain {
private static boolean mayInterruptIfRunning = true;
// private static boolean mayInterruptIfRunning = false;
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<String> future = es.submit(new MyFutureTask());
log("Future.state: " + future.state());
// 일정 시간 후 취소 시도
sleep(3000);
// cancel() 호출
log("future.cancel(" + mayInterruptIfRunning + ") 호출");
boolean cancelResult = future.cancel(mayInterruptIfRunning);
log("cancel(" + mayInterruptIfRunning + ") result: " + cancelResult);
// 결과 확인
try {
log("future result " + future.get());
} catch (CancellationException e) {
log("Future는 이미 취소 되었습니다.");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
es.close();
}
static class MyFutureTask implements Callable<String> {
@Override
public String call() {
for (int i = 0; i < 10; i++) {
try {
log("작업 중: " + i);
Thread.sleep(1000);
} catch (InterruptedException e) {
log("인터럽트 발생");
return "Interrupted";
}
}
return "Completed";
}
}
}
|
cancel(true)Future를 취소 상태로 변경- 작업이 실행 중이라면
Thread.interrupte()를 호출해서 작업을 중단
cancel(false)Future를 취소 상태로 변경- 작업이 실행 중이라면 작업을 중단하지는 않음
Future 예외
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| public class FutureExceptionMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
log("작업 전달");
Future<Integer> future = es.submit(new ExCallable());
sleep(1000); // 잠시 대기
try {
log("future.get() 호출 시도, future.state(): " + future.state());
Integer result = future.get();
log("result value = " + result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
log("e = " + e);
Throwable cause = e.getCause(); // 원본 예외
log("cause = " + cause);
}
es.close();
}
static class ExCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 실행, 예외 발생");
throw new IllegalStateException("ex!");
}
}
}
|
예외가 발생하면 ExecutionException 예외로 한 번 감싸져서 나옵니다.
ExecutorService 작업 컬렉션
invokeAll()
여러 Callable을 한번에 실행 시키고, 모두 완료 되었을 때 전체를 반환합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| public class InvokeAllMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task2", 1000);
CallableTask task3 = new CallableTask("task3", 1000);
List<CallableTask> tasks = List.of(task1, task2, task3);
List<Future<Integer>> futures = es.invokeAll(tasks);
for (Future<Integer> future : futures) {
Integer value = future.get();
log("value = " + value);
}
es.close();
}
}
|
invokeAny()
여러 Callable을 한번에 실행 시키고, 가장 먼저 실행 완료 된 것을 반환합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| public class InvokeAnyMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService es = Executors.newFixedThreadPool(10);
CallableTask task1 = new CallableTask("task1", 1000);
CallableTask task2 = new CallableTask("task2", 1000);
CallableTask task3 = new CallableTask("task3", 1000);
List<CallableTask> tasks = List.of(task1, task2, task3);
Integer value = es.invokeAny(tasks);
log("value = " + value);
es.close();
}
}
|
ExecutorService 종료
void shutdown()- 새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료
- non-blocking
List<Runnable> shutdownNow()- 실행 중인 작업을 중단하고, 대기 중인 작업을 반환하며 즉시 종료
- 실행 중인 작업을 중단하기 위해 인터럽트를 발생
- non-blocking
boolean awaitTermination(long timeout, TimeUnit unit) throw InterruptedException- 서비스 종료 시 모든 작업이 완료될 때까지 지정된 시간까지 대기
- non-blocking
void closeshutdown()을 호출하고, 작업이 완료되거나 인터럽트가 발생할 때 까지 무한정 반복 대기- 호출한 스레드에 인터럽트가 발생해도
shutdownNow()를 호출
ExecutorService 스레드 풀 관리
- 기본 스레드 수: 2
- 최대 스레드 수: 4
- 최대 스레드 생존 시간: 3초
- 차단 큐: 2
위 조건으로 예시를 들어보겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| public class PoolSizeMainV1 {
public static void main(String[] args) {
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadPoolExecutor es = new ThreadPoolExecutor(2, 4, 3000, TimeUnit.MILLISECONDS, workQueue);
printState(es);
es.execute(new RunnableTask("task1"));
printState(es, "task1");
es.execute(new RunnableTask("task2"));
printState(es, "task2");
es.execute(new RunnableTask("task3"));
printState(es, "task3");
es.execute(new RunnableTask("task4"));
printState(es, "task4");
es.execute(new RunnableTask("task5"));
printState(es, "task5");
es.execute(new RunnableTask("task6"));
printState(es, "task6");
try {
es.execute(new RunnableTask("task7"));
printState(es, "task7");
} catch (RejectedExecutionException e) {
log("task7 실행 거절 예외 발생: " + e);
}
sleep(3000);
log("== 작업 수행 완료 ==");
printState(es);
sleep(3000);
log("== maximumPoolSize 대기 시간 초과");
printState(es);
es.close();
log("== shutdown 완료");
printState(es);
}
}
|
기본 스레드 미리 생성
기본 스레드는 첫 요청이 들어올 때 생성 됩니다. 하지만 이것도 ThreadPoolExecutor.prestartAllCoreThreads()를 이용하면 미리 생성하여 첫 사용자도 빠르게 응답을 받아보게 할 수 있습니다.
1
2
3
4
5
6
7
8
| public class PrestartPoolMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1000);
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) es;
poolExecutor.prestartAllCoreThreads();
}
}
|
Executor 전략
고정 풀 전략
기본 스레드와 최대 스레드를 같은 값으로 정하고 큐(LinkedBlockingQueue) 사이즈에 제한이 없도록 설정하여 사용하는 방식입니다.
newSingleThreadPool() 단일 스레드 풀 전략
- 스레드 풀에 기본 스레드를 1개만 사용
- 큐 사이즈에 제한이 없음
- 주로 간단히 사용하거나, 테스트 용도로 사용
1
2
| Executors.newSingleThreadExecutor()
// new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
|
newFixedThreadPool(n)
- 스레드 풀에 n개 만큼의 기본 스레드를 생성
- 초과 스레드도 n개 만큼만 설정
- 스레드 수가 고정되어 있기 때문에 CPU, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식
- 큐 사이즈에 제한이 없음
1
2
| Executors.newFixedThreadPool(n)
// new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
|
주의
사용자가 계속 증가하거나 일시적으로 폭증 하게 되면 요청 보다 처리 속도가 느리게 됩니다. 그럼 큐에 계속 쌓이는 현상이 발생할 수 있습니다. 즉, 서버 자원은 많은데 스레드 풀의 부족으로 처리 속도만 느리게 됩니다.
캐시 풀 전략
- 기본 스레드를 사용하지 않고, 60초 생존 주기를 가진 초과 스레드만 사용
- 초과 스레드의 수는 제한이 없음
- 큐에 작업을 저장하지 않음(
SynchronousQueue)- 생산자의 요청을 스레드 풀의 소비자 스레드가 직접 받아서 바로 처리
- 모든 요청이 대기하지 않고 스레드가 바로 처리
1
2
| Executors.newCachedThreadPool()
// new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
|
주의
사용자가 많아지거나 일시적으로 폭증하게 되면 스레드를 들어오는 대로 무한으로 스레드를 늘리기 때문에 CPU, 메모리 사용량이 지나치게 높아 지며 서버가 다운 될 수 있습니다.
사용자 정의 풀 전략
- 평소에는 안정적으로 운영하다가, 사용자의, 요청이 갑자기 증가하면 긴급하게 스레드를 더 투입해서 급한 불을 끄는 방법
- 시스템이 감당할 수 없을 정도로 사용자의 요청이 폭증하면, 처리가능한 수준의 사용자 요청만 처리하고 나머지 요청은 거절 함으로써 시스템 다운 상황을 피함
1
| ExecutorService es = new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
|
차단 큐 설정 주의
1
| new ThreadPoolExecutor(100, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue());
|
위 처럼 설정 시, 차단 큐가 무한이므로 초과 스레드가 생성되지 않습니다.
정리
- 고정 스레드 풀 전략: 트래픽이 일정하고, 시스템 안전성이 가장 중요
- 캐시 스레드 풀 전략: 일반적인 성장하는 서비스
- 사용자 정의 풀 전략: 다양한 상황에 대응
Executor 예외 정책
ThreadPoolExecutor.AbortPolicy- 기본 정책
- 새로운 작업을 제출할 때
RejectedExecutionException을 발생
ThreadPoolExecutor.DiscardPolicy: 새로운 작업을 조용히 버림ThreadPoolExecutor.CallerRunsPolicy: 새로운 작업을 제출한 스레드가 대신해서 직접 작업을 실행- 사용자 정의:
RejectedExecutionHandler를 상속 받아 구현
1
| ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
|
참고