← Назад к вопросам
Producer-Consumer с BlockingQueue
2.0 Middle🔥 171 комментариев
#Коллекции#Многопоточность
Условие
Реализуйте паттерн Producer-Consumer с использованием BlockingQueue.
Требования
- Один или несколько Producer добавляют данные в очередь
- Один или несколько Consumer извлекают данные
- Используйте ArrayBlockingQueue с ограниченным размером
- Реализуйте graceful shutdown
- Producer должен генерировать числа от 1 до 100
- Consumer должен выводить полученные числа
Подсказка
Используйте ExecutorService для управления потоками и "отравленную пилюлю" (poison pill) для завершения.
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Producer-Consumer паттерн с BlockingQueue
Это один из самых важных паттернов для многопоточного программирования. BlockingQueue автоматически управляет синхронизацией между производителями и потребителями.
Основное решение
import java.util.concurrent.*;
public class ProducerConsumerExample {
private static final int QUEUE_SIZE = 10;
private static final int ITEMS_TO_PRODUCE = 100;
private static final Integer POISON_PILL = -1; // Сигнал завершения
public static void main(String[] args) throws InterruptedException {
// Ограниченная очередь размером 10
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
// ExecutorService для управления потоками
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
// Запуск 1 Producer
executorService.submit(new Producer(queue, ITEMS_TO_PRODUCE));
// Запуск 2 Consumers
executorService.submit(new Consumer(queue, "Consumer-1"));
executorService.submit(new Consumer(queue, "Consumer-2"));
// Ждём завершения всех задач
executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Потоки не завершились за 60 секунд");
executorService.shutdownNow();
}
} finally {
if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
}
System.out.println("Программа завершена успешно");
}
// Producer - генерирует данные
static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int itemsToProduceCount;
public Producer(BlockingQueue<Integer> queue, int itemsToProduceCount) {
this.queue = queue;
this.itemsToProduceCount = itemsToProduceCount;
}
@Override
public void run() {
try {
// Генерируем числа от 1 до 100
for (int i = 1; i <= itemsToProduceCount; i++) {
System.out.println("[Producer] Генерирует: " + i);
// put() блокирует, если очередь полна
queue.put(i);
// Имитируем работу
Thread.sleep(50);
}
// Отправляем "отравленную пилюлю" для каждого Consumer
for (int i = 0; i < 2; i++) {
queue.put(POISON_PILL);
}
System.out.println("[Producer] Завершён");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("[Producer] Прерван");
}
}
}
// Consumer - потребляет данные
static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
public Consumer(BlockingQueue<Integer> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
try {
while (true) {
// take() блокирует, если очередь пуста
Integer item = queue.take();
// Проверяем на "отравленную пилюлю"
if (item.equals(POISON_PILL)) {
// Передаём пилюлю дальше для других потребителей
queue.put(POISON_PILL);
System.out.println("[" + name + "] Получил сигнал завершения");
break;
}
System.out.println("[" + name + "] Получил: " + item);
// Имитируем обработку
Thread.sleep(100);
}
System.out.println("[" + name + "] Завершён");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("[" + name + "] Прерван");
}
}
}
}
Объяснение ключевых моментов
1. BlockingQueue - синхронизированная очередь
// ArrayBlockingQueue - с фиксированным размером
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// Основные методы:
queue.put(item); // Добавить (блокирует если полна)
Integer item = queue.take(); // Получить (блокирует если пуста)
// Альтернативы с timeout:
queue.offer(item, 1, TimeUnit.SECONDS); // false если не успел
Integer item = queue.poll(1, TimeUnit.SECONDS); // null если timeout
2. Отравленная пилюля (Poison Pill)
Это специальное значение, которое сигнализирует о завершении:
private static final Integer POISON_PILL = -1;
// Producer отправляет пилюлю
queue.put(POISON_PILL);
// Consumer проверяет и завершается
if (item.equals(POISON_PILL)) {
queue.put(POISON_PILL); // Передаём для других consumers
break;
}
3. ExecutorService для управления потоками
// Создаём пул с 3 потоками (1 Producer + 2 Consumer)
ExecutorService executorService = Executors.newFixedThreadPool(3);
// Запускаем задачи
executorService.submit(new Producer(queue, 100));
executorService.submit(new Consumer(queue, "Consumer-1"));
executorService.submit(new Consumer(queue, "Consumer-2"));
// Graceful shutdown
executorService.shutdown(); // Не принимает новые задачи
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow(); // Принудительно если не завершились
}
Альтернативное решение без BlockingQueue
Для сравнения - это БЫ сложнее и опаснее:
// ✗ Нужна ручная синхронизация
public class ManualSyncProducerConsumer {
private static final int QUEUE_SIZE = 10;
private final Queue<Integer> queue = new LinkedList<>();
public synchronized void produce(Integer item) throws InterruptedException {
while (queue.size() >= QUEUE_SIZE) {
wait(); // Ждём, пока место освободится
}
queue.add(item);
notifyAll(); // Разбуди потребителей
}
public synchronized Integer consume() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // Ждём, пока появятся данные
}
Integer item = queue.remove();
notifyAll(); // Разбуди производителей
return item;
}
}
С несколькими Producer-ами
public class MultiProducerConsumer {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(20);
ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
// 2 Producer-а
executorService.submit(new Producer(queue, 1, 50));
executorService.submit(new Producer(queue, 51, 100));
// 3 Consumer-а
for (int i = 1; i <= 3; i++) {
executorService.submit(new Consumer(queue, "Consumer-" + i, 2));
}
executorService.shutdown();
executorService.awaitTermination(120, TimeUnit.SECONDS);
} finally {
if (!executorService.isShutdown()) {
executorService.shutdownNow();
}
}
}
static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int start, end;
public Producer(BlockingQueue<Integer> queue, int start, int end) {
this.queue = queue;
this.start = start;
this.end = end;
}
@Override
public void run() {
try {
for (int i = start; i <= end; i++) {
System.out.println("[Producer-" + Thread.currentThread().getName() + "] " + i);
queue.put(i);
Thread.sleep(30);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
private final int poisonPillCount;
public Consumer(BlockingQueue<Integer> queue, String name, int poisonPillCount) {
this.queue = queue;
this.name = name;
this.poisonPillCount = poisonPillCount;
}
@Override
public void run() {
try {
int pillsReceived = 0;
while (pillsReceived < poisonPillCount) {
Integer item = queue.take();
if (item == -1) {
pillsReceived++;
queue.put(-1); // Передай дальше
} else {
System.out.println("[" + name + "] " + item);
Thread.sleep(50);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
С обработкой исключений
static class RobustConsumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final String name;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Integer item = null;
try {
// Timeout предотвращает бесконечное ожидание
item = queue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (item == null) {
System.out.println("[" + name + "] Timeout - выходим");
break;
}
if (item == -1) {
queue.put(-1);
break;
}
try {
processItem(item);
} catch (Exception e) {
System.err.println("[" + name + "] Ошибка обработки " + item + ": " + e);
}
}
} finally {
System.out.println("[" + name + "] Завершён");
}
}
private void processItem(Integer item) {
System.out.println("[" + name + "] Обрабатываю: " + item);
}
}
Тестирование
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
public class ProducerConsumerTest {
@Test
public void testQueueBlocking() throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
// Добавляем 2 элемента (максимум)
queue.put(1);
queue.put(2);
// 3-й элемент должен блокировать (очередь полна)
// Проверяем что блокирует
long start = System.currentTimeMillis();
Thread t = new Thread(() -> {
try {
queue.put(3); // Заблокируется
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
t.start();
Thread.sleep(100);
assertTrue(t.isAlive(), "Поток должен быть заблокирован");
// Освобождаем место
queue.take();
t.join(1000);
assertFalse(t.isAlive(), "Поток должен был разблокироваться");
}
}
Важные замечания
Преимущества BlockingQueue:
- ✓ Автоматическая синхронизация
- ✓ Предотвращает переполнение/опустошение
- ✓ Потокобезопасна
- ✓ Применяется в пулах потоков (ThreadPoolExecutor)
Когда использовать:
- ✓ Producer-Consumer паттерн
- ✓ Многопоточные приложения
- ✓ Обработка асинхронных задач
- ✓ Системы с different throughput
Альтернативы:
- LinkedBlockingQueue — неограниченная (опасно!)
- PriorityBlockingQueue — с приоритетами
- DelayQueue — отложенное извлечение
- SynchronousQueue — прямая передача (без хранения)
Заключение
Producer-Consumer с BlockingQueue — это надёжный и эффективный способ синхронизации между потоками. Помни:
- BlockingQueue управляет синхронизацией
- Poison pill сигнализирует о завершении
- ExecutorService управляет жизненным циклом потоков
- Graceful shutdown предотвращает потерю данных
- Никогда не используй Thread.stop() — используй interrupt()