← Назад к вопросам
Зачем нужен broker в Celery?
2.0 Middle🔥 141 комментариев
#Асинхронность и многопоточность#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Broker в Celery: зачем он нужен и как работает
Broker — это сердце Celery. Без него Celery вообще не работает. Это промежуточная система, которая соединяет ваше приложение с рабочими процессами. Давай разберёмся подробно.
Проблема без Broker
Представь архитектуру веб-приложения без очереди задач:
# ❌ БЕЗ Celery (синхронно)
from flask import Flask
app = Flask(__name__)
@app.route('/send-email')
def send_email():
email = request.json['email']
send_email_blocking(email) # БЛОКИРУЕТ на 5 секунд!
return {'status': 'sent'}
# Проблемы:
# 1. Пользователь ждёт 5 сек пока отправляется письмо
# 2. Если сервер упадёт при отправке, письмо потеряется
# 3. Масштабировать невозможно — очередь задач создаётся в памяти
Что такое Broker
Broker — это система доставки сообщений. Приложение отправляет сообщение в Broker, а рабочие процессы (Workers) слушают Broker и обрабатывают задачи.
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Application │ ────→ │ Broker │ ←─── │ Workers │
│ (Flask) │ sends │ (Redis/RabbitMQ) │ (Celery) │
└─────────────┘ task └──────────────┘ pulls └──────────────┘
tasks
Как это работает:
- Приложение создаёт задачу
- Broker сохраняет задачу в очередь (очень быстро)
- Приложение сразу же возвращает ответ пользователю
- Worker подхватывает задачу из Broker'а
- Worker выполняет задачу
- Worker сохраняет результат (если нужен)
Типы Broker'ов
1. Redis — самый популярный
Преимущества:
- Очень быстрый (in-memory)
- Простой в установке
- Хорош для небольших нагрузок
# Celery с Redis
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
@app.task
def send_email(email):
# Отправляем письмо
print(f"Sending email to {email}")
return f"Email sent to {email}"
# Использование
send_email.delay('user@example.com') # Отправляет в Redis
Недостатки:
- Не персистентен (теряет задачи при перезагрузке)
- RDB/AOF нужны для надёжности
2. RabbitMQ — надёжный
Преимущества:
- Гарантирует доставку задач
- Персистентность (задачи записываются на диск)
- Дорогие гарантии AMQP
- Масштабируется на миллионы задач
# Celery с RabbitMQ
from celery import Celery
app = Celery(
'myapp',
broker='amqp://guest:guest@localhost:5672//'
)
@app.task
def send_email(email):
print(f"Sending email to {email}")
send_email.delay('user@example.com') # В RabbitMQ
Недостатки:
- Медленнее Redis
- Сложнее в установке
- Требует больше ресурсов
3. AWS SQS — облачный
app = Celery(
'myapp',
broker='sqs://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@sqs.eu-west-1.amazonaws.com/'
)
Полный пример Celery с Redis
# celery_app.py
from celery import Celery
import time
app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1' # для результатов
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
@app.task(bind=True)
def send_email(self, email, subject, body):
"""Отправляет письмо асинхронно"""
try:
# Имитация отправки письма
time.sleep(5) # Тяжелая операция
print(f"Отправлено письмо на {email}")
return f"Email sent to {email}"
except Exception as exc:
# Retry с экспоненциальной задержкой
raise self.retry(exc=exc, countdown=60)
@app.task
def process_video(video_id):
"""Обработать видео (очень тяжелая операция)"""
print(f"Processing video {video_id}")
# Конвертирование, обрезка, и т.д.
return f"Video {video_id} processed"
@app.task
def periodic_cleanup():
"""Периодическая очистка"""
print("Cleaning up...")
if __name__ == '__main__':
app.start()
# flask_app.py
from flask import Flask, jsonify, request
from celery_app import send_email, process_video
app = Flask(__name__)
@app.route('/send-email', methods=['POST'])
def send_email_endpoint():
data = request.json
email = data['email']
subject = data['subject']
body = data['body']
# Отправляем задачу в Celery (в Redis)
# БЕЗ блокирования
task = send_email.delay(email, subject, body)
return jsonify({
'status': 'task_submitted',
'task_id': task.id # Можешь потом проверить статус
})
@app.route('/task-status/<task_id>')
def task_status(task_id):
"""Проверить статус задачи"""
from celery_app import app as celery_app
task = celery_app.AsyncResult(task_id)
if task.state == 'PENDING':
return jsonify({'status': 'pending'})
elif task.state == 'SUCCESS':
return jsonify({'status': 'success', 'result': task.result})
elif task.state == 'FAILURE':
return jsonify({'status': 'failure', 'error': str(task.info)})
else:
return jsonify({'status': task.state})
@app.route('/process-video', methods=['POST'])
def process_video_endpoint():
data = request.json
video_id = data['video_id']
# Отправляем тяжелую задачу
task = process_video.apply_async(
args=[video_id],
countdown=10, # Запустить через 10 секунд
expires=3600 # Задача истекает через 1 час
)
return jsonify({
'status': 'processing',
'task_id': task.id
})
if __name__ == '__main__':
app.run(debug=True)
Запуск Celery Worker'а
# Запустить worker (слушает Redis)
celery -A celery_app worker --loglevel=info
# Несколько workers параллельно
celery -A celery_app worker -c 4 --loglevel=info # 4 параллельных задачи
# Worker для специфических очередей
celery -A celery_app worker -Q email,video --loglevel=info
Преимущества Broker'а
1. Асинхронность
# Без Broker: пользователь ждёт
def endpoint():
send_email_blocking('user@example.com') # 5 сек блокировка
return 'sent'
# С Broker: пользователь получает ответ мгновенно
def endpoint():
send_email.delay('user@example.com') # Отправляется в Broker
return 'task submitted' # Мгновенно
2. Надёжность (с правильным Broker'ом)
# RabbitMQ гарантирует доставку
@app.task(bind=True, max_retries=3)
def send_email(self, email):
try:
# Отправляем
except Exception as exc:
# Если упало — RabbitMQ сохранит в очереди и retry'т
raise self.retry(exc=exc, countdown=60)
3. Масштабируемость
1 приложение → 10 Workers
10 приложений → 100 Workers
Все работают параллельно через один Broker
4. Приоритеты задач
# Важное письмо обрабатывается в первую очередь
send_email.apply_async(
args=['vip@example.com'],
priority=9 # Высокий приоритет
)
# Рассылка обрабатывается в конце
send_email.apply_async(
args=['newsletter@example.com'],
priority=1 # Низкий приоритет
)
Архитектура
┌──────────────────────────────────────────────────────┐
│ Flask Приложение │
│ send_email.delay('user@example.com') # Быстро │
└─────────────────┬──────────────────────────────────┘
│
↓
┌──────────────────────────────────────────────────────┐
│ Redis Broker (очереди) │
│ queue: [task1, task2, task3, ...] │
│ task1: {"func": "send_email", "args": [...]} │
└─────────────────┬──────────────────────────────────┘
┌───────┴───────┬───────────┐
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ Выполняет │ │ Выполняет │ │ Выполняет │
│ task1 │ │ task2 │ │ task3 │
└──────────────┘ └──────────────┘ └──────────────┘
Сравнение Redis vs RabbitMQ
| Параметр | Redis | RabbitMQ |
|---|---|---|
| Скорость | Очень быстро | Медленнее |
| Надёжность | Может потеряться | Гарантирует |
| Персистентность | С RDB/AOF | Встроенная |
| Масштабируемость | До миллионов | Миллиарды |
| Простота | Очень простая | Требует знаний |
| Нагрузка | Легкие задачи | Критические |
Когда использовать Celery
✓ Используй Celery когда:
- Нужна асинхронная обработка
- Тяжелые операции (отправка писем, обработка видео)
- Периодические задачи (cron jobs)
- Требуется масштабируемость
✗ Не используй Celery когда:
- Задачи очень лёгкие (<100мс)
- Нужен мгновенный результат
- Нет сложных операций
Заключение
Broker в Celery — это очередь задач между приложением и workers:
- Приложение отправляет задачу в Broker (быстро)
- Пользователь получает ответ мгновенно
- Worker'ы обрабатывают задачи в фоне
- Результаты сохраняются для проверки
Без Broker'а Celery вообще не имеет смысла — это просто очередь задач, которая находится между вашим приложением и worker'ами.