포스트

생산자 소비자 문제

생산자 소비자 문제

기본 개념

  • 생산자(Producer): 데이터를 생성하는 역할
  • 소비자(Consumer): 생성된 데이터를 사용하는 역할
  • 버퍼(Buffer): 생성자가 생성한 데이터를 일시적으로 저장한느 공간

문제 상황

  • 생산자가 너무 빠를 때: 버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 함
  • 소비자가 너무 빠를 때: 버퍼가 비어 있을 때 소비자는 버퍼에 새로운 데이터가 들어올때까지 기다려야 함

예제

예제 1 - 대기 없이 버퍼에 삽입 및 조회

1
2
3
4
5
6
public interface BoundedQueue {

  void put(String data);

  String take();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class BoundedQueueV1 implements BoundedQueue {

  // 버퍼
  private final Queue<String> queue = new ArrayDeque<>();

  // 버퍼 사이즈
  private final int max;

  public BoundedQueueV1(int max) {
    this.max = max;
  }

  @Override
  public synchronized void put(String data) {
    if (queue.size() == max) {
      log("[put] 큐가 가득 참, 버림: " + data);
      return;
    }
    queue.offer(data);
  }

  @Override
  public synchronized String take() {
    if (queue.isEmpty()) {
      return null;
    }

    return queue.poll();
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ConsumerTask implements Runnable {

  private BoundedQueue queue;

  public ConsumerTask(BoundedQueue queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    log("[소비 시도]     ? <- " + queue);
    String data = queue.take();
    log("[소비 완료] " + data + " <- " + queue);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ProducerTask implements Runnable {

  private BoundedQueue queue;
  private String request;

  public ProducerTask(BoundedQueue queue, String request) {
    this.queue = queue;
    this.request = request;
  }

  @Override
  public void run() {
    log("[생산 시도] " + request + " -> " + queue);
    queue.put(request);
    log("[생산 완료] " + request + " -> " + queue);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class BoundedMain {

  public static void main(String[] args) {

    // 1. BoundedQueue 선택
    BoundedQueue queue = new BoundedQueueV1(2);

    // 2. 생상자, 소비자 실행 순서 선택, 반드시 하나만 선택
    producerFirst(queue); // 생산자 먼저 실행
    //                consumerFirst(queue); // 소비자 먼저 실행
  }

  private static void producerFirst(BoundedQueue queue) {
    log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");

    List<Thread> threads = new ArrayList<>();
    startProducer(queue, threads);
    printAllState(queue, threads);
    startConsumer(queue, threads);
    printAllState(queue, threads);

    log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
  }

  private static void consumerFirst(BoundedQueue queue) {
    log("== [소비 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");

    List<Thread> threads = new ArrayList<>();
    startConsumer(queue, threads);
    printAllState(queue, threads);
    startProducer(queue, threads);
    printAllState(queue, threads);

    log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
  }

  private static void startProducer(BoundedQueue queue, List<Thread> threads) {
    System.out.println();
    log("생산자 시작");
    for (int i = 1; i <= 3; i++) {
      Thread thread = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
      threads.add(thread);
      thread.start();
      sleep(100);
    }
  }

  private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
    System.out.println();
    log("소비자 시작");
    for (int i = 1; i <= 3; i++) {
      Thread thread = new Thread(new ConsumerTask(queue), "consumer" + i);
      threads.add(thread);
      thread.start();
      sleep(100);
    }
  }

  private static void printAllState(BoundedQueue queue, List<Thread> threads) {
    System.out.println();

생성자 스레드 3개 소비자 스레드 3개를 생성하여 데이터를 삽입하고 조회 하는 코드입니다.

우선 생성자 스레드 3개가 데이터를 삽입 시도를 하면 버퍼 사이즈가 2개이기 때문에 마지막 데이터를 삽입에 실패하게 됩니다.

이후 소비자 스레드 3개가 데이터를 조회하면 버퍼에 있는 데이터 2개만 조회가 되어 마지막 조회는 실패하게 됩니다.

예제 2 - 대기하여 데이터 삽입 및 조회 (실패 버전)

만약 생성자는 버퍼가 다 찼다면 잠시 대기했다가 버퍼에 자리가 나면 데이터를 삽입하고, 소비자는 데이터가 없으면 잠시 대기했다가 데이터가 들어오면 조회를 하도록 수정하면 데이터 유실이 없어질 것입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class BoundedQueueV2 implements BoundedQueue {

  private final Queue<String> queue = new ArrayDeque<>();
  private final int max;

  public BoundedQueueV2(int max) {
    this.max = max;
  }

  @Override
  public synchronized void put(String data) {
    while (queue.size() == max) {
      log("[put] 큐가 가득 참, 생산자 대기");
      sleep(1000);
    }
    queue.offer(data);
  }

  @Override
  public synchronized String take() {
    while (queue.isEmpty()) {
      log("[take] 큐에 데이터가 없음, 소비자 대기");
      sleep(1000);
    }
    return queue.poll();
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}

  • 생상자: 버퍼 사이즈를 확인하여 없으면 1초 쉬고 다시 버퍼 사이즈를 확인하여 자리가 날때 까지 반복하다가 자리가 나면 데이터 삽입
  • 소비자: 버퍼 사이즈를 확인하여 비어있으면 1초 쉬고 데이터가 있을 때 까지 확인을 반복하다가 데이터가 들어오면 조회

생각한대로 된다면 좋겠지만 위 코드는 상상한대로 동작하지 않습니다.

이유는 버퍼를 확인하는 단계가 synchronized 안에 있기 때문에 락을 계속 쥐고 있어 다른 스레드가 접근을 못하기 때문에 버퍼에 데이터의 변화가 일어나지 않기 때문입니다.

예제 3 - Object - wait, notify

그렇다면 쥐고 있는 락을 잠시 양보할 순 없을까? 하는 생각이 들 수 있습니다.

바로 Objectwait(), nofify()를 이용하면 락을 양보하고 다시 획득 할 수 있습니다.

  • Object.wait()
    • 현재 스레드가 가진 락을 반납하고 WAITING 상태로 변경
    • 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을 때만 호출 가능
    • 다른 스레드가 notify() 또는 notifyAll()을 호출 할 때 까지 대기
  • Object.notify()
    • 대기 중인 스레드 중 하나를 깨움
    • 이 메서드는 synchronized 블록이나 메서드에서 호출 되어야 함
    • 깨운 스레드는 BLOCKED 상태로 전환되며 다시 락을 획득할 기회를 얻게 됨
  • Object.notifyAll()
    • 대기 중인 모든 스레드를 깨움
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class BoundedQueueV3 implements BoundedQueue {

  private final Queue<String> queue = new ArrayDeque<>();
  private final int max;

  public BoundedQueueV3(int max) {
    this.max = max;
  }

  @Override
  public synchronized void put(String data) {
    while (queue.size() == max) {
      log("[put] 큐가 가득 참, 생산자 대기");
      try {
        wait(); // RUNNABLE -> WAITING, 락 반납
        log("[put] 생산자 깨어남");
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    queue.offer(data);
    log("[put] 생산자 데이터 저장, notify() 호출");
    notify(); // 대기 스레드, WAIT -> BLOCKED
  }

  @Override
  public synchronized String take() {
    while (queue.isEmpty()) {
      log("[take] 큐에 데이터가 없음, 소비자 대기");
      try {
        wait();
        log("[take] 소비자 깨어남");
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    String data = queue.poll();
    log("[take] 소비자 데이터 획득, notify() 호출");
    notify(); // 대기 스레드, WAIT -> BLOCKED
    return data;
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}

이제 상상한대로 생산자는 버퍼에 공간이 없으면 대기 하다가 데이터를 삽입하고, 소비자는 데이터가 없으면 대기하다가 데이터가 들어오면 조회하는 것을 볼 수 있습니다.

그림으로 원리 이해하기

생산자 스레드에서 데이터를 넣을려고 할 때, 공간이 없으면 wait() 를 호출합니다

이때, 락을 반납하고 스레드 대기 집합으로 들어갑니다.

소비자 스레드가 데이터를 조회하고 notify()를 실행하고 스레드를 종료하며 락을 반납합니다.

그럼 스레드 대기 집합에 있던 생산자 스레드가 BLOCKED 상태로 전환되어 락 획득을 시도합니다.

스레드 대기 집합(wait set)

  • synchronized 임계 영역 안에서 Object.wait()를 호출하면 WAITING 상태로 전환되는데 이렇게 대기 상태에 들어간 스레드를 관리하는 것을 대기 집합이라고 함
  • 모든 객체는 락(모니터락)과 대기 집합을 가지고 있음

조금 더 개선해보자

지금은 생성자 소비자 모두 스레드 대기 집합 한 곳에 대기하게 됩니다. notify()를 실행하면 랜덤하게 깨우므로 불필요한 깨움이 일어날 수 있습니다.

예를 들어 소비자 스레드가 조회를 시도하고 notify()를 실행하여 다른 스레드를 깨웠는데 다시 소비자 스레드가 나오면 의미 없이 다시 스레드 집합으로 돌아가야 할 것입니다.

만약 생산자 스레드가 끝나면 소비자 스레드를 깨우고, 소비자 스레드가 끝나면 생산자 스레드를 깨울수 있다면 이런 불필요함이 사라질 것 입니다.

예제 4 - Lock Condition

Lock Condition을 사용하면 스레드 대기 집합을 나눠서 저장할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class BoundedQueueV5 implements BoundedQueue {

  private final Lock lock = new ReentrantLock();
  private final Condition producerCondition = lock.newCondition(); // 생산자 대기 집합
  private final Condition consumerCondition = lock.newCondition(); // 소비자 대기 집합

  private final Queue<String> queue = new ArrayDeque<>();
  private final int max;

  public BoundedQueueV5(int max) {
    this.max = max;
  }

  @Override
  public void put(String data) {
    lock.lock();
    try {
      while (queue.size() == max) {
        log("[put] 큐가 가득 참, 생산자 대기");
        try {
          producerCondition.await(); // 생산자 스레드 대기 집합에 저장
          log("[put] 생산자 깨어남");
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      queue.offer(data);
      log("[put] 생산자 데이터 저장, notify() 호출");
      consumerCondition.signal(); // 소비자 스레드 대기 집합에서 깨우기
    } finally {
      lock.unlock();
    }

  }

  @Override
  public String take() {
    lock.lock();
    try {
      while (queue.isEmpty()) {
        log("[take] 큐에 데이터가 없음, 소비자 대기");
        try {

          consumerCondition.await(); // 소비자 스레드 대기 집합에 저장

          log("[take] 소비자 깨어남");
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      String data = queue.poll();
      log("[take] 소비자 데이터 획득, notify() 호출");
      producerCondition.signal(); // 생산자 스레드 대기 집합에서 깨우기
      return data;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public String toString() {
    return queue.toString();
  }
}

  • lock.newCondition() : 스레드 대기 집합 생성
  • condition.await(): 해당 스레드 대기 집합에 저장
  • condition.signal() : 해당 스레드 대기 집합에서 깨우기

그림으로 원리 이해하기

생산자 스레드가 데이터를 저장하고 소비자 스레드를 깨웁니다.

소비자 스레드가 조회를 하고 생산자 스레드를 깨웁니다.

스레드의 대기

모든 객체 인스턴스는 멀티스레드와 임계 영역을 다루기 위한 3가지 기본 요소를 가짐

  • 모니터 락
  • 락 대기 집합(모니터 락 대기 집합)
  • 스레드 대기 집합

락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소 입니다. 2차 대기소에 들어간 스레드는 2차, 1차 대기소를 모두 빠져나와야 임계 영역을 수행할 수 있습니다.

스레드 집합에서 스레드를 깨워도 락을 획득해야 하기 때문에 BLOCKED로 전환 됩니다.

BlockingQueue

자바는 생산자 소비자 문제, 또는 한정된 버퍼라고 불리는 문제를 해결하기 위해 java.util.concurrent.BlockingQueue 라는 인터페이스와 구현체들을 제공합니다.

  • 데이터 추가 차단: 큐가 가득 차면 데이터 추가 작업(put())을 시도하는 스레드는 공간이 생길 때 까지 차단 됨
  • 데이터 획득 차단: 큐가 비어 있으면 획득 작업(take())을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단 됨
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface BlockingQueue<E> extends Queue<E> {
  boolean add(E e);

  boolean offer(E e);

  void put(E e) throws InterruptedException;

  boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

  E take() throws InterruptedException;

  E poll(long timeout, TimeUnit unit) throws InterruptedException;

  boolean remove(Object o);
  //...
}

기능 정리

OperationThrows ExceptionSpecial ValueBlocksTimes Out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()  
  • Throws Exception - 대기 시 예외
    • add(e)
      • 큐에 요소를 추가
      • 큐가 가득 차면 IllegalStateException 예외 발생
    • remove()
      • 큐에서 요소를 제거하고 반환
      • 큐가 비어 있으면 NoSuchElementException 예외 발생
    • element()
      • 큐의 머리 요소 반환 (제거 X)
      • 큐가 비어 있으면 NoSuchElementException 예외 발생
  • Special Value - 대기 시 즉시 반환
    • offer(e)
      • 큐에 요소를 추가
      • 큐가 가득차면 false 반환
    • poll()
      • 큐에서 요소를 제거하고 반환
      • 큐가 비어 있으면 null 반환
    • peek()
      • 큐에 머리 요소 반환 (제거 X)
      • 큐가 비어 있으면 null 반환
  • Blocks - 대기
    • put(e)
      • 큐에 요소를 저장
      • 큐가 가득차면 요소를 큐에 추가할 때 까지 대기
    • take()
      • 큐에서 요소를 제거하고 반환
      • 큐가 비어 있으면 요소가 준비 될 때까지 대기
  • Times Out - 시간 대기
    • offer(e, time, unit)
      • 큐에 요소를 저장
      • 지정된 시간 동안 큐가 비어지길 기다리다가 시간이 초과되면 false 반환
    • poll(time, unit)
      • 큐에서 요소를 제거하고 반환
      • 지정된 시간 동안 요소가 준비되기를 기다리다가 시간이 초과되면 null 반환

BlockingQueue의 모든 대기, 시간 대기 메서드는 인터럽트를 제공합니다.

참고

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.