← Назад к вопросам

Пул потоков (Thread Pool)

3.0 Senior🔥 231 комментариев
#Многопоточность и синхронизация#ООП и проектирование#Язык C++

Условие

Реализуйте пул потоков (Thread Pool) на C++11 или выше.

Пул потоков должен:

  • Создавать фиксированное количество рабочих потоков при инициализации
  • Принимать задачи через метод submit() и возвращать std::future для получения результата
  • Выполнять задачи в порядке очереди (FIFO)
  • Корректно завершать работу, дождавшись выполнения всех задач в очереди

Требования

  • Используйте std::thread, std::mutex, std::condition_variable
  • Задачи должны быть произвольными callable объектами
  • Метод submit() должен быть шаблонным и поддерживать любой тип возвращаемого значения

Пример использования

ThreadPool pool(4); // 4 рабочих потока

auto future1 = pool.submit([]() {
    return 42;
});

auto future2 = pool.submit([](int a, int b) {
    return a + b;
}, 10, 20);

std::cout << future1.get() << std::endl; // 42
std::cout << future2.get() << std::endl; // 30

Подсказка

Используйте std::packaged_task для обёртки задач и получения future.

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение: Пул потоков (Thread Pool)

Архитектура

Пул потоков состоит из:

  1. Рабочие потоки — ждут задач в очереди
  2. Очередь задач — thread-safe queue с задачами (std::packaged_task)
  3. Condition Variable — сигнализирует, что задача поступила

Ключевая идея: рабочие потоки циклически ждут задач, выполняют их и кладут результат в future.

Полная реализация

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <future>
#include <vector>
#include <memory>

class ThreadPool {
private:
    struct Task {
        std::function<void()> func;
    };
    
    std::vector<std::thread> workers;
    std::queue<Task> taskQueue;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool shutDown = false;
    
    void workerLoop() {
        while (true) {
            Task task;
            
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                
                // Ждём, пока есть задача или начат shutdown
                condition.wait(lock, [this] {
                    return !taskQueue.empty() || shutDown;
                });
                
                // Если queue пуста и shutdown — выходим
                if (taskQueue.empty() && shutDown) {
                    break;
                }
                
                // Если queue пуста, но shutdown ещё нет — снова ждём
                if (taskQueue.empty()) {
                    continue;
                }
                
                task = std::move(taskQueue.front());
                taskQueue.pop();
            }
            
            // Выполняем задачу БЕЗ блокировки мьютекса
            task.func();
        }
    }
    
public:
    explicit ThreadPool(size_t numThreads) {
        for (size_t i = 0; i < numThreads; ++i) {
            workers.emplace_back(&ThreadPool::workerLoop, this);
        }
    }
    
    ~ThreadPool() {
        shutdown();
    }
    
    // Главный метод: submit задачу и получить future с результатом
    template<typename Func, typename... Args>
    auto submit(Func&& func, Args&&... args) {
        using ReturnType = std::invoke_result_t<Func, Args...>;
        
        // Создаём packaged_task с заданной функцией и аргументами
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            [f = std::forward<Func>(func), 
             ... args = std::forward<Args>(args)]() mutable {
                return std::invoke(f, args...);
            }
        );
        
        // Получаем future для возврата
        std::future<ReturnType> result = task->get_future();
        
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            
            if (shutDown) {
                throw std::runtime_error("ThreadPool is shut down");
            }
            
            // Кладём task в очередь
            taskQueue.push({
                [t = task]() { (*t)(); }
            });
        }
        
        // Пробуждаем рабочий поток
        condition.notify_one();
        
        return result;
    }
    
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            if (shutDown) return;
            shutDown = true;
        }
        
        // Пробуждаем все потоки (иначе они заблокированы на wait)
        condition.notify_all();
        
        // Ждём завершения всех потоков
        for (auto& worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
    
    size_t getQueueSize() const {
        std::lock_guard<std::mutex> lock(queueMutex);
        return taskQueue.size();
    }
};

Использование и примеры

int main() {
    ThreadPool pool(4);  // 4 рабочих потока
    
    // Пример 1: функция без параметров
    auto future1 = pool.submit([]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 42;
    });
    
    std::cout << "Result 1: " << future1.get() << std::endl;  // 42
    
    // Пример 2: функция с параметрами
    auto future2 = pool.submit([](int a, int b) {
        return a + b;
    }, 10, 20);
    
    std::cout << "Result 2: " << future2.get() << std::endl;  // 30
    
    // Пример 3: функция без возврата
    auto future3 = pool.submit([]() {
        std::cout << "Background task\n";
    });
    
    future3.get();  // ждём завершения
    
    // Пример 4: несколько задач
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 10; ++i) {
        futures.push_back(pool.submit([](int x) {
            return x * x;
        }, i));
    }
    
    // Собираем результаты
    for (int i = 0; i < futures.size(); ++i) {
        std::cout << i << "^2 = " << futures[i].get() << "\n";
    }
    
    // pool автоматически выключится при разрушении (~ThreadPool)
    return 0;
}

Ключевые детали реализации

1. std::packaged_task для обёртки результата:

auto task = std::make_shared<std::packaged_task<ReturnType()>>(
    [...]() { return result; }
);
std::future<ReturnType> future = task->get_future();

2. std::invoke для универсальной работы с функциями:

template<typename Func, typename... Args>
auto submit(Func&& func, Args&&... args) {
    using ReturnType = std::invoke_result_t<Func, Args...>;
    // invoke_result_t вычисляет тип возврата
}

3. Condition variable для эффективного ожидания:

condition.wait(lock, [this] {
    return !taskQueue.empty() || shutDown;
});

Поток не занимает CPU, ждёт сигнала.

4. Graceful shutdown:

void shutdown() {
    shutDown = true;  // больше не принимаем задачи
    condition.notify_all();  // пробуждаем всех
    // join() ждёт завершения очереди
}

Анализ сложности

  • submit(): O(1) — добавление в queue
  • get() на future: O(1) + время выполнения задачи
  • Память: O(N) где N = количество оставшихся задач в очереди

Потокобезопасность

Mutex защищает:

  • Доступ к taskQueue
  • Флаг shutDown
  • Предотвращает race conditions

Condition variable гарантирует:

  • Нет busy waiting
  • Потоки просыпаются только при нужде
  • Нет deadlock'ов благодаря правильной блокировке

Улучшения для production

1. Обработка исключений в задачах:

try {
    task.func();
} catch (const std::exception& e) {
    std::cerr << "Task exception: " << e.what() << std::endl;
}

2. Метрики пула:

size_t getQueueSize() const;
size_t getWorkerCount() const;
size_t getTasksCompleted() const;  // atomically incremented

3. Priority queue вместо обычной queue:

std::priority_queue<Task, std::vector<Task>, 
                   std::greater<Task>> taskQueue;

4. Динамическое масштабирование потоков:

void addWorker();  // добавить рабочий поток
void removeWorker();  // удалить рабочий поток

Сравнение с std::async

ПодходПлюсыМинусы
ThreadPoolконтроль над потоками, повторное использованиебольше кода
std::asyncпросто, встроено в STLможет создавать новые потоки, непредсказуемо

Для production используй ThreadPool для контроля ресурсов.