← Назад к вопросам

Как Celery взаимодействует с основным приложением?

2.0 Middle🔥 131 комментариев
#Асинхронность и многопоточность#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как Celery взаимодействует с основным приложением

Celery — это библиотека для асинхронной обработки задач в Python приложениях. Она работает по архитектуре producer-consumer, где основное приложение — это producer, а Celery worker — это consumer.

Архитектура взаимодействия

┌─────────────────────────┐
│  Flask/Django/FastAPI   │ (Producer)
│   Основное приложение   │
└────────────┬────────────┘
             │ Отправляет задачу
             ↓
┌─────────────────────────┐
│  Message Broker         │
│  (Redis/RabbitMQ)       │
└────────────┬────────────┘
             │ Берёт задачу
             ↓
┌─────────────────────────┐
│  Celery Worker          │
│  Обрабатывает задачу    │
└─────────────────────────┘

Пример: Flask + Celery + Redis

Шаг 1: Конфигурация Celery

# celery_app.py
from celery import Celery
from kombu import Exchange, Queue

celery_app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

Шаг 2: Определение задач (tasks)

# tasks.py
from celery_app import celery_app
import time

@celery_app.task(name='send_email')
def send_email(email: str, message: str) -> dict:
    """Отправка email асинхронно"""
    print(f"Отправляю письмо на {email}")
    time.sleep(2)  # Имитация работы
    return {'status': 'sent', 'email': email}

@celery_app.task(name='process_image')
def process_image(image_path: str) -> dict:
    """Обработка изображения"""
    print(f"Обрабатываю изображение {image_path}")
    # Компрессия, ресайз и т.д.
    return {'status': 'processed', 'path': image_path}

Шаг 3: Отправка задач из основного приложения

# app.py
from flask import Flask, request
from tasks import send_email, process_image

app = Flask(__name__)

@app.route('/register', methods=['POST'])
def register():
    data = request.json
    email = data['email']
    
    # Отправляем задачу асинхронно
    task = send_email.delay(email, "Welcome!")
    
    # Возвращаем результат сразу, не ждём отправки email
    return {'status': 'ok', 'task_id': task.id}, 200

@app.route('/upload-image', methods=['POST'])
def upload_image():
    file = request.files['image']
    file_path = f"/uploads/{file.filename}"
    file.save(file_path)
    
    # Запускаем обработку асинхронно
    task = process_image.delay(file_path)
    
    return {'task_id': task.id}, 202

@app.route('/task-status/<task_id>', methods=['GET'])
def get_task_status(task_id: str):
    from celery.result import AsyncResult
    task = AsyncResult(task_id)
    
    return {
        'task_id': task_id,
        'status': task.status,
        'result': task.result if task.ready() else None
    }

Тип обновления результата (Result Backend)

Celery хранит результаты выполнения задач в Result Backend:

# Получение результата
from celery.result import AsyncResult

task = AsyncResult('task_id_here')

if task.ready():
    print(f"Результат: {task.result}")
else:
    print(f"Статус: {task.status}")

# Ожидание результата (блокирующий вызов)
result = task.get(timeout=30)

Запуск Celery Worker

# Запуск одного worker'а
celery -A tasks worker --loglevel=info

# Запуск с несколькими процессами
celery -A tasks worker --loglevel=info -c 4

# С указанием очередей
celery -A tasks worker -Q default,urgent --loglevel=info

Типы Celery задач

1. Простые задачи

@celery_app.task
def add(x, y):
    return x + y

result = add.delay(2, 3)

2. Задачи с повторами

@celery_app.task(autoretry_for=(Exception,), max_retries=3)
def risky_operation():
    # Будет повторена 3 раза при ошибке
    pass

3. Периодические задачи (Celery Beat)

from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'tasks.cleanup',
        'schedule': crontab(minute=0),
    },
}

@celery_app.task
def cleanup():
    print("Очищаю старые данные")

Основные компоненты

  1. Producer — основное приложение, которое создаёт задачи
  2. Broker — очередь сообщений (Redis/RabbitMQ)
  3. Worker — процесс, выполняющий задачи
  4. Result Backend — хранилище результатов
  5. Flower — веб-интерфейс для мониторинга

Заключение

Celery позволяет основному приложению отправлять долгоиграющие операции на выполнение в фоне, не блокируя обработку клиентского запроса. Это критически важно для масштабирования приложений и обеспечения хорошей user experience.