생산자 소비자 문제
생산자 소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제 중 하나로, 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룬다.
기본 개념
생산자(Producer)
: 데이터를 생성하는 역할을 한다. 예를 들어, 파일에서 데이터를 읽어오거나 네트워크에서 데이터를 받아오는 스레드가 생산자 역할을 할 수 있다.소비자(Consumer)
: 생성된 데이터를 사용하는 역할을 한다. 예를 들어, 데이터를 처리하거나 저장하는 스레드가 소비자 역할을 한다.버퍼(Buffer)
: 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며, 생산자와 소비자가 이 버퍼를 통해 데이터를 주고받는다.
문제 상황
- 생산자가 너무 빠를 때: 버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. 버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다.
- 소비자가 너무 빠를 때: 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. 버퍼가 비어있을 때 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 한다.
생산자 소비자 문제
생산자 소비자 문제(producer-consumer problem)
: 생산자 소비자 문제는, 생산자 스레드와 소비자 스레드가 특정 자원을 함께 생산하고, 소비하면서 발생하는 문제이다.한정된 버퍼 문제(bounded-buffer problem)
: 이 문제는 결국 중간에 있는 버퍼의 크기가 한정되어 있기 때문에 발생한다.
생산자 소비자 문제 예제 1코드
BoundedQueue
public class BoundedQueueV1 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV1(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
if (queue.size() == max) {
MyLogger.log("[put] 큐가 가득참, 버림: " + data);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
if (queue.isEmpty()) {
return null;
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
ConsumerTask
public class ConsumerTask implements Runnable {
private BoundedQueue queue;
public ConsumerTask(BoundedQueue queue) {
this.queue = queue;
}
@Override
public void run() {
log("[소비 시도] ? <- " + queue);
String data = queue.take();
log("[소비 완료] " + data + " <- " + queue);
}
}
ProducerTask
public class ProducerTask implements Runnable {
private BoundedQueue queue;
private String request;
public ProducerTask(BoundedQueue queue, String request) {
this.queue = queue;
this.request = request;
}
@Override
public void run() {
log("[생산 시도] " + request + " -> " + queue);
queue.put(request);
log("[생산 완료] " + request + " -> " + queue);
}
}
BoundedMain
public class BoundedMain {
public static void main(String[] args) {
// 1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV1(2);
// 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
producerFirst(queue); // 생산자 먼저 실행
// consumerFirst(queue); // 소비자 먼저 실행
}
private static void producerFirst(BoundedQueue queue) {
log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
List<Thread> threads = new ArrayList<>();
startProducer(queue, threads);
printAllState(queue, threads);
startConsumer(queue, threads);
printAllState(queue, threads);
log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
}
private static void consumerFirst(BoundedQueue queue) {
log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
List<Thread> threads = new ArrayList<>();
startConsumer(queue, threads);
printAllState(queue, threads);
startProducer(queue, threads);
printAllState(queue, threads);
log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
}
private static void printAllState(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("현재 상태 출력, 큐 데이터: " + queue);
threads.forEach(thread -> log(thread.getName() + ": " + thread.getState()));
}
private static void startProducer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("생산자 시작");
for (int i = 1; i <= 3; i++) {
Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
threads.add(producer);
producer.start();
sleep(100); // 걸어준 이유는, 3번 실행될건데 한번에 실행되면 순서가 바뀔 수도 있으니 1 -> 2 -> 3 수행을 보장하기 위해!
}
}
private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("소비자 시작");
for (int i = 1; i <= 3; i++) {
Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
threads.add(consumer);
consumer.start();
sleep(100); // 걸어준 이유는, 3번 실행될건데 한번에 실행되면 순서가 바뀔 수도 있으니 1 -> 2 -> 3 수행을 보장하기 위해!
}
}
}
문제점
- 생산자 스레드 먼저 실행의 경우 p3가 보관하는 data3은 버려지고, c3는 데이터를 받지 못한다. (
null
을 받는다.) - 소비자 스레드 먼저 실행의 경우 c1, c2, c3는 데이터를 받지 못한다.(
null
을 받는다.) 그리고 p3가 보관하는 data3은 버려진다. - 버퍼가 가득 찬 경우: 생산자 입장에서 버퍼에 여유가 생길 때 까지 조금만 기다리면 되는데, 기다리지 못하고, 데이터를 버리는 것은 아쉽다.
- 버퍼가 빈 경우: 소비자 입장에서 버퍼에 데이터가 채워질 때 까지 조금만 기다리면 되는데, 기다리지 못하고,
null
데이터를 얻는 것은 아쉽다.
→ 그렇다면 스레드가 기다리도록 변경해보자!
생산자 소비자 문제 - 예제2 코드(while문을 통한 대기 조건 추가)
이번에는 각 상황에 맞추어 스레드가 기다리도록 해보자
BoundedQueueV2
public class BoundedQueueV2 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV2(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득참, 생산자 대기");
sleep(1000);
}
queue.offer(data);
}
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
sleep(1000);
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
put(data)
- 데이터를 버리지 않는 대안- 생산자 스레드가 반복문을 사용해서 큐에 빈 공간이 생기는지 주기적으로 체크한다.
- 만약 빈 공간이 없다면 sleep()을 사용해서 잠시 대기하고, 깨어난 다음 다시 반복문에서 큐의 빈 공간을 체크하는 식으로 구현했다
- 언젠가는 소비자 스레드가 실행되어서 큐의 데이터를 가져갈 것이라는 가정이 존재한다.
take()
- 큐에 데이터가 없다면 기다리자- 소비자 스레드가 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크한다.
- 만약 데이터가 없다면 sleep()을 사용해서 잠시 대기하고, 깨어난 다음 다시 반복문에서 큐에 데이터가 있는지 체크하는 식으로 구현했다
- 언젠가는 생산자 스레드가 실행되어서 큐에 데이터를 추가할 것이라는 가정이 존재한다.
생산자 소비자 문제 - 예제2 분석
문제는 while문에 있다.
생산자 먼저 실행 분석
- p3가 락을 획득한 후, 큐가 가득차 있으니
sleep(1000)
을 사용해서RUNNABLE → TIMED_WAITING
상태가 된다 - 이때, 반복문을 사용해서 1초마다 큐에 빈 자리가 있는지 반복해서 확인한다.
- 핵심은 p3 스레드가 락을 가지고 있는 상태에서,큐에 빈 자리가 나올 때까지 대기한다는 점이다.
- 이후 소비자 스레드는 당연히 락이 없기에 모두
BLOCKED
상태가 된다.
소비자 먼저 실행 분석
- 이 경우, c1이 시작부터 데이터가 없으니 락을 가진 상태로 계속 반복문을 돌고 있어서 문제이다.
- c1이 계속 락을 점유하고 있기 때문에, 이후의 모든 스레드들은 모두
BLOCKED
상태가 된다.
무한 대기 문제(sleep 시 락 점유 문제)
결국 임계 영역 안에서 락을 가지고 대기하는 것이 문제이다.
그렇다면 sleep()
을 호출해서 잠시 대기할 때 아무일도 하지 않는데, 이때 대기하는 동안 잠시 다른 스레드에게 락을 양보하면 어떨까?
그러면 다른 스레드가 버퍼에 값을 채우거나 버퍼의 값을 가져갈 수 있을 것이다. 그러면 락을 가진 스레드도 버퍼에서 값을 획득하거나 값을 채우고 락을 반납할 수 있을 것이다.
→ 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다면, 이 문제를 쉽게 풀 수 있다
→ 자바의 Object.wait()
, Object.notify()
를 사용하면 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다.
Object - wait, notify
wait(), notify() 설명
Object.wait()
- 현재 스레드가 가진 락을 반납하고 다시
WAITING
한다. - 현재 스레드를
WAITING
상태로 전환하는 메서드이므로, 이 메서드는 현재 스레드가synchronized
블록이나 메서드에서 락을 소유하고 있을 때만 호출할 수 있다. - 호출한 스레드는 락을 반납하고, 다른 스레드가 해당 락을 획득할 수 있도록 한다.
- 이렇게 대기 상태로 전환된 스레드는 다른 스레드가
notify()
또는notifyAll()
을 호출할 때까지 대기 상태를 유지한다.
- 현재 스레드가 가진 락을 반납하고 다시
Object.notify()
- 대기 중인 스레드 중 하나를 깨운다.
- 이 메서드는
synchronized
블록이나 메서드에서 호출되어야 한다. 깨운 스레드는 락을 다시 획득할 기회를 얻게 된다. - 만약 대기 중인 스레드가 여러 개라면, 그 중 하나만 깨워진다.
Object.notifyAll()
- 대기 중인 모든 스레드를 깨운다.
- 이 메서드 역시
synchronized
블록이나 메서드에서 호출되어야 한다. - 이 방법은 모든 스레드를 깨워야 할 필요가 있는 경우에 유용하다.
생산자 소비자 문제 - 예제 3 코드 (sleep 대신 wait & notify 추가)
BoundedQueueV3
public class BoundedQueueV3 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV3(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득참, 생산자 대기");
try {
wait(); // RUNNABLE -> WAITING
log("[put] 생산자 깨어남!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
notify(); // 대기 스레드, WAIT -> BLOCKED
}
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
notify(); // 대기 스레드, WAIT -> BLOCKED
return data;
}
@Override
public String toString() {
return queue.toString();
}
}
- 앞서 작성한
sleep()
코드 대신Object.wait()
를 사용했다. put(data) - wait(), notify()
- 락을 획득한 생산자 스레드는 반복문을 사용해서 큐에 빈 공간이 생기는지 주기적으로 체크한다.
- 만약 빈 공간이 없다면
Object.wait()
을 사용해서 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면, 다시 반복문에서 큐의 빈 공간을 체크한다. - 생산자가 데이터를 큐에 저장하고 나면
notify()
를 통해 저장된 데이터가 있다고 대기하는 스레드에게 알려주어야 한다!
take() - wait(), notify()
- 락을 획득한 소비자 스레드는 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크한다.
- 만약 데이터가 없다면
Object.wait()
을 사용해서 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면, 다시 반복문에서 큐에 데이터가 있는지 체크한다. - 소비자가 데이터를 획득하고 나면
notify()
를 통해 큐에 저장할 여유 공간이 생겼다고 대기하는 스레드에게 알려주어야 한다!
wait()
로 대기상태에 빠진 스레드는 notify()
를 사용해야 깨울 수 있다. 핵심은 wait()
를 호출해서 대기 상태에 빠질 때, 락을 반납하고 대기 상태에 빠진다는 것이다. 대기 상태에 빠지면 어차피 아무일도 하지 않으므로 락도 필요하지 않다.
중요한 것은 대기 집합에서 깨어난 스레드도 일단은 임계 영역 안에 있다는 것이다. 임계 영역에 있는 코드를 실행하려면 먼저 락을 획득해야 하는데, 아직 notify()
를 호출한 스레드 쪽의 작업이 끝나지 않았으므로 깨어난 스레드는 WAITING → BLOCKED
된다.
이후, 호출한 스레드의 작업이 끝나 락을 반납하면, 깨어난 스레드가 락을 이어 받아서 작업을 수행한다.
정리
소비자 우선 실행 시, 소비자인 c1이 같은 소비자인 c2, c3를 깨울 수 있었다. 이 경우 큐에 데이터가 없을 가능성이 있다. 이때는 깨어난 소비자 스레드가 CPU 자원만 소모하고 다시 대기 집합에 들어가기 떄문에 비효율적이다.
만약 소비자인 c1 입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면, 소비자인 c2를 깨우지는 않았을 것이다. 하지만, notify()
는 이런 선택을 할 수 없다..
물론 비효율적이라는 것이지 결과에는 아무 문제없다.
Object - wait, notify 한계
Object.wait()
, Object.notify()
방식은 스레드 대기 집합 하나에 생산자, 소비자 스레드를 모두 관리한다. 그리고 notify()
를 호출할 때 임의의 스레드가 선택된다. 따라서 앞서 살펴본 것 처럼 큐에 데이터가 없는 상황에 소비자가 같은 소비자를 깨우는 비효율이 발생할 수 있다. 또는 큐에 데이터가 가득 차있는데 생산자가 같 은 생산자를 깨우는 비효율도 발생할 수 있다.
→ 같은 종류의 스레드를 깨울 때 비효율이 발생한다.
스레드 기아(thread starvation)
notify()
의 또 다른 문제점으로는 어떤 스레드가 깨어날 지 알 수 없기 때문에 발생할 수 있는 스레드 기아 문제가 있다. 대기 상태의 스레드가 실행 순서를 계속 얻지 못해서 아주 나중에 깨어나거나 실행되지 못하는 상황이 생길 수 있으며, 실행조차 되지 못하는 상황을 스레드 기아(starvation)
상태라 한다.
실제로 자바에서는 이렇게 구현되어있지는 않다. 최대한 스레드 기아 상태가 발생하지 않도록 내부적으로 잘 구현되어 있다. 하지만 이게 해결책이 되지는 못한다는 것을 알아두자.
이런 스레드 기아 상태를 해결하기 위해 notify()
대신 notifyAll()
을 사용할 수 있다.
결과적으로 notifyAll()
을 사용해서 스레드 기아 문제는 막을 수 있지만.. 비효율을 막지는 못한다.
Lock Condition
위에서 발생한 비효율 문제를 해결하기 위해서는 생산자 스레드가 대기하는 대기 집합과, 소비자 스레드가 대기하는 대기 집합을 둘로 나누면 된다.
그리고 생산자 스레드가 데이터를 생산하면 소비자 스레드가 대기하는 대기 집합에만 알려주고, 소비자 스레드가 데이터를 소비하면 생산자 스레드가 대기하는 대기 집합에만 열려주면 되는 것이다.
= 생산자용, 소비자용 대기 집합을 서로 나누어 분리하면 비효율 문제를 깔끔하게 해결할 수 있다
이를 위해서 Lock
, ReentrantLock
을 사용할 수 있다.
Condition
- 설명
Condition
은ReentrantLock
을 사용하는 스레드가 대기하는 스레드 대기 공간이다.lock.newCondition()
을 통해 스레드 대기 공간을 만들 수 있다.Condition condition = lock.newCondition()
Object.wait()
에서 사용한 스레드 대기 공간은 모든 객체 인스턴스가 내부에 기본으로 가지고 있다. 반면,Lock(ReentrantLock)
을 사용하는 경우, 이렇게 스레드 대기 공간을 직접 만들어서 사용해야 한다.
- 메서드
condition.await()
:Object.wait()
와 유사한 기능으로, 지정한 condition에 현재 스레드를 락을 반납하고 대기(WAITING
) 상태로 보관한다.condition.signal()
:Object.notify()
와 유사한 기능으로, 지정한 condition에서 대기 중인 스레드를 하나 깨운다. 깨어난 스레드는 condition에서 빠져나온다.
생산자 소비자 대기 공간 분리 - 예제 5 코드(ReentrantLock & Condition 사용)
BoundedQueueV4
public class BoundedQueueV4 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득참, 생산자 대기");
try {
producerCond.await(); // Lock 인터페이스에서 제공하는 wait() 기능
log("[put] 생산자 깨어남!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
consumerCond.signal(); // Lock 인터페이스에서 제공하는 notify() 기능
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
consumerCond.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
핵심은 생산자는 소비자를 깨우고, 소비자는 생산자를 깨운다는 점이다!
Object.notify() vs Condition.signal()
Object.notify()
- 대기 중인 스레드 중 임의의 하나를 선택해서 깨운다. 스레드가 깨어나는 순서는 정의되어 있지 않으며, JVM 구현에 따라 다르다. 보통은 먼저 들어온 스레드가 먼저 수행되지만 구현에 따라 다를 수 있다.
synchronized
블록 내에서 모니터 락을 가지고 있는 스레드가 호출해야 한다.
Condition.signal()
- 대기 중인 스레드 중 하나를 깨우며, 일반적으로는
FIFO
순서로 깨운다. 이 부분은 자바 버전과 구현에 따라 달라질 수 있지만, 보통Condition
의 구현은Queue
구조를 사용하기 때문에 FIFO 순서로 깨운다. ReentrantLock
을 가지고 있는 스레드가 호출해야 한다.
- 대기 중인 스레드 중 하나를 깨우며, 일반적으로는
스레드의 대기
락 대기 집합
- 락 대기 집합은 자바 내부에 구현되어 있어서 모니터 락과 같이 개발자가 확인하기는 어렵다
BLOCKED
상태의 스레드들이 관리된다.- 언젠가 락을 소유 중인 스레드가 락을 반납하면 락 대기 집합에서 관리되는 스레드 중 하나가 락을 획득한다.
개념상 락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소이다.
비유를 하자면 임계 영역을 안전하게 지키기 위한 2중 감옥인 것이다. 스레드는 2중 감옥을 모두 탈출해야 임계 영역을 수행할 수 있다.
정리
자바의 모든 객체 인스턴스는 멀티스레드와 임계 영역을 다루기 위해 내부에 3가지 기본 요소를 가진다.
- 모니터 락
- 락 대기 집합(모니터 락 대기 집합) → 1차 대기소
- 스레드 대기 집합 → 2차 대기소
2차 대기소에 들어간 스레드는 2차, 1차 대기소를 모두 빠져나와야 임계 영역을 수행할 수 있다.
RUNNABLE
→ {BLOCKED
(다른 스레드 들은)} → WAITING
(wait()
호출), 모니터 락 반납 → 다른 스레드가 notify()
호출 → 스레드 대기 잡합 빠져나오면서 모니터 락 획득 시도(RUNNABLE
) → 바로 획득하지 못함(BLOCKED
) → 획득(RUNNABLE
)
synchronized vs ReentrantLock 대기
synchronized
와 마찬가지로 Lock(ReentrantLock)
도 2가지 단계의 대기 상태가 존재한다. 둘다 같은 개념을 구현한 것이기 때문에 비슷하다
synchronized
대기- 대기1: 모니터 락 획득 대기
- 자바 객체 내부의 락 대기 집합(모니터 락 대기 집합)에서 관리
BLOCKED
상태로 대기synchronized
를 시작할 때 락이 없으면 대기
- 대기2:
wait()
대기wait()
를 호출했을 때 자바 객체 내부의 스레드 대기 집합에서 관리WAITING
상태로 대기- 다른 스레드가
notify()
를 호출했을 때 스레드 대기 집합을 빠져나감
- 대기1: 모니터 락 획득 대기
- Lock(ReentrantLock)
- 대기1:
ReentrantLock
락 획득 대기ReentrantLock
의 대기 큐에서 관리WAITING
상태로 대기lock.lock()
을 호출했을 때 락이 없으면 대기
- 대기2:
await()
대기condition.await()
를 호출했을 때,condition
객체의 스레드 대기 공간에서 관리WAITING
상태로 대기- 다른 스레드가
condition.signal()
을 호출했을 때 condition 객체의 스레드 대기 공간에서 빠져나감
- 대기1:
BlockingQueue
우리가 완성한 BoundedQueueV4
는 생산자 소비자 문제, 또는 한정된 버퍼라고 알려진 문제를 매우 효율적으로 해결할 수 있는 자료구조이다.
만약, 멀티스레드 상황에서 생산자 소비자 문제가 나타난다면 우리가 만든 BoundedQueueV4
를 사용하면 된다. 하지만, 이는 이미 다 만들어져 있다.
BlockingQueue
자바는 생산자 소비자 문제, 또는 한정된 버퍼라고 불리는 문제를 해결하기 위해 java.util.concurrent.**BlockingQueue**
라는 인터페이스와 구현체들을 제공한다.
- 데이터 추가 차단: 큐가 가득 차면 데이터 추가 작업(
put()
)을 시도하는 스레드는 공간이 생길 때까지 차단된다. - 데이터 획득 차단: 큐가 비어 있으면 획득 작업(
take()
)을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단된다.
BlockingQueue 인터페이스와 대표적인 구현체
BlockingQueue
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
//...
}
- 대표적인 구현체
ArrayBlockingQueue
: 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.LinkedBlockingQueue
: 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도, 또는 무한하게 사용할 수도 있다.
BlockingQueue 사용 예제6
public class BoundedQueueV6_1 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
this.queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
BlockingQueue - 기능 설명
- 즉시 반환
BlockingQueue
의offer(data)
,poll()
를 사용해서 스레드를 대기하지 않고 즉시 반환할 수 있다.- 두 메서드는 스레드가 대기하지 않는다
offer(data)
는 성공하면true
를 반환하고, 버퍼가 가득 차면 즉시false
를 반환한다.poll()
는 버퍼에 데이터가 없으면 즉시null
을 반환한다.
- 시간 대기
BlockingQueue
의offer(data, 시간)
,poll(시간)
를 사용해서, 특정 시간 만큼만 대기하도록 할 수 있다.offer(data, 시간)
는 성공하면true
를 반환하고, 버퍼가 가득 차서 스레드가 대기해야 하는 상황이면, 지정한 시간까지 대기한다. 대기 시간을 지나면false
를 반환한다.- 여기서는 확인을 목적으로 1 나노초(
NANOSECONDS
)로 설정했다.
- 여기서는 확인을 목적으로 1 나노초(
poll(시간)
버퍼에 데이터가 없어서 스레드가 대기해야 하는 상황이면, 지정한 시간까지 대기한다. 대기 시간을 지나면null
을 반환한다.- 여기서는 2초(
SECONDS
)로 설정했다.
- 여기서는 2초(
- 예외
BlockingQueue
의add(data)
,remove()
를 사용해서, 대기시 예외가 발생하도록 할 수 있다.add(data)
는 성공하면true
를 반환하고, 버퍼가 가득 차면 즉시 예외가 발생한다.java.lang.IllegalStateException: Queue full
remove()
는 버퍼에 데이터가 없으면, 즉시 예외가 발생한다java.util.NoSuchElementException
'Java' 카테고리의 다른 글
[Java] 동시성 컬렉션 (0) | 2025.01.06 |
---|---|
[Java] CAS - 동기화와 원자적 연산 (1) | 2025.01.06 |
[Java] 스레드 동시성 문제 (0) | 2025.01.06 |
[Java] 스레드의 생성 / 실행 / 제어 / 생명 주기 (0) | 2025.01.06 |
[Java] Optional (0) | 2025.01.05 |