← Назад к вопросам
Как Celery взаимодействует с основным приложением?
2.0 Middle🔥 131 комментариев
#Асинхронность и многопоточность#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как Celery взаимодействует с основным приложением
Celery — это библиотека для асинхронной обработки задач в Python приложениях. Она работает по архитектуре producer-consumer, где основное приложение — это producer, а Celery worker — это consumer.
Архитектура взаимодействия
┌─────────────────────────┐
│ Flask/Django/FastAPI │ (Producer)
│ Основное приложение │
└────────────┬────────────┘
│ Отправляет задачу
↓
┌─────────────────────────┐
│ Message Broker │
│ (Redis/RabbitMQ) │
└────────────┬────────────┘
│ Берёт задачу
↓
┌─────────────────────────┐
│ Celery Worker │
│ Обрабатывает задачу │
└─────────────────────────┘
Пример: Flask + Celery + Redis
Шаг 1: Конфигурация Celery
# celery_app.py
from celery import Celery
from kombu import Exchange, Queue
celery_app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
)
Шаг 2: Определение задач (tasks)
# tasks.py
from celery_app import celery_app
import time
@celery_app.task(name='send_email')
def send_email(email: str, message: str) -> dict:
"""Отправка email асинхронно"""
print(f"Отправляю письмо на {email}")
time.sleep(2) # Имитация работы
return {'status': 'sent', 'email': email}
@celery_app.task(name='process_image')
def process_image(image_path: str) -> dict:
"""Обработка изображения"""
print(f"Обрабатываю изображение {image_path}")
# Компрессия, ресайз и т.д.
return {'status': 'processed', 'path': image_path}
Шаг 3: Отправка задач из основного приложения
# app.py
from flask import Flask, request
from tasks import send_email, process_image
app = Flask(__name__)
@app.route('/register', methods=['POST'])
def register():
data = request.json
email = data['email']
# Отправляем задачу асинхронно
task = send_email.delay(email, "Welcome!")
# Возвращаем результат сразу, не ждём отправки email
return {'status': 'ok', 'task_id': task.id}, 200
@app.route('/upload-image', methods=['POST'])
def upload_image():
file = request.files['image']
file_path = f"/uploads/{file.filename}"
file.save(file_path)
# Запускаем обработку асинхронно
task = process_image.delay(file_path)
return {'task_id': task.id}, 202
@app.route('/task-status/<task_id>', methods=['GET'])
def get_task_status(task_id: str):
from celery.result import AsyncResult
task = AsyncResult(task_id)
return {
'task_id': task_id,
'status': task.status,
'result': task.result if task.ready() else None
}
Тип обновления результата (Result Backend)
Celery хранит результаты выполнения задач в Result Backend:
# Получение результата
from celery.result import AsyncResult
task = AsyncResult('task_id_here')
if task.ready():
print(f"Результат: {task.result}")
else:
print(f"Статус: {task.status}")
# Ожидание результата (блокирующий вызов)
result = task.get(timeout=30)
Запуск Celery Worker
# Запуск одного worker'а
celery -A tasks worker --loglevel=info
# Запуск с несколькими процессами
celery -A tasks worker --loglevel=info -c 4
# С указанием очередей
celery -A tasks worker -Q default,urgent --loglevel=info
Типы Celery задач
1. Простые задачи
@celery_app.task
def add(x, y):
return x + y
result = add.delay(2, 3)
2. Задачи с повторами
@celery_app.task(autoretry_for=(Exception,), max_retries=3)
def risky_operation():
# Будет повторена 3 раза при ошибке
pass
3. Периодические задачи (Celery Beat)
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
'cleanup-every-hour': {
'task': 'tasks.cleanup',
'schedule': crontab(minute=0),
},
}
@celery_app.task
def cleanup():
print("Очищаю старые данные")
Основные компоненты
- Producer — основное приложение, которое создаёт задачи
- Broker — очередь сообщений (Redis/RabbitMQ)
- Worker — процесс, выполняющий задачи
- Result Backend — хранилище результатов
- Flower — веб-интерфейс для мониторинга
Заключение
Celery позволяет основному приложению отправлять долгоиграющие операции на выполнение в фоне, не блокируя обработку клиентского запроса. Это критически важно для масштабирования приложений и обеспечения хорошей user experience.