← Назад к вопросам

Worker Pool

2.0 Middle🔥 181 комментариев
#Конкурентность и горутины#Основы Go#Производительность и оптимизация

Условие

Реализуйте паттерн Worker Pool для параллельной обработки задач. Создайте пул воркеров фиксированного размера, которые будут читать задачи из канала и записывать результаты в другой канал.

Сигнатура

func startWorkerPool(numWorkers int, jobs <-chan int, results chan<- int)

Требования

  • Создать указанное количество воркеров (горутин)
  • Каждый воркер читает задачу из канала jobs
  • Обрабатывает задачу (например, возводит число в квадрат)
  • Записывает результат в канал results
  • Воркеры должны корректно завершаться при закрытии канала jobs

Пример

jobs := make(chan int, 100)
results := make(chan int, 100)
startWorkerPool(3, jobs, results)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение

Worker Pool — это паттерн параллельной обработки, где фиксированное число горутин-воркеров обрабатывают задачи из общей очереди. Это позволяет контролировать использование ресурсов и избежать создания неограниченного количества горутин.

Подход

  1. Запускаем numWorkers горутин (воркеров)
  2. Каждый воркер в цикле читает задачу из канала jobs
  3. Обрабатывает задачу (в примере — возводит число в квадрат)
  4. Отправляет результат в канал results
  5. При закрытии jobs воркер корректно завершается

Реализация

import "sync"

func startWorkerPool(numWorkers int, jobs <-chan int, results chan<- int) {
    var wg sync.WaitGroup
    
    // Запускаем воркеров
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            // Читаем задачи из канала jobs
            for job := range jobs {
                // Обрабатываем задачу (например, возводим в квадрат)
                result := job * job
                results <- result
            }
        }(i)
    }
    
    // Закрываем канал results, когда все воркеры завершены
    go func() {
        wg.Wait()
        close(results)
    }()
}

Пример полного использования

func main() {
    const numWorkers = 3
    const numJobs = 10
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    // Запускаем пул воркеров
    startWorkerPool(numWorkers, jobs, results)
    
    // Отправляем задачи
    go func() {
        for i := 1; i <= numJobs; i++ {
            jobs <- i
        }
        close(jobs)
    }()
    
    // Собираем результаты
    for result := range results {
        println(result)  // выведет: 1, 4, 9, 16, 25, ..., 100
    }
}

Как это работает (пошагово)

1. Запускаются 3 воркера
2. Отправляем числа 1-10 в jobs
3. Воркеры параллельно обрабатывают:
   - Воркер 1: читает 1, вычисляет 1² = 1
   - Воркер 2: читает 2, вычисляет 2² = 4
   - Воркер 3: читает 3, вычисляет 3² = 9
   - Воркер 1: читает 4, вычисляет 4² = 16
   - ...
4. Результаты немедленно отправляются в results
5. Все воркеры завершаются, когда jobs закрыт
6. Канал results закрывается автоматически

Варианты оптимизации

1. С более сложной обработкой

func startWorkerPool(numWorkers int, jobs <-chan Task, results chan<- Result) {
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                result := processTask(job)  // более сложная обработка
                results <- result
            }
        }()
    }
    
    go func() {
        wg.Wait()
        close(results)
    }()
}

2. С обработкой ошибок

type Task struct {
    ID    int
    Value int
}

type Result struct {
    TaskID int
    Value  int
    Error  error
}

func startWorkerPool(numWorkers int, jobs <-chan Task, results chan<- Result) {
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                result := Result{TaskID: job.ID}
                if job.Value < 0 {
                    result.Error = errors.New("negative value")
                } else {
                    result.Value = job.Value * job.Value
                }
                results <- result
            }
        }()
    }
    
    go func() {
        wg.Wait()
        close(results)
    }()
}

Преимущества Worker Pool

  • Контроль ресурсов: фиксированное число горутин, не растёт с количеством задач
  • Масштабируемость: можно обрабатывать миллионы задач с малым пулом
  • Предсказуемость: нет неконтролируемого создания горутин
  • Простота: стандартный паттерн, легко тестировать

Этот паттерн используется в production системах для обработки HTTP запросов, обработки сообщений из очередей, параллельной обработки данных.