← Назад к вопросам
Как конфигурировать количество процессов Workers в Celery?
2.0 Middle🔥 191 комментариев
#Асинхронность и многопоточность#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Конфигурирование процессов Workers в Celery: Практический подход
Количество Celery workers — это критический параметр, который влияет на пропускную способность и использование ресурсов. Я научился правильно конфигурировать это через практику в production среде.
Основные параметры и способы конфигурирования
1. Прямой параметр в командной строке
# Запуск с 4 процессами (workers)
celery -A myapp worker --concurrency=4
# Запуск с 8 процессами и пулом процессов
celery -A myapp worker --concurrency=8 --pool=processes
# Запуск с автоматическим определением по ядрам CPU
celery -A myapp worker --concurrency
2. Конфигурация через celeryconfig.py
# celeryconfig.py
import os
from kombu import Exchange, Queue
# Основная конфигурация Celery
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
# Количество процессов (workers)
worker_concurrency = 8
worker_prefetch_multiplier = 4 # Сколько задач забирает один worker до выполнения
worker_max_tasks_per_child = 1000 # Перезагрузка worker после 1000 задач
# Другие важные параметры
worker_disable_rate_limits = False
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s'
# Таймауты
task_soft_time_limit = 300 # 5 минут
task_time_limit = 600 # 10 минут (hard limit)
# Очередь
default_queue = 'default'
default_exchange = Exchange('celery', type='direct')
default_routing_key = 'celery'
queues = {
'default': {'exchange': 'celery', 'routing_key': 'celery'},
'high_priority': {'exchange': 'celery', 'routing_key': 'high_priority'},
'low_priority': {'exchange': 'celery', 'routing_key': 'low_priority'},
}
# Роутинг разных задач
task_routes = {
'tasks.send_email': {'queue': 'high_priority'},
'tasks.generate_report': {'queue': 'low_priority'},
}
3. Конфигурация через Django settings
# settings.py (Django project)
from kombu import Queue, Exchange
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# Количество workers (concurrency)
CELERY_WORKER_CONCURRENCY = 8
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
# Таймауты
CELERY_TASK_SOFT_TIME_LIMIT = 300 # 5 минут
CELERY_TASK_TIME_LIMIT = 600 # 10 минут
# Очереди для разных типов задач
CELERY_TASK_QUEUES = (
Queue('default', Exchange('celery'), routing_key='celery'),
Queue('high_priority', Exchange('celery'), routing_key='high'),
Queue('low_priority', Exchange('celery'), routing_key='low'),
)
# Маршрутизация
CELERY_TASK_ROUTES = {
'myapp.tasks.send_email': {'queue': 'high_priority'},
'myapp.tasks.export_data': {'queue': 'low_priority'},
'myapp.tasks.cache_warm': {'queue': 'low_priority'},
}
Как я выбираю количество workers
Расчёт оптимального количества
# Математическая формула
# workers = (CPU_cores * 2) + 1 # Для I/O-bound задач
# workers = CPU_cores # Для CPU-bound задач
import multiprocessing
cpu_count = multiprocessing.cpu_count()
# Для I/O-bound (сетевые запросы, БД операции)
io_bound_workers = (cpu_count * 2) + 1
print(f"I/O-bound workers: {io_bound_workers}") # Например: 17 (на 8-ядерной машине)
# Для CPU-bound (вычисления)
cpu_bound_workers = cpu_count
print(f"CPU-bound workers: {cpu_bound_workers}") # 8
Практический пример из production
# Скрипт для динамического расчёта
import multiprocessing
import os
class CeleryConfig:
def __init__(self):
self.cpu_cores = multiprocessing.cpu_count()
self.available_memory_gb = self.get_available_memory()
def get_available_memory(self) -> float:
"""Получить доступную память в GB"""
import psutil
return psutil.virtual_memory().available / (1024 ** 3)
def calculate_workers(self, task_type: str = 'io_bound') -> int:
"""
Рассчитать оптимальное количество workers
task_type: 'io_bound', 'cpu_bound', 'mixed'
"""
if task_type == 'io_bound':
# I/O операции (API запросы, БД)
workers = (self.cpu_cores * 2) + 1
elif task_type == 'cpu_bound':
# Вычисления
workers = self.cpu_cores
else: # mixed
# Смешанные задачи
workers = (self.cpu_cores * 1.5) + 1
# Ограничение по памяти (100MB на worker)
max_by_memory = int(self.available_memory_gb * 1024 / 100)
return min(workers, max_by_memory)
config = CeleryConfig()
recommended_workers = config.calculate_workers('io_bound')
print(f"Recommended workers: {recommended_workers}")
Полная конфигурация для production
# celery_config.py
import os
import multiprocessing
from kombu import Exchange, Queue
class ProductionCeleryConfig:
# Broker и backend
broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
# Workers
worker_concurrency = os.getenv(
'CELERY_CONCURRENCY',
(multiprocessing.cpu_count() * 2) + 1
)
worker_prefetch_multiplier = 4
worker_max_tasks_per_child = 1000
worker_disable_rate_limits = False
# Таймауты
task_soft_time_limit = 300 # 5 минут (мягкий лимит)
task_time_limit = 600 # 10 минут (жёсткий лимит)
task_acks_late = True # ACK только после выполнения
task_reject_on_worker_lost = True # Отклонить если worker потерялся
# Очереди
default_queue = 'default'
task_queues = (
Queue('default', Exchange('celery', type='direct'), routing_key='default'),
Queue('high_priority', Exchange('celery'), routing_key='high'),
Queue('emails', Exchange('celery'), routing_key='emails'),
Queue('reports', Exchange('celery'), routing_key='reports'),
)
# Маршрутизация
task_routes = {
'myapp.tasks.send_email': {'queue': 'emails'},
'myapp.tasks.export_data': {'queue': 'reports'},
'myapp.tasks.sync_api': {'queue': 'high_priority'},
}
# Логирование
worker_log_format = '[%(asctime)s: %(levelname)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s] %(task_name)s: %(message)s'
config = ProductionCeleryConfig()
Запуск workers в production с нужным concurrency
# Запуск с конкретным числом процессов
celery -A myapp worker --concurrency=16 --pool=prefork --loglevel=info
# С несколькими очередями
celery -A myapp worker \
--concurrency=8 \
-Q default,high_priority,emails \
--time-limit=600 \
--soft-time-limit=300
# С использованием gevent (для очень много I/O операций)
celery -A myapp worker \
--concurrency=1000 \
--pool=gevent \
-Q default,high_priority
Docker Compose пример
version: '3.8'
services:
celery_worker:
build: .
command: celery -A myapp worker --concurrency=8 --loglevel=info
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- CELERY_CONCURRENCY=8
depends_on:
- redis
deploy:
resources:
limits:
cpus: '2'
memory: 2G
# Высокоприоритетный worker
celery_worker_high:
build: .
command: celery -A myapp worker --concurrency=4 -Q high_priority --loglevel=info
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
depends_on:
- redis
Мониторинг workers
from celery import Celery
from celery.app.control import Inspect
app = Celery('myapp')
inspect = Inspect(app=app)
# Активные задачи
active_tasks = inspect.active()
print(f"Active tasks: {active_tasks}")
# Зарегистрированные задачи
registered = inspect.registered()
print(f"Registered tasks: {registered}")
# Статистика workers
stats = inspect.stats()
for worker_name, worker_stats in stats.items():
print(f"Worker: {worker_name}")
print(f" Pool size: {worker_stats['pool']['max-concurrency']}")
Типичные ошибки, которые я исправлял
- Слишком мало workers → перегрузка, медленная обработка
- Слишком много workers → потребление памяти, контекст-switching
- Забыл про prefetch_multiplier → неравномерное распределение
- Не установил time limits → зависшие задачи
- MemoryError от утечек → нужен max_tasks_per_child
Итоговая рекомендация
Для большинства production систем с I/O операциями я выбираю: CPU_cores × 2 + 1 worker. Для систем с CPU-intensive операциями: CPU_cores. И всегда использую несколько очередей с разными workers для разных типов задач.