← Назад к вопросам

Producer-Consumer с BlockingQueue

2.0 Middle🔥 171 комментариев
#Коллекции#Многопоточность

Условие

Реализуйте паттерн Producer-Consumer с использованием BlockingQueue.

Требования

  • Один или несколько Producer добавляют данные в очередь
  • Один или несколько Consumer извлекают данные
  • Используйте ArrayBlockingQueue с ограниченным размером
  • Реализуйте graceful shutdown
  • Producer должен генерировать числа от 1 до 100
  • Consumer должен выводить полученные числа

Подсказка

Используйте ExecutorService для управления потоками и "отравленную пилюлю" (poison pill) для завершения.

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Producer-Consumer паттерн с BlockingQueue

Это один из самых важных паттернов для многопоточного программирования. BlockingQueue автоматически управляет синхронизацией между производителями и потребителями.

Основное решение

import java.util.concurrent.*;

public class ProducerConsumerExample {
    private static final int QUEUE_SIZE = 10;
    private static final int ITEMS_TO_PRODUCE = 100;
    private static final Integer POISON_PILL = -1; // Сигнал завершения

    public static void main(String[] args) throws InterruptedException {
        // Ограниченная очередь размером 10
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

        // ExecutorService для управления потоками
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        try {
            // Запуск 1 Producer
            executorService.submit(new Producer(queue, ITEMS_TO_PRODUCE));

            // Запуск 2 Consumers
            executorService.submit(new Consumer(queue, "Consumer-1"));
            executorService.submit(new Consumer(queue, "Consumer-2"));

            // Ждём завершения всех задач
            executorService.shutdown();
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("Потоки не завершились за 60 секунд");
                executorService.shutdownNow();
            }
        } finally {
            if (!executorService.isShutdown()) {
                executorService.shutdownNow();
            }
        }

        System.out.println("Программа завершена успешно");
    }

    // Producer - генерирует данные
    static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final int itemsToProduceCount;

        public Producer(BlockingQueue<Integer> queue, int itemsToProduceCount) {
            this.queue = queue;
            this.itemsToProduceCount = itemsToProduceCount;
        }

        @Override
        public void run() {
            try {
                // Генерируем числа от 1 до 100
                for (int i = 1; i <= itemsToProduceCount; i++) {
                    System.out.println("[Producer] Генерирует: " + i);
                    
                    // put() блокирует, если очередь полна
                    queue.put(i);
                    
                    // Имитируем работу
                    Thread.sleep(50);
                }

                // Отправляем "отравленную пилюлю" для каждого Consumer
                for (int i = 0; i < 2; i++) {
                    queue.put(POISON_PILL);
                }

                System.out.println("[Producer] Завершён");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("[Producer] Прерван");
            }
        }
    }

    // Consumer - потребляет данные
    static class Consumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final String name;

        public Consumer(BlockingQueue<Integer> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // take() блокирует, если очередь пуста
                    Integer item = queue.take();

                    // Проверяем на "отравленную пилюлю"
                    if (item.equals(POISON_PILL)) {
                        // Передаём пилюлю дальше для других потребителей
                        queue.put(POISON_PILL);
                        System.out.println("[" + name + "] Получил сигнал завершения");
                        break;
                    }

                    System.out.println("[" + name + "] Получил: " + item);
                    
                    // Имитируем обработку
                    Thread.sleep(100);
                }
                System.out.println("[" + name + "] Завершён");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("[" + name + "] Прерван");
            }
        }
    }
}

Объяснение ключевых моментов

1. BlockingQueue - синхронизированная очередь

// ArrayBlockingQueue - с фиксированным размером
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

// Основные методы:
queue.put(item);      // Добавить (блокирует если полна)
Integer item = queue.take(); // Получить (блокирует если пуста)

// Альтернативы с timeout:
queue.offer(item, 1, TimeUnit.SECONDS);  // false если не успел
Integer item = queue.poll(1, TimeUnit.SECONDS); // null если timeout

2. Отравленная пилюля (Poison Pill)

Это специальное значение, которое сигнализирует о завершении:

private static final Integer POISON_PILL = -1;

// Producer отправляет пилюлю
queue.put(POISON_PILL);

// Consumer проверяет и завершается
if (item.equals(POISON_PILL)) {
    queue.put(POISON_PILL);  // Передаём для других consumers
    break;
}

3. ExecutorService для управления потоками

// Создаём пул с 3 потоками (1 Producer + 2 Consumer)
ExecutorService executorService = Executors.newFixedThreadPool(3);

// Запускаем задачи
executorService.submit(new Producer(queue, 100));
executorService.submit(new Consumer(queue, "Consumer-1"));
executorService.submit(new Consumer(queue, "Consumer-2"));

// Graceful shutdown
executorService.shutdown();  // Не принимает новые задачи
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
    executorService.shutdownNow(); // Принудительно если не завершились
}

Альтернативное решение без BlockingQueue

Для сравнения - это БЫ сложнее и опаснее:

// ✗ Нужна ручная синхронизация
public class ManualSyncProducerConsumer {
    private static final int QUEUE_SIZE = 10;
    private final Queue<Integer> queue = new LinkedList<>();

    public synchronized void produce(Integer item) throws InterruptedException {
        while (queue.size() >= QUEUE_SIZE) {
            wait(); // Ждём, пока место освободится
        }
        queue.add(item);
        notifyAll(); // Разбуди потребителей
    }

    public synchronized Integer consume() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // Ждём, пока появятся данные
        }
        Integer item = queue.remove();
        notifyAll(); // Разбуди производителей
        return item;
    }
}

С несколькими Producer-ами

public class MultiProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(20);
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        try {
            // 2 Producer-а
            executorService.submit(new Producer(queue, 1, 50));
            executorService.submit(new Producer(queue, 51, 100));

            // 3 Consumer-а
            for (int i = 1; i <= 3; i++) {
                executorService.submit(new Consumer(queue, "Consumer-" + i, 2));
            }

            executorService.shutdown();
            executorService.awaitTermination(120, TimeUnit.SECONDS);
        } finally {
            if (!executorService.isShutdown()) {
                executorService.shutdownNow();
            }
        }
    }

    static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final int start, end;

        public Producer(BlockingQueue<Integer> queue, int start, int end) {
            this.queue = queue;
            this.start = start;
            this.end = end;
        }

        @Override
        public void run() {
            try {
                for (int i = start; i <= end; i++) {
                    System.out.println("[Producer-" + Thread.currentThread().getName() + "] " + i);
                    queue.put(i);
                    Thread.sleep(30);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final String name;
        private final int poisonPillCount;

        public Consumer(BlockingQueue<Integer> queue, String name, int poisonPillCount) {
            this.queue = queue;
            this.name = name;
            this.poisonPillCount = poisonPillCount;
        }

        @Override
        public void run() {
            try {
                int pillsReceived = 0;
                while (pillsReceived < poisonPillCount) {
                    Integer item = queue.take();
                    if (item == -1) {
                        pillsReceived++;
                        queue.put(-1); // Передай дальше
                    } else {
                        System.out.println("[" + name + "] " + item);
                        Thread.sleep(50);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

С обработкой исключений

static class RobustConsumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final String name;

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Integer item = null;
                try {
                    // Timeout предотвращает бесконечное ожидание
                    item = queue.poll(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }

                if (item == null) {
                    System.out.println("[" + name + "] Timeout - выходим");
                    break;
                }

                if (item == -1) {
                    queue.put(-1);
                    break;
                }

                try {
                    processItem(item);
                } catch (Exception e) {
                    System.err.println("[" + name + "] Ошибка обработки " + item + ": " + e);
                }
            }
        } finally {
            System.out.println("[" + name + "] Завершён");
        }
    }

    private void processItem(Integer item) {
        System.out.println("[" + name + "] Обрабатываю: " + item);
    }
}

Тестирование

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

public class ProducerConsumerTest {
    @Test
    public void testQueueBlocking() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        // Добавляем 2 элемента (максимум)
        queue.put(1);
        queue.put(2);

        // 3-й элемент должен блокировать (очередь полна)
        // Проверяем что блокирует
        long start = System.currentTimeMillis();
        Thread t = new Thread(() -> {
            try {
                queue.put(3); // Заблокируется
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        t.start();
        Thread.sleep(100);
        assertTrue(t.isAlive(), "Поток должен быть заблокирован");
        
        // Освобождаем место
        queue.take();
        t.join(1000);
        assertFalse(t.isAlive(), "Поток должен был разблокироваться");
    }
}

Важные замечания

Преимущества BlockingQueue:

  • ✓ Автоматическая синхронизация
  • ✓ Предотвращает переполнение/опустошение
  • ✓ Потокобезопасна
  • ✓ Применяется в пулах потоков (ThreadPoolExecutor)

Когда использовать:

  • ✓ Producer-Consumer паттерн
  • ✓ Многопоточные приложения
  • ✓ Обработка асинхронных задач
  • ✓ Системы с different throughput

Альтернативы:

  • LinkedBlockingQueue — неограниченная (опасно!)
  • PriorityBlockingQueue — с приоритетами
  • DelayQueue — отложенное извлечение
  • SynchronousQueue — прямая передача (без хранения)

Заключение

Producer-Consumer с BlockingQueue — это надёжный и эффективный способ синхронизации между потоками. Помни:

  1. BlockingQueue управляет синхронизацией
  2. Poison pill сигнализирует о завершении
  3. ExecutorService управляет жизненным циклом потоков
  4. Graceful shutdown предотвращает потерю данных
  5. Никогда не используй Thread.stop() — используй interrupt()