Что такое Celery и для чего он используется?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое Celery и для чего он используется?
Celery — это распределенная система обработки задач (distributed task queue) на Python. Она позволяет асинхронно выполнять длительные операции за пределами основного потока приложения, используя очереди задач (message broker) и worker'ов.
Основная концепция
Celery работает по схеме "производитель-потребитель":
- Producer — основное приложение ставит задачу в очередь
- Broker — хранилище очереди (Redis, RabbitMQ)
- Worker — отдельный процесс, который берет задачу и выполняет её
- Result Backend — где хранятся результаты выполнения
┌─────────────────────────────────────────────┐
│ Django/Flask Application (Producer) │
│ task.delay(arg1, arg2) ───────────────┐ │
└─────────────────────────────────────────┼───┘
│
┌─────────────────────▼────────────┐
│ Message Broker (Redis/RabbitMQ) │
│ Task Queue │
└─────────────────────┬────────────┘
│
┌───────────────────────────────┼───────────────────────────────┐
│ │ │
┌─────▼──────┐ ┌─────▼──────┐ ┌─────▼──────┐
│ Worker 1 │ │ Worker 2 │ │ Worker 3 │
│ (Process) │ │ (Process) │ │ (Process) │
└──────────────┘ └──────────────┘ └──────────────┘
Установка и базовая настройка
# Установка
pip install celery redis
# или для RabbitMQ
pip install celery[amqp]
Простой пример
# celery_app.py
from celery import Celery
# Создание приложения Celery
app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
# Конфигурация
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_time_limit=30 * 60, # 30 минут жесткий лимит
)
# Определение задачи
@app.task(bind=True, max_retries=3)
def send_email(self, email, subject, message):
try:
# Долгая операция
print(f"Отправляю письмо на {email}")
# Имитация отправки
import time
time.sleep(2)
return f"Письмо отправлено на {email}"
except Exception as exc:
# Автоматический retry с exponential backoff
raise self.retry(exc=exc, countdown=60)
@app.task
def process_video(video_id):
"""Обработка видео (долгая операция)"""
print(f"Обрабатываю видео {video_id}")
# Долгие вычисления
return f"Видео {video_id} обработано"
@app.task
def analyze_data(dataset):
"""Анализ данных"""
return sum(dataset) / len(dataset) # вычисление среднего
Использование в Flask/Django приложении
# main.py - Flask приложение
from flask import Flask, jsonify
from celery_app import send_email, process_video
app = Flask(__name__)
@app.route('/send-email', methods=['POST'])
def send_email_endpoint():
# Асинхронно ставим задачу в очередь
task = send_email.delay(
'user@example.com',
'Hello',
'This is a test email'
)
return jsonify({
'task_id': task.id,
'status': 'Task queued'
})
@app.route('/task-status/<task_id>', methods=['GET'])
def get_task_status(task_id):
from celery.result import AsyncResult
result = AsyncResult(task_id)
return jsonify({
'task_id': task_id,
'status': result.status,
'result': result.result if result.ready() else None
})
@app.route('/process-video/<video_id>', methods=['POST'])
def start_video_processing(video_id):
# Ставим задачу в очередь
task = process_video.delay(video_id)
return jsonify({'task_id': task.id})
if __name__ == '__main__':
app.run(debug=True)
Запуск Worker'ов
# Один worker на всю машину
celery -A celery_app worker --loglevel=info
# Несколько worker'ов (процессов)
celery -A celery_app worker --loglevel=info --concurrency=4
# Определенный worker для определенных задач
celery -A celery_app worker --loglevel=info --queues=email
# Worker с автоскейлингом
celery -A celery_app worker --loglevel=info --autoscale=10,3
Продвинутые возможности
from celery import group, chain, chord
# 1. Задача с задержкой
send_email.apply_async(
('user@example.com', 'Hello', 'Message'),
countdown=60 # выполнить через 60 секунд
)
# 2. Периодические задачи (cron-like)
from celery.schedules import crontab
app.conf.beat_schedule = {
'send-summary-every-morning': {
'task': 'tasks.send_summary_email',
'schedule': crontab(hour=8, minute=0),
},
'cleanup-every-hour': {
'task': 'tasks.cleanup_temp_files',
'schedule': 3600.0, # каждый час
},
}
# 3. Групповое выполнение (параллель)
from celery import group
def send_emails_to_all_users(user_emails):
# Все задачи выполняются параллельно
job = group(send_email.s(email, 'Hello', 'Message') for email in user_emails)
result = job.apply_async()
return result.id
# 4. Цепочка задач (последовательно)
from celery import chain
def workflow():
# Сначала task1, потом task2(результат от task1), потом task3
workflow = chain(
process_video.s('video1'),
analyze_data.s([1, 2, 3, 4, 5]),
send_email.s('admin@example.com', 'Results', message)
)
workflow.apply_async()
# 5. Обработка результатов (chord)
from celery import chord
def parallel_with_callback():
callback = send_email.s('admin@example.com', 'Done', 'All tasks completed')
# Выполни все задачи параллельно, потом вызови callback с результатами
header = group(process_video.s(f'video{i}') for i in range(5))
result = chord(header)(callback)
return result.id
# 6. Retry с экспоненциальной задержкой
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5})
def fetch_from_api(self, url):
try:
# попытка
import requests
return requests.get(url).json()
except Exception as exc:
# Автоматический retry с задержкой
countdown = 2 ** self.request.retries # 1, 2, 4, 8, 16 сек
raise self.retry(exc=exc, countdown=countdown)
# 7. Мониторинг выполнения задачи
from celery.result import AsyncResult
task = process_video.delay('video123')
print(f"Task ID: {task.id}")
print(f"Status: {task.status}") # PENDING, STARTED, SUCCESS, FAILURE
# Ожидание результата
result = task.get(timeout=300) # ждем до 5 минут
print(f"Result: {result}")
Основные use-cases Celery
- Отправка писем — не блокируем HTTP запрос
- Обработка видео/изображений — долгие вычисления
- Веб-скрапинг — много HTTP запросов
- Генерация отчетов — долгая обработка данных
- Очистка базы данных — периодические задачи
- Интеграция с внешними API — с retry и обработкой ошибок
- Индексирование в поисковых системах — асинхронно
Плюсы Celery
✅ Асинхронное выполнение задач ✅ Распределенная обработка (масштабируемость) ✅ Автоматические retry'и ✅ Поддержка периодических задач ✅ Интеграция с Django, Flask и другими ✅ Гибкая маршрутизация задач ✅ Поддержка разных broker'ов и backend'ов
Минусы Celery
❌ Сложность настройки и отладки ❌ Требует отдельного сервиса (broker) ❌ Overhead при отправке задач ❌ Проблемы с отследованием ошибок ❌ Нужен мониторинг (Flower)
Альтернативы
- RQ (Redis Queue) — проще, для простых случаев
- Dramatiq — современная альтернатива
- APScheduler — для периодических задач
- Huey — легче Celery
- Встроенные решения (AWS SQS, Google Cloud Tasks)
Резюме
Celery — это мощный инструмент для асинхронной обработки задач в Python приложениях. Он идеален для приложений, которые должны быстро реагировать на запросы, но требуют долгих фоновых операций. Правильное использование Celery значительно улучшает масштабируемость и надежность приложения.