Потокобезопасная очередь
Условие
Реализуйте потокобезопасную очередь (thread-safe queue) для использования в producer-consumer паттерне.
Требования
Класс должен предоставлять следующие методы:
void push(T value)— добавляет элемент в очередьbool try_pop(T& value)— пытается извлечь элемент, возвращает false если очередь пустаvoid wait_and_pop(T& value)— блокируется до появления элемента в очередиbool empty() const— проверка на пустотуsize_t size() const— текущий размер очереди
Дополнительные требования
- Используйте std::mutex и std::condition_variable
- Очередь должна корректно работать с несколькими producer и consumer потоками
- Избегайте spurious wakeup при ожидании
Пример использования
ThreadSafeQueue<int> queue;
// Producer thread
std::thread producer([&queue]() {
for (int i = 0; i < 100; ++i) {
queue.push(i);
}
});
// Consumer thread
std::thread consumer([&queue]() {
int value;
while (queue.wait_and_pop(value)) {
process(value);
}
});
Бонус
Добавьте метод shutdown() для graceful завершения, чтобы wait_and_pop возвращал false после вызова shutdown.
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Решение: Потокобезопасная очередь
Архитектура
Потокобезопасная очередь (thread-safe queue) комбинирует:
- std::queue — базовая очередь с FIFO
- std::mutex — защита критических секций
- std::condition_variable — сигнализирует потребителям о наличии данных
Ключевая идея: потребители блокируются на condition_variable и просыпаются, когда producer добавляет элемент.
Полная реализация
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
template<typename T>
class ThreadSafeQueue {
private:
mutable std::mutex mtx;
std::queue<T> dataQueue;
std::condition_variable condition;
bool isShutdown = false;
public:
ThreadSafeQueue() = default;
// Запретим копирование
ThreadSafeQueue(const ThreadSafeQueue&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
// Добавить элемент в очередь
void push(T value) {
{
std::lock_guard<std::mutex> lock(mtx);
if (isShutdown) {
throw std::runtime_error("Queue is shut down");
}
dataQueue.push(std::move(value));
}
// Пробуждаем ждущий поток за пределами lock
condition.notify_one();
}
// Попытаться извлечь элемент (неблокирующая версия)
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (dataQueue.empty()) {
return false;
}
value = std::move(dataQueue.front());
dataQueue.pop();
return true;
}
// Заблокироваться и ждать элемент
// Возвращает false если произошёл shutdown
bool wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mtx);
// Ждём, пока очередь не пуста ИЛИ произойдёт shutdown
condition.wait(lock, [this] {
return !dataQueue.empty() || isShutdown;
});
// Если очередь пуста и произошёл shutdown — возвращаем false
if (dataQueue.empty()) {
return false;
}
value = std::move(dataQueue.front());
dataQueue.pop();
return true;
}
// Проверка на пустоту
bool empty() const {
std::lock_guard<std::mutex> lock(mtx);
return dataQueue.empty();
}
// Получить размер очереди
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return dataQueue.size();
}
// Graceful shutdown
void shutdown() {
{
std::lock_guard<std::mutex> lock(mtx);
isShutdown = true;
}
// Пробуждаем все ждущие потоки
condition.notify_all();
}
};
Использование и примеры
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
int main() {
// Пример 1: простой producer-consumer
{
ThreadSafeQueue<int> queue;
// Producer: добавляет числа
std::thread producer([&queue]() {
for (int i = 0; i < 5; ++i) {
std::cout << "Producing: " << i << std::endl;
queue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
// Consumer: потребляет числа
std::thread consumer([&queue]() {
int value;
while (queue.wait_and_pop(value)) {
std::cout << "Consumed: " << value << std::endl;
}
});
producer.join();
queue.shutdown(); // сигнализируем consumer завершиться
consumer.join();
}
// Вывод:
// Producing: 0
// Consumed: 0
// Producing: 1
// Consumed: 1
// ...
// Пример 2: несколько producers и consumers
{
ThreadSafeQueue<std::string> queue;
// 2 producer потока
std::vector<std::thread> producers;
for (int p = 0; p < 2; ++p) {
producers.emplace_back([&queue, p]() {
for (int i = 0; i < 3; ++i) {
std::string msg = "P" + std::to_string(p) + "-" + std::to_string(i);
queue.push(msg);
}
});
}
// 3 consumer потока
std::vector<std::thread> consumers;
for (int c = 0; c < 3; ++c) {
consumers.emplace_back([&queue, c]() {
std::string value;
while (queue.wait_and_pop(value)) {
std::cout << "Consumer " << c << " got: " << value << std::endl;
}
});
}
// Ждём producers
for (auto& t : producers) t.join();
// Сигнализируем consumers завершиться
queue.shutdown();
// Ждём consumers
for (auto& t : consumers) t.join();
}
// Пример 3: try_pop (неблокирующая версия)
{
ThreadSafeQueue<int> queue;
queue.push(1);
queue.push(2);
int value;
if (queue.try_pop(value)) {
std::cout << "Got: " << value << std::endl; // Got: 1
}
if (queue.try_pop(value)) {
std::cout << "Got: " << value << std::endl; // Got: 2
}
if (!queue.try_pop(value)) {
std::cout << "Queue is empty" << std::endl; // это выведется
}
}
return 0;
}
Трассировка синхронизации
Сценарий: Producer добавляет, Consumer ждёт
Время | Consumer | Producer
-----|-----------------------------|--------------------------
1 | wait_and_pop() вызвана |
2 | lock(mtx) |
3 | condition.wait() → очередь пуста, ждём |
| (отпустил mtx) |
4 | | push(42) вызвана
5 | | lock(mtx)
6 | | dataQueue.push(42)
7 | | unlock(mtx)
8 | | notify_one() → пробуждает consumer
9 | consumer просыпается |
10 | lock(mtx) — получил lock |
11 | очередь не пуста → выход из wait |
12 | value = 42; pop() |
13 | unlock(mtx) → возвращает true
Ключевые детали
1. Spurious wakeup protection:
condition.wait(lock, [this] {
return !dataQueue.empty() || isShutdown;
});
Лямбда проверяется после каждого пробуждения, даже ложного.
2. notify_one vs notify_all:
push():
condition.notify_one(); // одному consumer
shutdown():
condition.notify_all(); // всем consumers
3. Move семантика для эффективности:
dataQueue.push(std::move(value));
value = std::move(dataQueue.front());
Избегаем лишнего копирования.
4. unique_lock вместо lock_guard:
std::unique_lock<std::mutex> lock(mtx);
condition.wait(lock, ...);
// condition.wait сам управляет блокировкой:
// перед wait → unlock
// после wake → lock
Анализ сложности
- push(): O(1) в среднем + время нотификации O(1)
- try_pop(): O(1) в среднем
- wait_and_pop(): O(1) + время ожидания (зависит от других потоков)
- Память: O(N) где N — размер очереди
Потокобезопасность
✅ Гарантии:
- Race-condition free благодаря mutex
- Deadlock free (всегда освобождаем lock)
- Spurious wakeup защита (lambda guard)
- Нет double-delete (FIFO порядок)
Graceful Shutdown
shutdown():
1. Устанавливаем isShutdown = true
2. Пробуждаем все wait_and_pop() вызовы
3. Оставшиеся элементы можно забрать через try_pop()
4. wait_and_pop() вернёт false, signaling конец работы
Сравнение с альтернативами
| Подход | Преимущества | Недостатки |
|---|---|---|
| std::queue + mutex | простая, надёжная | нет встроенных уведомлений |
| boost::queue | оптимизирована | требует Boost |
| std::deque | двусторонний доступ | не FIFO семантика |
| Наша реализация | полный контроль | нужно писать self |
Потенциальные расширения
1. Максимальный размер:
void push(T value, size_t maxSize) {
std::unique_lock<std::mutex> lock(mtx);
while (dataQueue.size() >= maxSize) {
notFullCondition.wait(lock);
}
dataQueue.push(std::move(value));
condition.notify_one();
}
2. Таймауты:
bool wait_and_pop(T& value, int timeoutMs) {
std::unique_lock<std::mutex> lock(mtx);
return condition.wait_for(lock,
std::chrono::milliseconds(timeoutMs),
[this] { return !dataQueue.empty(); });
}
3. Clear метод:
void clear() {
std::lock_guard<std::mutex> lock(mtx);
while (!dataQueue.empty()) dataQueue.pop();
}