← Назад к вопросам
Как интегрировать Celery в Django?
1.8 Middle🔥 221 комментариев
#Django#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Интеграция Celery в Django
Celery — это распределённая система для обработки асинхронных задач. Это один из самых важных инструментов для Django разработчиков, когда нужно обрабатывать длительные операции без блокирования пользователя.
Архитектура Celery
Django приложение → Celery Task Queue → Message Broker → Worker → Result Backend
(Producer) (Отправление) (RabbitMQ/Redis) (Обработка) (Хранение результата)
Шаг 1: Установка зависимостей
pip install celery
pip install redis # или django-celery-beat, django-celery-results
pip install kombu # Для поддержки различных message brokers
Шаг 2: Конфигурация Celery
Создание файла celery.py в папке проекта
# myproject/celery.py
import os
from celery import Celery
# Устанавливаем дефолтный Django settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Загружаем конфигурацию из Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')
# Автоматически обнаруживаем tasks из всех установленных приложений
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
Инициализация в init.py
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ['celery_app']
Шаг 3: Конфигурация в settings.py
# myproject/settings.py
# Broker configuration (Redis)
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Результаты хранятся в Redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# Таймзона для задач
CELERY_TIMEZONE = 'UTC'
# Заканчивать ли уже выполняемую задачу при отключении worker
CELERY_TASK_ACKS_LATE = True
# Максимально количество попыток переделать задачу
CELERY_TASK_MAX_RETRIES = 3
# Временное ограничение для задачи (в секундах)
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 минут
# Мягкое ограничение времени (задача будет прервана)
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60
# Настройки для результатов
CELERY_RESULT_EXPIRES = 3600 # Результаты хранятся 1 час
# Параллелизм - количество одновременных работников
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Шаг 4: Определение задач
Создание tasks.py в приложении
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth.models import User
import logging
logger = logging.getLogger(__name__)
# Простая задача
@shared_task
def send_welcome_email(user_id):
"""Отправить приветственное письмо пользователю"""
try:
user = User.objects.get(id=user_id)
send_mail(
'Добро пожаловать!',
'Спасибо за регистрацию',
'from@example.com',
[user.email],
fail_silently=False,
)
except User.DoesNotExist:
logger.error(f'User with id {user_id} not found')
# Задача с повторными попытками
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_payment(self, order_id):
"""Обработать платеж"""
try:
order = Order.objects.get(id=order_id)
# Логика обработки платежа
payment_gateway.process(order)
order.status = 'paid'
order.save()
return f'Payment processed for order {order_id}'
except Exception as exc:
# Повторяем через 60 секунд
logger.error(f'Payment failed: {exc}')
raise self.retry(exc=exc, countdown=60)
# Периодическая задача (требует celery-beat)
@shared_task
def cleanup_expired_sessions():
"""Очистить истекшие сессии каждый час"""
from django.contrib.sessions.models import Session
from django.utils import timezone
Session.objects.filter(expire_date__lt=timezone.now()).delete()
# Задача с специальными параметрами
@shared_task(
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 5},
default_retry_delay=5,
rate_limit='100/m' # Максимум 100 в минуту
)
def process_large_file(file_path):
"""Обработать большой файл"""
with open(file_path, 'r') as f:
for line in f:
# Обработка
pass
return f'Processed {file_path}'
# Chaining tasks
@shared_task
def generate_report(report_id):
report = Report.objects.get(id=report_id)
# Генерируем
return report.id
@shared_task
def send_report(report_id):
report = Report.objects.get(id=report_id)
# Отправляем
return f'Report {report_id} sent'
Шаг 5: Использование задач в Django views
# myapp/views.py
from django.shortcuts import render
from django.http import JsonResponse
from .tasks import send_welcome_email, process_payment
from .models import Order
def register_user(request):
"""Регистрация пользователя"""
# Обычная логика регистрации
user = User.objects.create_user(...)
# Запуск асинхронной задачи
send_welcome_email.delay(user.id) # delay отправляет в очередь
return JsonResponse({'status': 'User registered'})
def create_order(request):
"""Создать заказ"""
order = Order.objects.create(...)
# Запуск с временной задержкой (через 5 минут)
process_payment.apply_async(
args=[order.id],
countdown=300 # 5 минут в секундах
)
return JsonResponse({'order_id': order.id})
def check_task_status(request, task_id):
"""Проверить статус задачи"""
from celery.result import AsyncResult
task = AsyncResult(task_id)
return JsonResponse({
'status': task.status,
'result': task.result if task.successful() else None,
'error': str(task.info) if task.failed() else None
})
Шаг 6: Запуск Celery Worker
# Базовый worker
celery -A myproject worker -l info
# Worker с несколькими потоками
celery -A myproject worker -l info -c 4
# Worker с пулом процессов (для CPU-bound задач)
celery -A myproject worker -l info -P solo
# Для production с supervisor
[program:celery]
command=celery -A myproject worker -l info
directory=/path/to/project
user=www-data
autostart=true
autorestart=true
Шаг 7: Периодические задачи с Celery Beat
pip install django-celery-beat
# settings.py
INSTALLED_APPS = [
'django_celery_beat',
]
# Задачи в settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-sessions-every-hour': {
'task': 'myapp.tasks.cleanup_expired_sessions',
'schedule': crontab(minute=0), # Каждый час
},
'generate-reports-daily': {
'task': 'myapp.tasks.generate_daily_report',
'schedule': crontab(hour=23, minute=59), # В 23:59
},
}
# Запуск Celery Beat scheduler
celery -A myproject beat -l info
Шаг 8: Мониторинг
# Установка flower для веб-интерфейса мониторинга
pip install flower
# Запуск
celery -A myproject flower
# Доступно на http://localhost:5555
Лучшие практики
# 1. Используй именованные параметры
@shared_task
def good_task(*, user_id, email): # * заставляет использовать ключевые аргументы
pass
# 2. Валидация входных данных
@shared_task
def validate_task(user_id: int):
if user_id <= 0:
raise ValueError('Invalid user_id')
user = User.objects.get(id=user_id)
# 3. Логирование
import logging
logger = logging.getLogger(__name__)
@shared_task
def logged_task(data):
logger.info(f'Starting task with data: {data}')
try:
# Работа
logger.info('Task completed')
except Exception as e:
logger.error(f'Task failed: {e}')
raise
# 4. Избегай передачи сложных объектов
# ❌ Плохо
send_task.delay(user) # Объект User сериализуется
# ✅ Хорошо
send_task.delay(user.id) # Только ID
Заключение
Celery интеграция в Django включает:
- Установка и конфигурация Celery
- Определение задач с декоратором @shared_task
- Отправка задач через .delay() или .apply_async()
- Запуск worker процессов
- Мониторинг через Flower
- Использование Beat для периодических задач
Это позволяет строить масштабируемые Django приложения с асинхронной обработкой.