[Java] 스레드 - 생산자 소비자 문제

2025. 1. 6. 22:57·Java
728x90
반응형

생산자 소비자 문제

생산자 소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제 중 하나로, 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룬다.

기본 개념

  • 생산자(Producer): 데이터를 생성하는 역할을 한다. 예를 들어, 파일에서 데이터를 읽어오거나 네트워크에서 데이터를 받아오는 스레드가 생산자 역할을 할 수 있다.
  • 소비자(Consumer): 생성된 데이터를 사용하는 역할을 한다. 예를 들어, 데이터를 처리하거나 저장하는 스레드가 소비자 역할을 한다.
  • 버퍼(Buffer): 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며, 생산자와 소비자가 이 버퍼를 통해 데이터를 주고받는다.

문제 상황

  • 생산자가 너무 빠를 때: 버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. 버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다.
  • 소비자가 너무 빠를 때: 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. 버퍼가 비어있을 때 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 한다.

생산자 소비자 문제

  • 생산자 소비자 문제(producer-consumer problem): 생산자 소비자 문제는, 생산자 스레드와 소비자 스레드가 특정 자원을 함께 생산하고, 소비하면서 발생하는 문제이다.
  • 한정된 버퍼 문제(bounded-buffer problem): 이 문제는 결국 중간에 있는 버퍼의 크기가 한정되어 있기 때문에 발생한다.

생산자 소비자 문제 예제 1코드

BoundedQueue

public class BoundedQueueV1 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV1(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        if (queue.size() == max) {
            MyLogger.log("[put] 큐가 가득참, 버림: " + data);
            return;
        }
        queue.offer(data);
    }

    @Override
    public synchronized String take() {
        if (queue.isEmpty()) {
            return null;
        }

        return queue.poll();
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

ConsumerTask

public class ConsumerTask implements Runnable {
    private BoundedQueue queue;

    public ConsumerTask(BoundedQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        log("[소비 시도]     ? <- " + queue);
        String data = queue.take();
        log("[소비 완료] " + data + " <- " + queue);
    }
}

ProducerTask

public class ProducerTask implements Runnable {
    private BoundedQueue queue;
    private String request;

    public ProducerTask(BoundedQueue queue, String request) {
        this.queue = queue;
        this.request = request;
    }

    @Override
    public void run() {
        log("[생산 시도] " + request + " -> " + queue);
        queue.put(request);
        log("[생산 완료] " + request + " -> " + queue);
    }
}

BoundedMain

public class BoundedMain {

    public static void main(String[] args) {
        // 1. BoundedQueue 선택
        BoundedQueue queue = new BoundedQueueV1(2);

        // 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
        producerFirst(queue); // 생산자 먼저 실행
//        consumerFirst(queue); // 소비자 먼저 실행
    }

    private static void producerFirst(BoundedQueue queue) {
        log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startProducer(queue, threads);
        printAllState(queue, threads);
        startConsumer(queue, threads);
        printAllState(queue, threads);
        log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void consumerFirst(BoundedQueue queue) {
        log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startConsumer(queue, threads);
        printAllState(queue, threads);
        startProducer(queue, threads);
        printAllState(queue, threads);
        log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void printAllState(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("현재 상태 출력, 큐 데이터: " + queue);
        threads.forEach(thread -> log(thread.getName() + ": " + thread.getState()));
    }

    private static void startProducer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("생산자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
            threads.add(producer);
            producer.start();
            sleep(100); // 걸어준 이유는, 3번 실행될건데 한번에 실행되면 순서가 바뀔 수도 있으니 1 -> 2 -> 3 수행을 보장하기 위해!
        }
    }

    private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("소비자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
            threads.add(consumer);
            consumer.start();
            sleep(100); // 걸어준 이유는, 3번 실행될건데 한번에 실행되면 순서가 바뀔 수도 있으니 1 -> 2 -> 3 수행을 보장하기 위해!
        }
    }
}

문제점

  • 생산자 스레드 먼저 실행의 경우 p3가 보관하는 data3은 버려지고, c3는 데이터를 받지 못한다. ( null을 받는다.)
  • 소비자 스레드 먼저 실행의 경우 c1, c2, c3는 데이터를 받지 못한다.(null을 받는다.) 그리고 p3가 보관하는 data3은 버려진다.
  • 버퍼가 가득 찬 경우: 생산자 입장에서 버퍼에 여유가 생길 때 까지 조금만 기다리면 되는데, 기다리지 못하고, 데이터를 버리는 것은 아쉽다.
  • 버퍼가 빈 경우: 소비자 입장에서 버퍼에 데이터가 채워질 때 까지 조금만 기다리면 되는데, 기다리지 못하고, null 데이터를 얻는 것은 아쉽다.

→ 그렇다면 스레드가 기다리도록 변경해보자!


생산자 소비자 문제 - 예제2 코드(while문을 통한 대기 조건 추가)

이번에는 각 상황에 맞추어 스레드가 기다리도록 해보자

 

BoundedQueueV2

public class BoundedQueueV2 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV2(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득참, 생산자 대기");
            sleep(1000);
        }
        queue.offer(data);
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없음, 소비자 대기");
            sleep(1000);
        }

        return queue.poll();
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
  • put(data) - 데이터를 버리지 않는 대안
    • 생산자 스레드가 반복문을 사용해서 큐에 빈 공간이 생기는지 주기적으로 체크한다.
    • 만약 빈 공간이 없다면 sleep()을 사용해서 잠시 대기하고, 깨어난 다음 다시 반복문에서 큐의 빈 공간을 체크하는 식으로 구현했다
    • 언젠가는 소비자 스레드가 실행되어서 큐의 데이터를 가져갈 것이라는 가정이 존재한다.
  • take() - 큐에 데이터가 없다면 기다리자
    • 소비자 스레드가 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크한다.
    • 만약 데이터가 없다면 sleep()을 사용해서 잠시 대기하고, 깨어난 다음 다시 반복문에서 큐에 데이터가 있는지 체크하는 식으로 구현했다
    • 언젠가는 생산자 스레드가 실행되어서 큐에 데이터를 추가할 것이라는 가정이 존재한다.

생산자 소비자 문제 - 예제2 분석

문제는 while문에 있다.

생산자 먼저 실행 분석

  • p3가 락을 획득한 후, 큐가 가득차 있으니 sleep(1000)을 사용해서 RUNNABLE → TIMED_WAITING 상태가 된다
  • 이때, 반복문을 사용해서 1초마다 큐에 빈 자리가 있는지 반복해서 확인한다.
  • 핵심은 p3 스레드가 락을 가지고 있는 상태에서,큐에 빈 자리가 나올 때까지 대기한다는 점이다.
  • 이후 소비자 스레드는 당연히 락이 없기에 모두 BLOCKED 상태가 된다.

소비자 먼저 실행 분석

  • 이 경우, c1이 시작부터 데이터가 없으니 락을 가진 상태로 계속 반복문을 돌고 있어서 문제이다.
  • c1이 계속 락을 점유하고 있기 때문에, 이후의 모든 스레드들은 모두 BLOCKED 상태가 된다.

무한 대기 문제(sleep 시 락 점유 문제)

결국 임계 영역 안에서 락을 가지고 대기하는 것이 문제이다.

 

그렇다면 sleep()을 호출해서 잠시 대기할 때 아무일도 하지 않는데, 이때 대기하는 동안 잠시 다른 스레드에게 락을 양보하면 어떨까?

 

그러면 다른 스레드가 버퍼에 값을 채우거나 버퍼의 값을 가져갈 수 있을 것이다. 그러면 락을 가진 스레드도 버퍼에서 값을 획득하거나 값을 채우고 락을 반납할 수 있을 것이다.

→ 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다면, 이 문제를 쉽게 풀 수 있다

→ 자바의 Object.wait(), Object.notify()를 사용하면 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다.


Object - wait, notify

wait(), notify() 설명

  • Object.wait()
    • 현재 스레드가 가진 락을 반납하고 다시 WAITING한다.
    • 현재 스레드를 WAITING 상태로 전환하는 메서드이므로, 이 메서드는 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을 때만 호출할 수 있다.
    • 호출한 스레드는 락을 반납하고, 다른 스레드가 해당 락을 획득할 수 있도록 한다.
    • 이렇게 대기 상태로 전환된 스레드는 다른 스레드가 notify() 또는 notifyAll()을 호출할 때까지 대기 상태를 유지한다.
  • Object.notify()
    • 대기 중인 스레드 중 하나를 깨운다.
    • 이 메서드는 synchronized 블록이나 메서드에서 호출되어야 한다. 깨운 스레드는 락을 다시 획득할 기회를 얻게 된다.
    • 만약 대기 중인 스레드가 여러 개라면, 그 중 하나만 깨워진다.
  • Object.notifyAll()
    • 대기 중인 모든 스레드를 깨운다.
    • 이 메서드 역시 synchronized 블록이나 메서드에서 호출되어야 한다.
    • 이 방법은 모든 스레드를 깨워야 할 필요가 있는 경우에 유용하다.

생산자 소비자 문제 - 예제 3 코드 (sleep 대신 wait & notify 추가)

BoundedQueueV3

public class BoundedQueueV3 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV3(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득참, 생산자 대기");
            try {
                wait(); // RUNNABLE -> WAITING
                log("[put] 생산자 깨어남!");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        queue.offer(data);
        log("[put] 생산자 데이터 저장, notify() 호출");
        notify(); // 대기 스레드, WAIT -> BLOCKED
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없음, 소비자 대기");
            try {
                wait();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        String data = queue.poll();
        log("[take] 소비자 데이터 획득, notify() 호출");
        notify(); // 대기 스레드, WAIT -> BLOCKED
        return data;
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
  • 앞서 작성한 sleep() 코드 대신 Object.wait()를 사용했다.
  • put(data) - wait(), notify()
    • 락을 획득한 생산자 스레드는 반복문을 사용해서 큐에 빈 공간이 생기는지 주기적으로 체크한다.
    • 만약 빈 공간이 없다면 Object.wait()을 사용해서 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면, 다시 반복문에서 큐의 빈 공간을 체크한다.
    • 생산자가 데이터를 큐에 저장하고 나면 notify()를 통해 저장된 데이터가 있다고 대기하는 스레드에게 알려주어야 한다!
  • take() - wait(), notify()
    • 락을 획득한 소비자 스레드는 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크한다.
    • 만약 데이터가 없다면 Object.wait()을 사용해서 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면, 다시 반복문에서 큐에 데이터가 있는지 체크한다.
    • 소비자가 데이터를 획득하고 나면 notify()를 통해 큐에 저장할 여유 공간이 생겼다고 대기하는 스레드에게 알려주어야 한다!

wait()로 대기상태에 빠진 스레드는 notify()를 사용해야 깨울 수 있다. 핵심은 wait()를 호출해서 대기 상태에 빠질 때, 락을 반납하고 대기 상태에 빠진다는 것이다. 대기 상태에 빠지면 어차피 아무일도 하지 않으므로 락도 필요하지 않다.

 

중요한 것은 대기 집합에서 깨어난 스레드도 일단은 임계 영역 안에 있다는 것이다. 임계 영역에 있는 코드를 실행하려면 먼저 락을 획득해야 하는데, 아직 notify()를 호출한 스레드 쪽의 작업이 끝나지 않았으므로 깨어난 스레드는 WAITING → BLOCKED 된다.

이후, 호출한 스레드의 작업이 끝나 락을 반납하면, 깨어난 스레드가 락을 이어 받아서 작업을 수행한다.

정리

소비자 우선 실행 시, 소비자인 c1이 같은 소비자인 c2, c3를 깨울 수 있었다. 이 경우 큐에 데이터가 없을 가능성이 있다. 이때는 깨어난 소비자 스레드가 CPU 자원만 소모하고 다시 대기 집합에 들어가기 떄문에 비효율적이다.

 

만약 소비자인 c1 입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면, 소비자인 c2를 깨우지는 않았을 것이다. 하지만, notify()는 이런 선택을 할 수 없다..

 

물론 비효율적이라는 것이지 결과에는 아무 문제없다.


Object - wait, notify 한계

Object.wait() , Object.notify() 방식은 스레드 대기 집합 하나에 생산자, 소비자 스레드를 모두 관리한다. 그리고 notify() 를 호출할 때 임의의 스레드가 선택된다. 따라서 앞서 살펴본 것 처럼 큐에 데이터가 없는 상황에 소비자가 같은 소비자를 깨우는 비효율이 발생할 수 있다. 또는 큐에 데이터가 가득 차있는데 생산자가 같 은 생산자를 깨우는 비효율도 발생할 수 있다.

→ 같은 종류의 스레드를 깨울 때 비효율이 발생한다.

스레드 기아(thread starvation)

notify()의 또 다른 문제점으로는 어떤 스레드가 깨어날 지 알 수 없기 때문에 발생할 수 있는 스레드 기아 문제가 있다. 대기 상태의 스레드가 실행 순서를 계속 얻지 못해서 아주 나중에 깨어나거나 실행되지 못하는 상황이 생길 수 있으며, 실행조차 되지 못하는 상황을 스레드 기아(starvation) 상태라 한다.

실제로 자바에서는 이렇게 구현되어있지는 않다. 최대한 스레드 기아 상태가 발생하지 않도록 내부적으로 잘 구현되어 있다. 하지만 이게 해결책이 되지는 못한다는 것을 알아두자.

 

이런 스레드 기아 상태를 해결하기 위해 notify() 대신 notifyAll()을 사용할 수 있다.

notifyAll 사용 시

 

결과적으로 notifyAll()을 사용해서 스레드 기아 문제는 막을 수 있지만.. 비효율을 막지는 못한다.


Lock Condition

위에서 발생한 비효율 문제를 해결하기 위해서는 생산자 스레드가 대기하는 대기 집합과, 소비자 스레드가 대기하는 대기 집합을 둘로 나누면 된다.

 

그리고 생산자 스레드가 데이터를 생산하면 소비자 스레드가 대기하는 대기 집합에만 알려주고, 소비자 스레드가 데이터를 소비하면 생산자 스레드가 대기하는 대기 집합에만 열려주면 되는 것이다.

= 생산자용, 소비자용 대기 집합을 서로 나누어 분리하면 비효율 문제를 깔끔하게 해결할 수 있다

 

이를 위해서 Lock, ReentrantLock을 사용할 수 있다.

Condition

  • 설명
    • Condition은 ReentrantLock을 사용하는 스레드가 대기하는 스레드 대기 공간이다.
    • lock.newCondition()을 통해 스레드 대기 공간을 만들 수 있다.
      • Condition condition = lock.newCondition()
    • Object.wait()에서 사용한 스레드 대기 공간은 모든 객체 인스턴스가 내부에 기본으로 가지고 있다. 반면, Lock(ReentrantLock)을 사용하는 경우, 이렇게 스레드 대기 공간을 직접 만들어서 사용해야 한다.
  • 메서드
    • condition.await(): Object.wait()와 유사한 기능으로, 지정한 condition에 현재 스레드를 락을 반납하고 대기(WAITING) 상태로 보관한다.
    • condition.signal(): Object.notify()와 유사한 기능으로, 지정한 condition에서 대기 중인 스레드를 하나 깨운다. 깨어난 스레드는 condition에서 빠져나온다.

생산자 소비자 대기 공간 분리 - 예제 5 코드(ReentrantLock & Condition 사용)

BoundedQueueV4

public class BoundedQueueV4 implements BoundedQueue {

    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition();
    private final Condition consumerCond = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV5(int max) {
        this.max = max;
    }

    @Override
    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
                log("[put] 큐가 가득참, 생산자 대기");
                try {
                    producerCond.await(); // Lock 인터페이스에서 제공하는 wait() 기능
                    log("[put] 생산자 깨어남!");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
            consumerCond.signal(); // Lock 인터페이스에서 제공하는 notify() 기능
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("[take] 큐에 데이터가 없음, 소비자 대기");
                try {
                    consumerCond.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            String data = queue.poll();
            log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
            producerCond.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

핵심은 생산자는 소비자를 깨우고, 소비자는 생산자를 깨운다는 점이다!

Object.notify() vs Condition.signal()

  • Object.notify()
    • 대기 중인 스레드 중 임의의 하나를 선택해서 깨운다. 스레드가 깨어나는 순서는 정의되어 있지 않으며, JVM 구현에 따라 다르다. 보통은 먼저 들어온 스레드가 먼저 수행되지만 구현에 따라 다를 수 있다.
    • synchronized 블록 내에서 모니터 락을 가지고 있는 스레드가 호출해야 한다.
  • Condition.signal()
    • 대기 중인 스레드 중 하나를 깨우며, 일반적으로는 FIFO 순서로 깨운다. 이 부분은 자바 버전과 구현에 따라 달라질 수 있지만, 보통 Condition의 구현은 Queue 구조를 사용하기 때문에 FIFO 순서로 깨운다.
    • ReentrantLock을 가지고 있는 스레드가 호출해야 한다.

스레드의 대기

락 대기 집합

  • 락 대기 집합은 자바 내부에 구현되어 있어서 모니터 락과 같이 개발자가 확인하기는 어렵다
  • BLOCKED 상태의 스레드들이 관리된다.
  • 언젠가 락을 소유 중인 스레드가 락을 반납하면 락 대기 집합에서 관리되는 스레드 중 하나가 락을 획득한다.

개념상 락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소이다.

비유를 하자면 임계 영역을 안전하게 지키기 위한 2중 감옥인 것이다. 스레드는 2중 감옥을 모두 탈출해야 임계 영역을 수행할 수 있다.

정리

자바의 모든 객체 인스턴스는 멀티스레드와 임계 영역을 다루기 위해 내부에 3가지 기본 요소를 가진다.

  • 모니터 락
  • 락 대기 집합(모니터 락 대기 집합) → 1차 대기소
  • 스레드 대기 집합 → 2차 대기소

2차 대기소에 들어간 스레드는 2차, 1차 대기소를 모두 빠져나와야 임계 영역을 수행할 수 있다.

RUNNABLE → {BLOCKED(다른 스레드 들은)} → WAITING(wait() 호출), 모니터 락 반납 → 다른 스레드가 notify() 호출 → 스레드 대기 잡합 빠져나오면서 모니터 락 획득 시도(RUNNABLE) → 바로 획득하지 못함(BLOCKED) → 획득(RUNNABLE)

synchronized vs ReentrantLock 대기

synchronized와 마찬가지로 Lock(ReentrantLock)도 2가지 단계의 대기 상태가 존재한다. 둘다 같은 개념을 구현한 것이기 때문에 비슷하다

(좌) synchronized 대기 / (우) Lock(ReentrantLock) 대기

  • synchronized 대기
    • 대기1: 모니터 락 획득 대기
      • 자바 객체 내부의 락 대기 집합(모니터 락 대기 집합)에서 관리
      • BLOCKED 상태로 대기
      • synchronized를 시작할 때 락이 없으면 대기
    • 대기2: wait() 대기
      • wait()를 호출했을 때 자바 객체 내부의 스레드 대기 집합에서 관리
      • WAITING 상태로 대기
      • 다른 스레드가 notify()를 호출했을 때 스레드 대기 집합을 빠져나감
  • Lock(ReentrantLock)
    • 대기1: ReentrantLock 락 획득 대기
      • ReentrantLock의 대기 큐에서 관리
      • WAITING 상태로 대기
      • lock.lock()을 호출했을 때 락이 없으면 대기
    • 대기2: await() 대기
      • condition.await()를 호출했을 때, condition 객체의 스레드 대기 공간에서 관리
      • WAITING 상태로 대기
      • 다른 스레드가 condition.signal()을 호출했을 때 condition 객체의 스레드 대기 공간에서 빠져나감

BlockingQueue

우리가 완성한 BoundedQueueV4는 생산자 소비자 문제, 또는 한정된 버퍼라고 알려진 문제를 매우 효율적으로 해결할 수 있는 자료구조이다.

 

만약, 멀티스레드 상황에서 생산자 소비자 문제가 나타난다면 우리가 만든 BoundedQueueV4를 사용하면 된다. 하지만, 이는 이미 다 만들어져 있다.

BlockingQueue

자바는 생산자 소비자 문제, 또는 한정된 버퍼라고 불리는 문제를 해결하기 위해 java.util.concurrent.**BlockingQueue** 라는 인터페이스와 구현체들을 제공한다.

  • 데이터 추가 차단: 큐가 가득 차면 데이터 추가 작업(put())을 시도하는 스레드는 공간이 생길 때까지 차단된다.
  • 데이터 획득 차단: 큐가 비어 있으면 획득 작업(take())을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단된다.

BlockingQueue 인터페이스와 대표적인 구현체

BlockingQueue

public interface BlockingQueue<E> extends Queue<E> {
        boolean add(E e);
        boolean offer(E e);
        void put(E e) throws InterruptedException;
        boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
        E take() throws InterruptedException;
        E poll(long timeout, TimeUnit unit) throws InterruptedException;
        boolean remove(Object o);
        //...
}
  • 대표적인 구현체
    • ArrayBlockingQueue: 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.
    • LinkedBlockingQueue: 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도, 또는 무한하게 사용할 수도 있다.

BlockingQueue 사용 예제6

public class BoundedQueueV6_1 implements BoundedQueue {

    private BlockingQueue<String> queue;

    public BoundedQueueV6_1(int max) {
        this.queue = new ArrayBlockingQueue<>(max);
    }

    @Override
    public void put(String data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

BlockingQueue - 기능 설명

  • 즉시 반환
    • BlockingQueue의 offer(data), poll()를 사용해서 스레드를 대기하지 않고 즉시 반환할 수 있다.
      • 두 메서드는 스레드가 대기하지 않는다
    • offer(data)는 성공하면 true를 반환하고, 버퍼가 가득 차면 즉시 false를 반환한다.
    • poll()는 버퍼에 데이터가 없으면 즉시 null을 반환한다.
  • 시간 대기
    • BlockingQueue 의 offer(data, 시간), poll(시간) 를 사용해서, 특정 시간 만큼만 대기하도록 할 수 있다.
    • offer(data, 시간)는 성공하면 true를 반환하고, 버퍼가 가득 차서 스레드가 대기해야 하는 상황이면, 지정한 시간까지 대기한다. 대기 시간을 지나면 false를 반환한다.
      • 여기서는 확인을 목적으로 1 나노초(NANOSECONDS)로 설정했다.
    • poll(시간) 버퍼에 데이터가 없어서 스레드가 대기해야 하는 상황이면, 지정한 시간까지 대기한다. 대기 시간을 지나면 null을 반환한다.
      • 여기서는 2초(SECONDS)로 설정했다.
  • 예외
    • BlockingQueue 의 add(data), remove() 를 사용해서, 대기시 예외가 발생하도록 할 수 있다.
    • add(data)는 성공하면true를 반환하고, 버퍼가 가득 차면 즉시 예외가 발생한다.
      • java.lang.IllegalStateException: Queue full
    • remove() 는 버퍼에 데이터가 없으면, 즉시 예외가 발생한다
      • java.util.NoSuchElementException
728x90
반응형

'Java' 카테고리의 다른 글

[Java] 동시성 컬렉션  (0) 2025.01.06
[Java] CAS - 동기화와 원자적 연산  (1) 2025.01.06
[Java] 스레드 동시성 문제  (0) 2025.01.06
[Java] 스레드의 생성 / 실행 / 제어 / 생명 주기  (0) 2025.01.06
[Java] Optional  (0) 2025.01.05
'Java' 카테고리의 다른 글
  • [Java] 동시성 컬렉션
  • [Java] CAS - 동기화와 원자적 연산
  • [Java] 스레드 동시성 문제
  • [Java] 스레드의 생성 / 실행 / 제어 / 생명 주기
mxruhxn
mxruhxn
소소하게 개발 공부 기록하기
    반응형
    250x250
  • mxruhxn
    maruhxn
    mxruhxn
  • 전체
    오늘
    어제
    • 분류 전체보기 (150)
      • Java (21)
      • Spring (4)
      • Database (13)
      • Operating Syste.. (1)
      • Computer Archit.. (0)
      • Network (24)
      • Data Structure (6)
      • Algorithm (11)
      • Data Infra (7)
      • DevOps (12)
      • ETC (27)
      • Project (21)
      • Book (1)
      • Look Back (1)
  • 블로그 메뉴

    • 링크

      • Github
    • 공지사항

    • 인기 글

    • 태그

    • 최근 댓글

    • 최근 글

    • hELLO· Designed By정상우.v4.10.0
    mxruhxn
    [Java] 스레드 - 생산자 소비자 문제
    상단으로

    티스토리툴바