Пул потоков (Thread Pool)
Условие
Реализуйте пул потоков (Thread Pool) на C++11 или выше.
Пул потоков должен:
- Создавать фиксированное количество рабочих потоков при инициализации
- Принимать задачи через метод submit() и возвращать std::future для получения результата
- Выполнять задачи в порядке очереди (FIFO)
- Корректно завершать работу, дождавшись выполнения всех задач в очереди
Требования
- Используйте std::thread, std::mutex, std::condition_variable
- Задачи должны быть произвольными callable объектами
- Метод submit() должен быть шаблонным и поддерживать любой тип возвращаемого значения
Пример использования
ThreadPool pool(4); // 4 рабочих потока
auto future1 = pool.submit([]() {
return 42;
});
auto future2 = pool.submit([](int a, int b) {
return a + b;
}, 10, 20);
std::cout << future1.get() << std::endl; // 42
std::cout << future2.get() << std::endl; // 30
Подсказка
Используйте std::packaged_task для обёртки задач и получения future.
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Решение: Пул потоков (Thread Pool)
Архитектура
Пул потоков состоит из:
- Рабочие потоки — ждут задач в очереди
- Очередь задач — thread-safe queue с задачами (std::packaged_task)
- Condition Variable — сигнализирует, что задача поступила
Ключевая идея: рабочие потоки циклически ждут задач, выполняют их и кладут результат в future.
Полная реализация
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <future>
#include <vector>
#include <memory>
class ThreadPool {
private:
struct Task {
std::function<void()> func;
};
std::vector<std::thread> workers;
std::queue<Task> taskQueue;
std::mutex queueMutex;
std::condition_variable condition;
bool shutDown = false;
void workerLoop() {
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(queueMutex);
// Ждём, пока есть задача или начат shutdown
condition.wait(lock, [this] {
return !taskQueue.empty() || shutDown;
});
// Если queue пуста и shutdown — выходим
if (taskQueue.empty() && shutDown) {
break;
}
// Если queue пуста, но shutdown ещё нет — снова ждём
if (taskQueue.empty()) {
continue;
}
task = std::move(taskQueue.front());
taskQueue.pop();
}
// Выполняем задачу БЕЗ блокировки мьютекса
task.func();
}
}
public:
explicit ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back(&ThreadPool::workerLoop, this);
}
}
~ThreadPool() {
shutdown();
}
// Главный метод: submit задачу и получить future с результатом
template<typename Func, typename... Args>
auto submit(Func&& func, Args&&... args) {
using ReturnType = std::invoke_result_t<Func, Args...>;
// Создаём packaged_task с заданной функцией и аргументами
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
[f = std::forward<Func>(func),
... args = std::forward<Args>(args)]() mutable {
return std::invoke(f, args...);
}
);
// Получаем future для возврата
std::future<ReturnType> result = task->get_future();
{
std::lock_guard<std::mutex> lock(queueMutex);
if (shutDown) {
throw std::runtime_error("ThreadPool is shut down");
}
// Кладём task в очередь
taskQueue.push({
[t = task]() { (*t)(); }
});
}
// Пробуждаем рабочий поток
condition.notify_one();
return result;
}
void shutdown() {
{
std::lock_guard<std::mutex> lock(queueMutex);
if (shutDown) return;
shutDown = true;
}
// Пробуждаем все потоки (иначе они заблокированы на wait)
condition.notify_all();
// Ждём завершения всех потоков
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
size_t getQueueSize() const {
std::lock_guard<std::mutex> lock(queueMutex);
return taskQueue.size();
}
};
Использование и примеры
int main() {
ThreadPool pool(4); // 4 рабочих потока
// Пример 1: функция без параметров
auto future1 = pool.submit([]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 42;
});
std::cout << "Result 1: " << future1.get() << std::endl; // 42
// Пример 2: функция с параметрами
auto future2 = pool.submit([](int a, int b) {
return a + b;
}, 10, 20);
std::cout << "Result 2: " << future2.get() << std::endl; // 30
// Пример 3: функция без возврата
auto future3 = pool.submit([]() {
std::cout << "Background task\n";
});
future3.get(); // ждём завершения
// Пример 4: несколько задач
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; ++i) {
futures.push_back(pool.submit([](int x) {
return x * x;
}, i));
}
// Собираем результаты
for (int i = 0; i < futures.size(); ++i) {
std::cout << i << "^2 = " << futures[i].get() << "\n";
}
// pool автоматически выключится при разрушении (~ThreadPool)
return 0;
}
Ключевые детали реализации
1. std::packaged_task для обёртки результата:
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
[...]() { return result; }
);
std::future<ReturnType> future = task->get_future();
2. std::invoke для универсальной работы с функциями:
template<typename Func, typename... Args>
auto submit(Func&& func, Args&&... args) {
using ReturnType = std::invoke_result_t<Func, Args...>;
// invoke_result_t вычисляет тип возврата
}
3. Condition variable для эффективного ожидания:
condition.wait(lock, [this] {
return !taskQueue.empty() || shutDown;
});
Поток не занимает CPU, ждёт сигнала.
4. Graceful shutdown:
void shutdown() {
shutDown = true; // больше не принимаем задачи
condition.notify_all(); // пробуждаем всех
// join() ждёт завершения очереди
}
Анализ сложности
- submit(): O(1) — добавление в queue
- get() на future: O(1) + время выполнения задачи
- Память: O(N) где N = количество оставшихся задач в очереди
Потокобезопасность
✅ Mutex защищает:
- Доступ к taskQueue
- Флаг shutDown
- Предотвращает race conditions
✅ Condition variable гарантирует:
- Нет busy waiting
- Потоки просыпаются только при нужде
- Нет deadlock'ов благодаря правильной блокировке
Улучшения для production
1. Обработка исключений в задачах:
try {
task.func();
} catch (const std::exception& e) {
std::cerr << "Task exception: " << e.what() << std::endl;
}
2. Метрики пула:
size_t getQueueSize() const;
size_t getWorkerCount() const;
size_t getTasksCompleted() const; // atomically incremented
3. Priority queue вместо обычной queue:
std::priority_queue<Task, std::vector<Task>,
std::greater<Task>> taskQueue;
4. Динамическое масштабирование потоков:
void addWorker(); // добавить рабочий поток
void removeWorker(); // удалить рабочий поток
Сравнение с std::async
| Подход | Плюсы | Минусы |
|---|---|---|
| ThreadPool | контроль над потоками, повторное использование | больше кода |
| std::async | просто, встроено в STL | может создавать новые потоки, непредсказуемо |
Для production используй ThreadPool для контроля ресурсов.