← Назад к вопросам

Как конфигурировать количество процессов Workers в Celery?

2.0 Middle🔥 191 комментариев
#Асинхронность и многопоточность#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Конфигурирование процессов Workers в Celery: Практический подход

Количество Celery workers — это критический параметр, который влияет на пропускную способность и использование ресурсов. Я научился правильно конфигурировать это через практику в production среде.

Основные параметры и способы конфигурирования

1. Прямой параметр в командной строке

# Запуск с 4 процессами (workers)
celery -A myapp worker --concurrency=4

# Запуск с 8 процессами и пулом процессов
celery -A myapp worker --concurrency=8 --pool=processes

# Запуск с автоматическим определением по ядрам CPU
celery -A myapp worker --concurrency

2. Конфигурация через celeryconfig.py

# celeryconfig.py
import os
from kombu import Exchange, Queue

# Основная конфигурация Celery
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# Количество процессов (workers)
worker_concurrency = 8
worker_prefetch_multiplier = 4  # Сколько задач забирает один worker до выполнения
worker_max_tasks_per_child = 1000  # Перезагрузка worker после 1000 задач

# Другие важные параметры
worker_disable_rate_limits = False
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s'

# Таймауты
task_soft_time_limit = 300  # 5 минут
task_time_limit = 600  # 10 минут (hard limit)

# Очередь
default_queue = 'default'

default_exchange = Exchange('celery', type='direct')
default_routing_key = 'celery'

queues = {
    'default': {'exchange': 'celery', 'routing_key': 'celery'},
    'high_priority': {'exchange': 'celery', 'routing_key': 'high_priority'},
    'low_priority': {'exchange': 'celery', 'routing_key': 'low_priority'},
}

# Роутинг разных задач
task_routes = {
    'tasks.send_email': {'queue': 'high_priority'},
    'tasks.generate_report': {'queue': 'low_priority'},
}

3. Конфигурация через Django settings

# settings.py (Django project)
from kombu import Queue, Exchange

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# Количество workers (concurrency)
CELERY_WORKER_CONCURRENCY = 8
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000

# Таймауты
CELERY_TASK_SOFT_TIME_LIMIT = 300  # 5 минут
CELERY_TASK_TIME_LIMIT = 600       # 10 минут

# Очереди для разных типов задач
CELERY_TASK_QUEUES = (
    Queue('default', Exchange('celery'), routing_key='celery'),
    Queue('high_priority', Exchange('celery'), routing_key='high'),
    Queue('low_priority', Exchange('celery'), routing_key='low'),
)

# Маршрутизация
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'high_priority'},
    'myapp.tasks.export_data': {'queue': 'low_priority'},
    'myapp.tasks.cache_warm': {'queue': 'low_priority'},
}

Как я выбираю количество workers

Расчёт оптимального количества

# Математическая формула
# workers = (CPU_cores * 2) + 1  # Для I/O-bound задач
# workers = CPU_cores            # Для CPU-bound задач

import multiprocessing

cpu_count = multiprocessing.cpu_count()

# Для I/O-bound (сетевые запросы, БД операции)
io_bound_workers = (cpu_count * 2) + 1
print(f"I/O-bound workers: {io_bound_workers}")  # Например: 17 (на 8-ядерной машине)

# Для CPU-bound (вычисления)
cpu_bound_workers = cpu_count
print(f"CPU-bound workers: {cpu_bound_workers}")  # 8

Практический пример из production

# Скрипт для динамического расчёта
import multiprocessing
import os

class CeleryConfig:
    def __init__(self):
        self.cpu_cores = multiprocessing.cpu_count()
        self.available_memory_gb = self.get_available_memory()
    
    def get_available_memory(self) -> float:
        """Получить доступную память в GB"""
        import psutil
        return psutil.virtual_memory().available / (1024 ** 3)
    
    def calculate_workers(self, task_type: str = 'io_bound') -> int:
        """
        Рассчитать оптимальное количество workers
        
        task_type: 'io_bound', 'cpu_bound', 'mixed'
        """
        if task_type == 'io_bound':
            # I/O операции (API запросы, БД)
            workers = (self.cpu_cores * 2) + 1
        elif task_type == 'cpu_bound':
            # Вычисления
            workers = self.cpu_cores
        else:  # mixed
            # Смешанные задачи
            workers = (self.cpu_cores * 1.5) + 1
        
        # Ограничение по памяти (100MB на worker)
        max_by_memory = int(self.available_memory_gb * 1024 / 100)
        
        return min(workers, max_by_memory)

config = CeleryConfig()
recommended_workers = config.calculate_workers('io_bound')
print(f"Recommended workers: {recommended_workers}")

Полная конфигурация для production

# celery_config.py
import os
import multiprocessing
from kombu import Exchange, Queue

class ProductionCeleryConfig:
    # Broker и backend
    broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
    result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
    
    # Workers
    worker_concurrency = os.getenv(
        'CELERY_CONCURRENCY',
        (multiprocessing.cpu_count() * 2) + 1
    )
    worker_prefetch_multiplier = 4
    worker_max_tasks_per_child = 1000
    worker_disable_rate_limits = False
    
    # Таймауты
    task_soft_time_limit = 300   # 5 минут (мягкий лимит)
    task_time_limit = 600        # 10 минут (жёсткий лимит)
    task_acks_late = True        # ACK только после выполнения
    task_reject_on_worker_lost = True  # Отклонить если worker потерялся
    
    # Очереди
    default_queue = 'default'
    task_queues = (
        Queue('default', Exchange('celery', type='direct'), routing_key='default'),
        Queue('high_priority', Exchange('celery'), routing_key='high'),
        Queue('emails', Exchange('celery'), routing_key='emails'),
        Queue('reports', Exchange('celery'), routing_key='reports'),
    )
    
    # Маршрутизация
    task_routes = {
        'myapp.tasks.send_email': {'queue': 'emails'},
        'myapp.tasks.export_data': {'queue': 'reports'},
        'myapp.tasks.sync_api': {'queue': 'high_priority'},
    }
    
    # Логирование
    worker_log_format = '[%(asctime)s: %(levelname)s] %(message)s'
    worker_task_log_format = '[%(asctime)s: %(levelname)s] %(task_name)s: %(message)s'

config = ProductionCeleryConfig()

Запуск workers в production с нужным concurrency

# Запуск с конкретным числом процессов
celery -A myapp worker --concurrency=16 --pool=prefork --loglevel=info

# С несколькими очередями
celery -A myapp worker \
  --concurrency=8 \
  -Q default,high_priority,emails \
  --time-limit=600 \
  --soft-time-limit=300

# С использованием gevent (для очень много I/O операций)
celery -A myapp worker \
  --concurrency=1000 \
  --pool=gevent \
  -Q default,high_priority

Docker Compose пример

version: '3.8'

services:
  celery_worker:
    build: .
    command: celery -A myapp worker --concurrency=8 --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
      - CELERY_CONCURRENCY=8
    depends_on:
      - redis
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G
  
  # Высокоприоритетный worker
  celery_worker_high:
    build: .
    command: celery -A myapp worker --concurrency=4 -Q high_priority --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

Мониторинг workers

from celery import Celery
from celery.app.control import Inspect

app = Celery('myapp')
inspect = Inspect(app=app)

# Активные задачи
active_tasks = inspect.active()
print(f"Active tasks: {active_tasks}")

# Зарегистрированные задачи
registered = inspect.registered()
print(f"Registered tasks: {registered}")

# Статистика workers
stats = inspect.stats()
for worker_name, worker_stats in stats.items():
    print(f"Worker: {worker_name}")
    print(f"  Pool size: {worker_stats['pool']['max-concurrency']}")

Типичные ошибки, которые я исправлял

  1. Слишком мало workers → перегрузка, медленная обработка
  2. Слишком много workers → потребление памяти, контекст-switching
  3. Забыл про prefetch_multiplier → неравномерное распределение
  4. Не установил time limits → зависшие задачи
  5. MemoryError от утечек → нужен max_tasks_per_child

Итоговая рекомендация

Для большинства production систем с I/O операциями я выбираю: CPU_cores × 2 + 1 worker. Для систем с CPU-intensive операциями: CPU_cores. И всегда использую несколько очередей с разными workers для разных типов задач.