← Назад к вопросам

Как интегрировать Celery в Django?

1.8 Middle🔥 221 комментариев
#Django#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Интеграция Celery в Django

Celery — это распределённая система для обработки асинхронных задач. Это один из самых важных инструментов для Django разработчиков, когда нужно обрабатывать длительные операции без блокирования пользователя.

Архитектура Celery

Django приложение → Celery Task Queue → Message Broker → Worker → Result Backend
(Producer)         (Отправление)      (RabbitMQ/Redis) (Обработка) (Хранение результата)

Шаг 1: Установка зависимостей

pip install celery
pip install redis  # или django-celery-beat, django-celery-results
pip install kombu  # Для поддержки различных message brokers

Шаг 2: Конфигурация Celery

Создание файла celery.py в папке проекта

# myproject/celery.py
import os
from celery import Celery

# Устанавливаем дефолтный Django settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Загружаем конфигурацию из Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')

# Автоматически обнаруживаем tasks из всех установленных приложений
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Инициализация в init.py

# myproject/__init__.py
from .celery import app as celery_app

__all__ = ['celery_app']

Шаг 3: Конфигурация в settings.py

# myproject/settings.py

# Broker configuration (Redis)
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Результаты хранятся в Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Таймзона для задач
CELERY_TIMEZONE = 'UTC'

# Заканчивать ли уже выполняемую задачу при отключении worker
CELERY_TASK_ACKS_LATE = True

# Максимально количество попыток переделать задачу
CELERY_TASK_MAX_RETRIES = 3

# Временное ограничение для задачи (в секундах)
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 минут

# Мягкое ограничение времени (задача будет прервана)
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60

# Настройки для результатов
CELERY_RESULT_EXPIRES = 3600  # Результаты хранятся 1 час

# Параллелизм - количество одновременных работников
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

Шаг 4: Определение задач

Создание tasks.py в приложении

# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth.models import User
import logging

logger = logging.getLogger(__name__)

# Простая задача
@shared_task
def send_welcome_email(user_id):
    """Отправить приветственное письмо пользователю"""
    try:
        user = User.objects.get(id=user_id)
        send_mail(
            'Добро пожаловать!',
            'Спасибо за регистрацию',
            'from@example.com',
            [user.email],
            fail_silently=False,
        )
    except User.DoesNotExist:
        logger.error(f'User with id {user_id} not found')

# Задача с повторными попытками
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_payment(self, order_id):
    """Обработать платеж"""
    try:
        order = Order.objects.get(id=order_id)
        # Логика обработки платежа
        payment_gateway.process(order)
        order.status = 'paid'
        order.save()
        return f'Payment processed for order {order_id}'
    except Exception as exc:
        # Повторяем через 60 секунд
        logger.error(f'Payment failed: {exc}')
        raise self.retry(exc=exc, countdown=60)

# Периодическая задача (требует celery-beat)
@shared_task
def cleanup_expired_sessions():
    """Очистить истекшие сессии каждый час"""
    from django.contrib.sessions.models import Session
    from django.utils import timezone
    Session.objects.filter(expire_date__lt=timezone.now()).delete()

# Задача с специальными параметрами
@shared_task(
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 5},
    default_retry_delay=5,
    rate_limit='100/m'  # Максимум 100 в минуту
)
def process_large_file(file_path):
    """Обработать большой файл"""
    with open(file_path, 'r') as f:
        for line in f:
            # Обработка
            pass
    return f'Processed {file_path}'

# Chaining tasks
@shared_task
def generate_report(report_id):
    report = Report.objects.get(id=report_id)
    # Генерируем
    return report.id

@shared_task
def send_report(report_id):
    report = Report.objects.get(id=report_id)
    # Отправляем
    return f'Report {report_id} sent'

Шаг 5: Использование задач в Django views

# myapp/views.py
from django.shortcuts import render
from django.http import JsonResponse
from .tasks import send_welcome_email, process_payment
from .models import Order

def register_user(request):
    """Регистрация пользователя"""
    # Обычная логика регистрации
    user = User.objects.create_user(...)
    
    # Запуск асинхронной задачи
    send_welcome_email.delay(user.id)  # delay отправляет в очередь
    
    return JsonResponse({'status': 'User registered'})

def create_order(request):
    """Создать заказ"""
    order = Order.objects.create(...)
    
    # Запуск с временной задержкой (через 5 минут)
    process_payment.apply_async(
        args=[order.id],
        countdown=300  # 5 минут в секундах
    )
    
    return JsonResponse({'order_id': order.id})

def check_task_status(request, task_id):
    """Проверить статус задачи"""
    from celery.result import AsyncResult
    
    task = AsyncResult(task_id)
    return JsonResponse({
        'status': task.status,
        'result': task.result if task.successful() else None,
        'error': str(task.info) if task.failed() else None
    })

Шаг 6: Запуск Celery Worker

# Базовый worker
celery -A myproject worker -l info

# Worker с несколькими потоками
celery -A myproject worker -l info -c 4

# Worker с пулом процессов (для CPU-bound задач)
celery -A myproject worker -l info -P solo

# Для production с supervisor
[program:celery]
command=celery -A myproject worker -l info
directory=/path/to/project
user=www-data
autostart=true
autorestart=true

Шаг 7: Периодические задачи с Celery Beat

pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
    'django_celery_beat',
]

# Задачи в settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-sessions-every-hour': {
        'task': 'myapp.tasks.cleanup_expired_sessions',
        'schedule': crontab(minute=0),  # Каждый час
    },
    'generate-reports-daily': {
        'task': 'myapp.tasks.generate_daily_report',
        'schedule': crontab(hour=23, minute=59),  # В 23:59
    },
}
# Запуск Celery Beat scheduler
celery -A myproject beat -l info

Шаг 8: Мониторинг

# Установка flower для веб-интерфейса мониторинга
pip install flower

# Запуск
celery -A myproject flower
# Доступно на http://localhost:5555

Лучшие практики

# 1. Используй именованные параметры
@shared_task
def good_task(*, user_id, email):  # * заставляет использовать ключевые аргументы
    pass

# 2. Валидация входных данных
@shared_task
def validate_task(user_id: int):
    if user_id <= 0:
        raise ValueError('Invalid user_id')
    user = User.objects.get(id=user_id)

# 3. Логирование
import logging
logger = logging.getLogger(__name__)

@shared_task
def logged_task(data):
    logger.info(f'Starting task with data: {data}')
    try:
        # Работа
        logger.info('Task completed')
    except Exception as e:
        logger.error(f'Task failed: {e}')
        raise

# 4. Избегай передачи сложных объектов
# ❌ Плохо
send_task.delay(user)  # Объект User сериализуется

# ✅ Хорошо
send_task.delay(user.id)  # Только ID

Заключение

Celery интеграция в Django включает:

  1. Установка и конфигурация Celery
  2. Определение задач с декоратором @shared_task
  3. Отправка задач через .delay() или .apply_async()
  4. Запуск worker процессов
  5. Мониторинг через Flower
  6. Использование Beat для периодических задач

Это позволяет строить масштабируемые Django приложения с асинхронной обработкой.

Как интегрировать Celery в Django? | PrepBro