← Назад к вопросам

Зачем нужен broker в Celery?

2.0 Middle🔥 141 комментариев
#Асинхронность и многопоточность#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Broker в Celery: зачем он нужен и как работает

Broker — это сердце Celery. Без него Celery вообще не работает. Это промежуточная система, которая соединяет ваше приложение с рабочими процессами. Давай разберёмся подробно.

Проблема без Broker

Представь архитектуру веб-приложения без очереди задач:

# ❌ БЕЗ Celery (синхронно)
from flask import Flask

app = Flask(__name__)

@app.route('/send-email')
def send_email():
    email = request.json['email']
    send_email_blocking(email)  # БЛОКИРУЕТ на 5 секунд!
    return {'status': 'sent'}

# Проблемы:
# 1. Пользователь ждёт 5 сек пока отправляется письмо
# 2. Если сервер упадёт при отправке, письмо потеряется
# 3. Масштабировать невозможно — очередь задач создаётся в памяти

Что такое Broker

Broker — это система доставки сообщений. Приложение отправляет сообщение в Broker, а рабочие процессы (Workers) слушают Broker и обрабатывают задачи.

┌─────────────┐         ┌──────────────┐         ┌──────────────┐
│ Application │ ────→  │   Broker     │  ←───   │   Workers    │
│  (Flask)    │ sends  │  (Redis/RabbitMQ)      │ (Celery)     │
└─────────────┘ task   └──────────────┘ pulls   └──────────────┘
                                        tasks

Как это работает:

  1. Приложение создаёт задачу
  2. Broker сохраняет задачу в очередь (очень быстро)
  3. Приложение сразу же возвращает ответ пользователю
  4. Worker подхватывает задачу из Broker'а
  5. Worker выполняет задачу
  6. Worker сохраняет результат (если нужен)

Типы Broker'ов

1. Redis — самый популярный

Преимущества:

  • Очень быстрый (in-memory)
  • Простой в установке
  • Хорош для небольших нагрузок
# Celery с Redis
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0')

@app.task
def send_email(email):
    # Отправляем письмо
    print(f"Sending email to {email}")
    return f"Email sent to {email}"

# Использование
send_email.delay('user@example.com')  # Отправляет в Redis

Недостатки:

  • Не персистентен (теряет задачи при перезагрузке)
  • RDB/AOF нужны для надёжности

2. RabbitMQ — надёжный

Преимущества:

  • Гарантирует доставку задач
  • Персистентность (задачи записываются на диск)
  • Дорогие гарантии AMQP
  • Масштабируется на миллионы задач
# Celery с RabbitMQ
from celery import Celery

app = Celery(
    'myapp',
    broker='amqp://guest:guest@localhost:5672//'
)

@app.task
def send_email(email):
    print(f"Sending email to {email}")

send_email.delay('user@example.com')  # В RabbitMQ

Недостатки:

  • Медленнее Redis
  • Сложнее в установке
  • Требует больше ресурсов

3. AWS SQS — облачный

app = Celery(
    'myapp',
    broker='sqs://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@sqs.eu-west-1.amazonaws.com/'
)

Полный пример Celery с Redis

# celery_app.py
from celery import Celery
import time

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'  # для результатов
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

@app.task(bind=True)
def send_email(self, email, subject, body):
    """Отправляет письмо асинхронно"""
    try:
        # Имитация отправки письма
        time.sleep(5)  # Тяжелая операция
        print(f"Отправлено письмо на {email}")
        return f"Email sent to {email}"
    except Exception as exc:
        # Retry с экспоненциальной задержкой
        raise self.retry(exc=exc, countdown=60)

@app.task
def process_video(video_id):
    """Обработать видео (очень тяжелая операция)"""
    print(f"Processing video {video_id}")
    # Конвертирование, обрезка, и т.д.
    return f"Video {video_id} processed"

@app.task
def periodic_cleanup():
    """Периодическая очистка"""
    print("Cleaning up...")

if __name__ == '__main__':
    app.start()
# flask_app.py
from flask import Flask, jsonify, request
from celery_app import send_email, process_video

app = Flask(__name__)

@app.route('/send-email', methods=['POST'])
def send_email_endpoint():
    data = request.json
    email = data['email']
    subject = data['subject']
    body = data['body']
    
    # Отправляем задачу в Celery (в Redis)
    # БЕЗ блокирования
    task = send_email.delay(email, subject, body)
    
    return jsonify({
        'status': 'task_submitted',
        'task_id': task.id  # Можешь потом проверить статус
    })

@app.route('/task-status/<task_id>')
def task_status(task_id):
    """Проверить статус задачи"""
    from celery_app import app as celery_app
    
    task = celery_app.AsyncResult(task_id)
    
    if task.state == 'PENDING':
        return jsonify({'status': 'pending'})
    elif task.state == 'SUCCESS':
        return jsonify({'status': 'success', 'result': task.result})
    elif task.state == 'FAILURE':
        return jsonify({'status': 'failure', 'error': str(task.info)})
    else:
        return jsonify({'status': task.state})

@app.route('/process-video', methods=['POST'])
def process_video_endpoint():
    data = request.json
    video_id = data['video_id']
    
    # Отправляем тяжелую задачу
    task = process_video.apply_async(
        args=[video_id],
        countdown=10,  # Запустить через 10 секунд
        expires=3600   # Задача истекает через 1 час
    )
    
    return jsonify({
        'status': 'processing',
        'task_id': task.id
    })

if __name__ == '__main__':
    app.run(debug=True)

Запуск Celery Worker'а

# Запустить worker (слушает Redis)
celery -A celery_app worker --loglevel=info

# Несколько workers параллельно
celery -A celery_app worker -c 4 --loglevel=info  # 4 параллельных задачи

# Worker для специфических очередей
celery -A celery_app worker -Q email,video --loglevel=info

Преимущества Broker'а

1. Асинхронность

# Без Broker: пользователь ждёт
def endpoint():
    send_email_blocking('user@example.com')  # 5 сек блокировка
    return 'sent'

# С Broker: пользователь получает ответ мгновенно
def endpoint():
    send_email.delay('user@example.com')  # Отправляется в Broker
    return 'task submitted'  # Мгновенно

2. Надёжность (с правильным Broker'ом)

# RabbitMQ гарантирует доставку
@app.task(bind=True, max_retries=3)
def send_email(self, email):
    try:
        # Отправляем
    except Exception as exc:
        # Если упало — RabbitMQ сохранит в очереди и retry'т
        raise self.retry(exc=exc, countdown=60)

3. Масштабируемость

1 приложение → 10 Workers
10 приложений → 100 Workers
Все работают параллельно через один Broker

4. Приоритеты задач

# Важное письмо обрабатывается в первую очередь
send_email.apply_async(
    args=['vip@example.com'],
    priority=9  # Высокий приоритет
)

# Рассылка обрабатывается в конце
send_email.apply_async(
    args=['newsletter@example.com'],
    priority=1  # Низкий приоритет
)

Архитектура

┌──────────────────────────────────────────────────────┐
│                  Flask Приложение                     │
│  send_email.delay('user@example.com')  # Быстро     │
└─────────────────┬──────────────────────────────────┘
                  │
                  ↓
┌──────────────────────────────────────────────────────┐
│              Redis Broker (очереди)                  │
│  queue: [task1, task2, task3, ...]                   │
│  task1: {"func": "send_email", "args": [...]}       │
└─────────────────┬──────────────────────────────────┘
          ┌───────┴───────┬───────────┐
          ↓               ↓           ↓
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│ Worker 1     │  │ Worker 2     │  │ Worker 3     │
│ Выполняет    │  │ Выполняет    │  │ Выполняет    │
│ task1        │  │ task2        │  │ task3        │
└──────────────┘  └──────────────┘  └──────────────┘

Сравнение Redis vs RabbitMQ

ПараметрRedisRabbitMQ
СкоростьОчень быстроМедленнее
НадёжностьМожет потерятьсяГарантирует
ПерсистентностьС RDB/AOFВстроенная
МасштабируемостьДо миллионовМиллиарды
ПростотаОчень простаяТребует знаний
НагрузкаЛегкие задачиКритические

Когда использовать Celery

Используй Celery когда:

  • Нужна асинхронная обработка
  • Тяжелые операции (отправка писем, обработка видео)
  • Периодические задачи (cron jobs)
  • Требуется масштабируемость

Не используй Celery когда:

  • Задачи очень лёгкие (<100мс)
  • Нужен мгновенный результат
  • Нет сложных операций

Заключение

Broker в Celery — это очередь задач между приложением и workers:

  1. Приложение отправляет задачу в Broker (быстро)
  2. Пользователь получает ответ мгновенно
  3. Worker'ы обрабатывают задачи в фоне
  4. Результаты сохраняются для проверки

Без Broker'а Celery вообще не имеет смысла — это просто очередь задач, которая находится между вашим приложением и worker'ами.