← Назад к вопросам

Что такое Celery и для чего он используется?

2.3 Middle🔥 111 комментариев
#Асинхронность и многопоточность#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что такое Celery и для чего он используется?

Celery — это распределенная система обработки задач (distributed task queue) на Python. Она позволяет асинхронно выполнять длительные операции за пределами основного потока приложения, используя очереди задач (message broker) и worker'ов.

Основная концепция

Celery работает по схеме "производитель-потребитель":

  1. Producer — основное приложение ставит задачу в очередь
  2. Broker — хранилище очереди (Redis, RabbitMQ)
  3. Worker — отдельный процесс, который берет задачу и выполняет её
  4. Result Backend — где хранятся результаты выполнения
┌─────────────────────────────────────────────┐
│ Django/Flask Application (Producer)         │
│  task.delay(arg1, arg2)  ───────────────┐   │
└─────────────────────────────────────────┼───┘
                                          │
                    ┌─────────────────────▼────────────┐
                    │  Message Broker (Redis/RabbitMQ)  │
                    │  Task Queue                        │
                    └─────────────────────┬────────────┘
                                          │
          ┌───────────────────────────────┼───────────────────────────────┐
          │                               │                               │
    ┌─────▼──────┐                  ┌─────▼──────┐                  ┌─────▼──────┐
    │   Worker 1  │                  │   Worker 2  │                  │   Worker 3  │
    │ (Process)   │                  │ (Process)   │                  │ (Process)   │
    └──────────────┘                  └──────────────┘                  └──────────────┘

Установка и базовая настройка

# Установка
pip install celery redis

# или для RabbitMQ
pip install celery[amqp]

Простой пример

# celery_app.py
from celery import Celery

# Создание приложения Celery
app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# Конфигурация
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 минут жесткий лимит
)

# Определение задачи
@app.task(bind=True, max_retries=3)
def send_email(self, email, subject, message):
    try:
        # Долгая операция
        print(f"Отправляю письмо на {email}")
        # Имитация отправки
        import time
        time.sleep(2)
        return f"Письмо отправлено на {email}"
    except Exception as exc:
        # Автоматический retry с exponential backoff
        raise self.retry(exc=exc, countdown=60)

@app.task
def process_video(video_id):
    """Обработка видео (долгая операция)"""
    print(f"Обрабатываю видео {video_id}")
    # Долгие вычисления
    return f"Видео {video_id} обработано"

@app.task
def analyze_data(dataset):
    """Анализ данных"""
    return sum(dataset) / len(dataset)  # вычисление среднего

Использование в Flask/Django приложении

# main.py - Flask приложение
from flask import Flask, jsonify
from celery_app import send_email, process_video

app = Flask(__name__)

@app.route('/send-email', methods=['POST'])
def send_email_endpoint():
    # Асинхронно ставим задачу в очередь
    task = send_email.delay(
        'user@example.com',
        'Hello',
        'This is a test email'
    )
    
    return jsonify({
        'task_id': task.id,
        'status': 'Task queued'
    })

@app.route('/task-status/<task_id>', methods=['GET'])
def get_task_status(task_id):
    from celery.result import AsyncResult
    
    result = AsyncResult(task_id)
    
    return jsonify({
        'task_id': task_id,
        'status': result.status,
        'result': result.result if result.ready() else None
    })

@app.route('/process-video/<video_id>', methods=['POST'])
def start_video_processing(video_id):
    # Ставим задачу в очередь
    task = process_video.delay(video_id)
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(debug=True)

Запуск Worker'ов

# Один worker на всю машину
celery -A celery_app worker --loglevel=info

# Несколько worker'ов (процессов)
celery -A celery_app worker --loglevel=info --concurrency=4

# Определенный worker для определенных задач
celery -A celery_app worker --loglevel=info --queues=email

# Worker с автоскейлингом
celery -A celery_app worker --loglevel=info --autoscale=10,3

Продвинутые возможности

from celery import group, chain, chord

# 1. Задача с задержкой
send_email.apply_async(
    ('user@example.com', 'Hello', 'Message'),
    countdown=60  # выполнить через 60 секунд
)

# 2. Периодические задачи (cron-like)
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send-summary-every-morning': {
        'task': 'tasks.send_summary_email',
        'schedule': crontab(hour=8, minute=0),
    },
    'cleanup-every-hour': {
        'task': 'tasks.cleanup_temp_files',
        'schedule': 3600.0,  # каждый час
    },
}

# 3. Групповое выполнение (параллель)
from celery import group

def send_emails_to_all_users(user_emails):
    # Все задачи выполняются параллельно
    job = group(send_email.s(email, 'Hello', 'Message') for email in user_emails)
    result = job.apply_async()
    return result.id

# 4. Цепочка задач (последовательно)
from celery import chain

def workflow():
    # Сначала task1, потом task2(результат от task1), потом task3
    workflow = chain(
        process_video.s('video1'),
        analyze_data.s([1, 2, 3, 4, 5]),
        send_email.s('admin@example.com', 'Results', message)
    )
    workflow.apply_async()

# 5. Обработка результатов (chord)
from celery import chord

def parallel_with_callback():
    callback = send_email.s('admin@example.com', 'Done', 'All tasks completed')
    
    # Выполни все задачи параллельно, потом вызови callback с результатами
    header = group(process_video.s(f'video{i}') for i in range(5))
    result = chord(header)(callback)
    return result.id

# 6. Retry с экспоненциальной задержкой
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5})
def fetch_from_api(self, url):
    try:
        # попытка
        import requests
        return requests.get(url).json()
    except Exception as exc:
        # Автоматический retry с задержкой
        countdown = 2 ** self.request.retries  # 1, 2, 4, 8, 16 сек
        raise self.retry(exc=exc, countdown=countdown)

# 7. Мониторинг выполнения задачи
from celery.result import AsyncResult

task = process_video.delay('video123')
print(f"Task ID: {task.id}")
print(f"Status: {task.status}")  # PENDING, STARTED, SUCCESS, FAILURE

# Ожидание результата
result = task.get(timeout=300)  # ждем до 5 минут
print(f"Result: {result}")

Основные use-cases Celery

  1. Отправка писем — не блокируем HTTP запрос
  2. Обработка видео/изображений — долгие вычисления
  3. Веб-скрапинг — много HTTP запросов
  4. Генерация отчетов — долгая обработка данных
  5. Очистка базы данных — периодические задачи
  6. Интеграция с внешними API — с retry и обработкой ошибок
  7. Индексирование в поисковых системах — асинхронно

Плюсы Celery

✅ Асинхронное выполнение задач ✅ Распределенная обработка (масштабируемость) ✅ Автоматические retry'и ✅ Поддержка периодических задач ✅ Интеграция с Django, Flask и другими ✅ Гибкая маршрутизация задач ✅ Поддержка разных broker'ов и backend'ов

Минусы Celery

❌ Сложность настройки и отладки ❌ Требует отдельного сервиса (broker) ❌ Overhead при отправке задач ❌ Проблемы с отследованием ошибок ❌ Нужен мониторинг (Flower)

Альтернативы

  • RQ (Redis Queue) — проще, для простых случаев
  • Dramatiq — современная альтернатива
  • APScheduler — для периодических задач
  • Huey — легче Celery
  • Встроенные решения (AWS SQS, Google Cloud Tasks)

Резюме

Celery — это мощный инструмент для асинхронной обработки задач в Python приложениях. Он идеален для приложений, которые должны быстро реагировать на запросы, но требуют долгих фоновых операций. Правильное использование Celery значительно улучшает масштабируемость и надежность приложения.

Что такое Celery и для чего он используется? | PrepBro