← Назад к вопросам

Потокобезопасная очередь

1.8 Middle🔥 211 комментариев
#Многопоточность и синхронизация#Структуры данных и алгоритмы#Язык C++

Условие

Реализуйте потокобезопасную очередь (thread-safe queue) для использования в producer-consumer паттерне.

Требования

Класс должен предоставлять следующие методы:

  • void push(T value) — добавляет элемент в очередь
  • bool try_pop(T& value) — пытается извлечь элемент, возвращает false если очередь пуста
  • void wait_and_pop(T& value) — блокируется до появления элемента в очереди
  • bool empty() const — проверка на пустоту
  • size_t size() const — текущий размер очереди

Дополнительные требования

  • Используйте std::mutex и std::condition_variable
  • Очередь должна корректно работать с несколькими producer и consumer потоками
  • Избегайте spurious wakeup при ожидании

Пример использования

ThreadSafeQueue<int> queue;

// Producer thread
std::thread producer([&queue]() {
    for (int i = 0; i < 100; ++i) {
        queue.push(i);
    }
});

// Consumer thread
std::thread consumer([&queue]() {
    int value;
    while (queue.wait_and_pop(value)) {
        process(value);
    }
});

Бонус

Добавьте метод shutdown() для graceful завершения, чтобы wait_and_pop возвращал false после вызова shutdown.

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение: Потокобезопасная очередь

Архитектура

Потокобезопасная очередь (thread-safe queue) комбинирует:

  1. std::queue — базовая очередь с FIFO
  2. std::mutex — защита критических секций
  3. std::condition_variable — сигнализирует потребителям о наличии данных

Ключевая идея: потребители блокируются на condition_variable и просыпаются, когда producer добавляет элемент.

Полная реализация

#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>

template<typename T>
class ThreadSafeQueue {
private:
    mutable std::mutex mtx;
    std::queue<T> dataQueue;
    std::condition_variable condition;
    bool isShutdown = false;
    
public:
    ThreadSafeQueue() = default;
    
    // Запретим копирование
    ThreadSafeQueue(const ThreadSafeQueue&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
    
    // Добавить элемент в очередь
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            
            if (isShutdown) {
                throw std::runtime_error("Queue is shut down");
            }
            
            dataQueue.push(std::move(value));
        }
        // Пробуждаем ждущий поток за пределами lock
        condition.notify_one();
    }
    
    // Попытаться извлечь элемент (неблокирующая версия)
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mtx);
        
        if (dataQueue.empty()) {
            return false;
        }
        
        value = std::move(dataQueue.front());
        dataQueue.pop();
        return true;
    }
    
    // Заблокироваться и ждать элемент
    // Возвращает false если произошёл shutdown
    bool wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Ждём, пока очередь не пуста ИЛИ произойдёт shutdown
        condition.wait(lock, [this] {
            return !dataQueue.empty() || isShutdown;
        });
        
        // Если очередь пуста и произошёл shutdown — возвращаем false
        if (dataQueue.empty()) {
            return false;
        }
        
        value = std::move(dataQueue.front());
        dataQueue.pop();
        return true;
    }
    
    // Проверка на пустоту
    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx);
        return dataQueue.empty();
    }
    
    // Получить размер очереди
    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx);
        return dataQueue.size();
    }
    
    // Graceful shutdown
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            isShutdown = true;
        }
        // Пробуждаем все ждущие потоки
        condition.notify_all();
    }
};

Использование и примеры

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>

int main() {
    // Пример 1: простой producer-consumer
    {
        ThreadSafeQueue<int> queue;
        
        // Producer: добавляет числа
        std::thread producer([&queue]() {
            for (int i = 0; i < 5; ++i) {
                std::cout << "Producing: " << i << std::endl;
                queue.push(i);
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        });
        
        // Consumer: потребляет числа
        std::thread consumer([&queue]() {
            int value;
            while (queue.wait_and_pop(value)) {
                std::cout << "Consumed: " << value << std::endl;
            }
        });
        
        producer.join();
        queue.shutdown();  // сигнализируем consumer завершиться
        consumer.join();
    }
    // Вывод:
    // Producing: 0
    // Consumed: 0
    // Producing: 1
    // Consumed: 1
    // ...
    
    // Пример 2: несколько producers и consumers
    {
        ThreadSafeQueue<std::string> queue;
        
        // 2 producer потока
        std::vector<std::thread> producers;
        for (int p = 0; p < 2; ++p) {
            producers.emplace_back([&queue, p]() {
                for (int i = 0; i < 3; ++i) {
                    std::string msg = "P" + std::to_string(p) + "-" + std::to_string(i);
                    queue.push(msg);
                }
            });
        }
        
        // 3 consumer потока
        std::vector<std::thread> consumers;
        for (int c = 0; c < 3; ++c) {
            consumers.emplace_back([&queue, c]() {
                std::string value;
                while (queue.wait_and_pop(value)) {
                    std::cout << "Consumer " << c << " got: " << value << std::endl;
                }
            });
        }
        
        // Ждём producers
        for (auto& t : producers) t.join();
        
        // Сигнализируем consumers завершиться
        queue.shutdown();
        
        // Ждём consumers
        for (auto& t : consumers) t.join();
    }
    
    // Пример 3: try_pop (неблокирующая версия)
    {
        ThreadSafeQueue<int> queue;
        queue.push(1);
        queue.push(2);
        
        int value;
        if (queue.try_pop(value)) {
            std::cout << "Got: " << value << std::endl;  // Got: 1
        }
        
        if (queue.try_pop(value)) {
            std::cout << "Got: " << value << std::endl;  // Got: 2
        }
        
        if (!queue.try_pop(value)) {
            std::cout << "Queue is empty" << std::endl;  // это выведется
        }
    }
    
    return 0;
}

Трассировка синхронизации

Сценарий: Producer добавляет, Consumer ждёт

Время | Consumer                    | Producer
-----|-----------------------------|--------------------------
1    | wait_and_pop() вызвана      |
2    | lock(mtx)                   |
3    | condition.wait() → очередь пуста, ждём |
     |   (отпустил mtx)            |
4    |                             | push(42) вызвана
5    |                             | lock(mtx)
6    |                             | dataQueue.push(42)
7    |                             | unlock(mtx)
8    |                             | notify_one() → пробуждает consumer
9    | consumer просыпается       |
10   | lock(mtx) — получил lock   |
11   | очередь не пуста → выход из wait |
12   | value = 42; pop()           |
13   | unlock(mtx) → возвращает true

Ключевые детали

1. Spurious wakeup protection:

condition.wait(lock, [this] {
    return !dataQueue.empty() || isShutdown;
});

Лямбда проверяется после каждого пробуждения, даже ложного.

2. notify_one vs notify_all:

push():
    condition.notify_one();   // одному consumer

shutdown():
    condition.notify_all();   // всем consumers

3. Move семантика для эффективности:

dataQueue.push(std::move(value));
value = std::move(dataQueue.front());

Избегаем лишнего копирования.

4. unique_lock вместо lock_guard:

std::unique_lock<std::mutex> lock(mtx);
condition.wait(lock, ...);
// condition.wait сам управляет блокировкой:
// перед wait → unlock
// после wake → lock

Анализ сложности

  • push(): O(1) в среднем + время нотификации O(1)
  • try_pop(): O(1) в среднем
  • wait_and_pop(): O(1) + время ожидания (зависит от других потоков)
  • Память: O(N) где N — размер очереди

Потокобезопасность

Гарантии:

  • Race-condition free благодаря mutex
  • Deadlock free (всегда освобождаем lock)
  • Spurious wakeup защита (lambda guard)
  • Нет double-delete (FIFO порядок)

Graceful Shutdown

shutdown():
    1. Устанавливаем isShutdown = true
    2. Пробуждаем все wait_and_pop() вызовы
    3. Оставшиеся элементы можно забрать через try_pop()
    4. wait_and_pop() вернёт false, signaling конец работы

Сравнение с альтернативами

ПодходПреимуществаНедостатки
std::queue + mutexпростая, надёжнаянет встроенных уведомлений
boost::queueоптимизированатребует Boost
std::dequeдвусторонний доступне FIFO семантика
Наша реализацияполный контрольнужно писать self

Потенциальные расширения

1. Максимальный размер:

void push(T value, size_t maxSize) {
    std::unique_lock<std::mutex> lock(mtx);
    while (dataQueue.size() >= maxSize) {
        notFullCondition.wait(lock);
    }
    dataQueue.push(std::move(value));
    condition.notify_one();
}

2. Таймауты:

bool wait_and_pop(T& value, int timeoutMs) {
    std::unique_lock<std::mutex> lock(mtx);
    return condition.wait_for(lock, 
        std::chrono::milliseconds(timeoutMs),
        [this] { return !dataQueue.empty(); });
}

3. Clear метод:

void clear() {
    std::lock_guard<std::mutex> lock(mtx);
    while (!dataQueue.empty()) dataQueue.pop();
}
Потокобезопасная очередь | PrepBro