포스트

병렬 스트림

병렬 스트림

스레드 풀 사용

  • 1~8을 병렬로 더하는 프로그램
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ParallelMain3 {

  public static void main(String[] args) throws InterruptedException, ExecutionException {

    // 스레드 풀을 준비한다.
    ExecutorService es = Executors.newFixedThreadPool(2);

    long startTime = System.currentTimeMillis();

    // 1. Fork 작업을 분할한다.
    SumTask task1 = new SumTask(1, 4);
    SumTask task2 = new SumTask(5, 8);

    // 2. 분할한 작업을 처리한다.
    Future<Integer> future1 = es.submit(task1);
    Future<Integer> future2 = es.submit(task2);

    // 3. join - 처리한 결과를 합친다. get: 결과가 나올 때 까지 대기한다.
    Integer result1 = future1.get();
    Integer result2 = future2.get();
    log("main 스레드 대기 완료");

    int sum = result1 + result2;
    long endTime = System.currentTimeMillis();
    log("time: " + (endTime - startTime) + "ms, sum: " + sum);

    es.close();
  }

  static class SumTask implements Callable<Integer> {
    int startValue;
    int endValue;

    public SumTask(int startValue, int endValue) {
      this.startValue = startValue;
      this.endValue = endValue;
    }

    @Override
    public Integer call() {
      log("작업 시작");
      int sum = 0;
      for (int i = startValue; i <= endValue; i++) {
        int calculated = HeavyJob.heavyTask(i);
        sum += calculated;
      }
      log("작업 완료 result = " + sum);
      return sum;
    }

  }
}

Fork/Join 패턴

분할(Fork) → 처리(Execute) → 모음(Join)의 단계로 이루어진 멀티스레딩 패턴을 Fork/Join 패턴이라고 합니다.

Fork/Join 프레임워크

분할 정복(Divide and Conquer) 전략

  • 큰 작업(Task)을 작은 단위로 재귀적으로 분할(Fork)
  • 각 작은 작업의 결과를 합쳐(Join) 최종 결과를 생성
  • 멀티코어 환경에서 작업으 효율적으로 분산 처리

작업 훔치기(Work Stealing) 알고리즘

  • 각 스레드는 자신의 작업 큐를 가짐
  • 작업이 없는 스레드는 다른 바쁜 스레드의 큐에서 작업을 훔쳐와서 대신 처리
  • 부하 균형을 자동으로 조절하여 효울성 향상

주요 클래스

ForkJoinPool

  • Fork/Join 작업을 실행하는 특수한 ExecutorService 스레드 풀
  • 작업 스케줄링 및 스레드 관리를 담당
  • 기본적으로 사용 가능한 프로세서 수 만큼 스레드 생성

ForkJoinTask

  • ForkJoinTask는 Fork/Join 작업의 기본 추상 클래스
  • Future를 구현
  • 개발자는 주로 다음 두 하위 클래스를 구현해서 사용
    • RecursiveTask<V>: 결과를 반환하는 작업
    • RecursiveAction: 결과를 반환하지 않는 작업 (void)

RecursivceTask/RecursiceAction의 구현 방법

  • compute() 메서드를 재정의해서 필요한 작업 로직을 작성
  • 일반적으로 일정 기준(임계값)을 두고 작업 범위가 작으면 직업 처리하고, 클면 작업을 둘로 분할하여 각각 병렬로 처리하도록 구현

fork()/join() 메서드

  • fork(): 현재 스레드에서 다른 스레드로 작업을 분할하여 보내는 동작
  • join(): 분할된 작업이 끝날 때까지 기다린 후 결과를 가져오는 동작

코드 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class SumTask extends RecursiveTask<Integer> {
  //private static final int THRESHOLD = 4; // 임계값
  private static final int THRESHOLD = 2; // 임계값 변경

  private final List<Integer> list;

  public SumTask(List<Integer> list) {
    this.list = list;
  }

  @Override
  protected Integer compute() {
    // 작업 범위가 작으면 직접 계산
    if (list.size() <= THRESHOLD) {
      log("[처리 시작] " + list);
      int sum = list.stream()
        .mapToInt(HeavyJob::heavyTask)
        .sum();
      log("[처리 완료] " + list + " -> sum: " + sum);
      return sum;
    } else {
      // 작업 범위가 크면 반으로 나누어 병렬 처리
      int mid = list.size() / 2;
      List<Integer> leftList = list.subList(0, mid);
      List<Integer> rightList = list.subList(mid, list.size());
      log("[분할] " + list + " -> LEFT" + leftList + ", RIGHT" + rightList);

      SumTask leftTask = new SumTask(leftList);
      SumTask rightTask = new SumTask(rightList);

      // 왼쪽 작업은 다른 스레드에서 처리
      leftTask.fork();
      // 오른쪽 작업은 현재 스레드에서 처리
      Integer rightResult = rightTask.compute();//[5 ~ 8] -> 260

      // 왼쪽 작업 결과를 기다림
      Integer leftResult = leftTask.join();
      int joinSum = leftResult + rightResult;
      log("LEFT[ + " + leftResult + "] + RIGHT[" + rightResult + "] -> sum:" + joinSum);
      return joinSum;
    }
  }
}

  • ForkJoinPool 직접 생성하는 방식
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ForkJoinMain1 {

  public static void main(String[] args) {
    List<Integer> data = IntStream.rangeClosed(1, 8)
      .boxed()
      .toList();

    log("[생성] " + data);

    // ForkJoinPool 생성 및 작업 수행
    long startTime = System.currentTimeMillis();
    ForkJoinPool pool = new ForkJoinPool(10);
    SumTask task = new SumTask(data); // [1 ~ 8]

    // 병렬로 합을 구한 후 결과 출력
    Integer result = pool.invoke(task);
    pool.close();
    long endTime = System.currentTimeMillis();
    log("time: " + (endTime - startTime) + "ms, sum: " + result);
    log("pool: " + pool);
  }
}
  • ForkJoinPool 공용 풀 사용하는 방식
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ForkJoinMain2 {

  public static void main(String[] args) {
    int processorCount = Runtime.getRuntime().availableProcessors();
    ForkJoinPool commonPool = ForkJoinPool.commonPool();
    log("processorCount = " + processorCount + ", commonPool = " + commonPool.getParallelism());

    List<Integer> data = IntStream.rangeClosed(1, 8)
      .boxed()
      .toList();

    log("[생성] " + data);
    SumTask task = new SumTask(data);
    Integer result = task.invoke(); // 공용 풀 사용
    log("최종 결과: " + result);
  }
}

작업 훔치기

  • Fork/Join 풀의 스레드는 각자 자신의 작업 큐를 가짐
    • 덕분에 작업을 큐에서 가져가기 위한 스레드간 경합이 줄어듦
  • 자신의 작업이 없는 경우, 다른 스레드의 작업 큐에 대기중인 작업을 훔쳐서 대신 처리

Fork/Join 공용 풀

자바 8에 Fork/Join 작업을 위한 공용 풀이라는 개념이 도입되었습니다.

특징

  • 시스템 전체에서 공유: 애플리케이션 내에서 단일 인스턴스로 공유
  • 자동 생성: 별도로 생성하지 않아도 ForkJoinPool.commonPool()을 통해 접근 할 수 있음
  • 편리한 사용: 별도의 풀을 만들지 않아도 RecursiveTask/RecursiveAction 을 사용할 때 기본적으로 공용 풀이 사용 됨
  • 병렬 스트림 활용: 자바 8의 병렬 스트림은 내부적으로 공용 풀을 사용 함
  • 자원 효율성: 여러 곳에서 별도의 풀을 생성하는 대신 공용 풀을 사용함으로써 시스템 자원을 효율적으로 관리할 수 있음
  • 병렬 수준 자동 설정: 기본적으로 시스템 가용 프로세서 수에서 1을 뺀 값으로 병렬 수준을 설정

자바 병렬 스트림

  • .parallel() 한 줄 추가하면 병렬 스트림 완성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ParallelMain4 {

  public static void main(String[] args) {
    int processorCount = Runtime.getRuntime().availableProcessors();
    ForkJoinPool commonPool = ForkJoinPool.commonPool();
    log("processorCount = " + processorCount + ", commonPool = " + commonPool.getParallelism());

    long startTime = System.currentTimeMillis();

    int sum = IntStream.rangeClosed(1, 8)
      .parallel() // 추가
      .map(HeavyJob::heavyTask)
      .reduce(0, (a, b) -> a + b); // sum();

    long endTime = System.currentTimeMillis();
    log("time: " + (endTime - startTime) + "ms, sum: " + sum);
  }
}

병렬 스트림 주의점

  • CPU바운드 작업에만 사용하는 것을 권장
  • I/O 바운드 작업은 오랜 대기 시간이 발생하므로, 제한된 스레드만 쓰는 Fork/Join 공용 풀과 궁합이 좋지 않음
  • 서버 환경에서 여러 요청이 동시에 병렬 스트림을 사용하면 공용 풀이 빠르게 포화되어 전체 성능이 저하 될 수 있음

별도의 풀 사용

  • I/O 바운드 작업 처럼 대기가 긴 경우에는 전용 스레드 풀을 만들어 사용하는 것을 권장
  • 스레드 풀의 크기, 스레드 생성 정책, 큐 타입 등을 상황에 맞게 튜닝할 수 있어 확장성과 안정성이 높아짐

CompletableFuture 사용 시 주의 상황

  • CompletableFuture를 생성할 때 별도의 스레드를 지정하지 않으면 Fork/Join 공용 풀이 사용 됨
  • 반드시 커스텀 풀을 지정해서 사용해야 함
1
2
3
4
5
6
7
8
9
10
11
public class CompletableFutureMain {

  public static void main(String[] args) throws InterruptedException {
    CompletableFuture.runAsync(() -> log("Fork/Join")); // Fork/Join 공용 풀

    ExecutorService es = Executors.newFixedThreadPool(100);
    CompletableFuture.runAsync(() -> log("Custom Pool"), es); // 별도의 풀
    Thread.sleep(100);
    es.close();
  }
}

참고

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.