← Назад к вопросам
Каким образом работал с фоновыми задачами в Django?
2.0 Middle🔥 191 комментариев
#Django#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Фоновые задачи в Django: практический опыт и best practices
Фоновые задачи (background jobs) - критичны для production приложений. Разберём как я работал с ними и какие решения рекомендую.
1. Celery - король фоновых задач
Использую Celery с Redis как message broker и PostgreSQL для результатов.
# celery.py - конфигурация
from celery import Celery
from celery.schedules import crontab
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 минут
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
# Периодические задачи
CELERY_BEAT_SCHEDULE = {
'check-status-every-10-seconds': {
'task': 'myapp.tasks.check_order_status',
'schedule': 10.0,
},
'daily-report': {
'task': 'myapp.tasks.send_daily_report',
'schedule': crontab(hour=0, minute=0),
},
'cleanup-old-sessions': {
'task': 'django.core.management.call_command',
'args': ('clearsessions',),
'schedule': crontab(hour='*/6'),
},
}
2. Определение задач
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from myapp.models import Order, User
import logging
logger = logging.getLogger(__name__)
# Простая задача
@shared_task(bind=True, max_retries=3)
def send_order_confirmation(self, order_id):
"""Отправить подтверждение заказа по email"""
try:
order = Order.objects.get(id=order_id)
send_mail(
subject='Order Confirmation',
message=f'Your order {order.id} is confirmed',
from_email='noreply@example.com',
recipient_list=[order.user.email],
)
logger.info(f"Confirmation sent for order {order_id}")
except Order.DoesNotExist:
logger.error(f"Order {order_id} not found")
except Exception as exc:
logger.error(f"Error sending confirmation: {exc}")
# Retry с exponential backoff
raise self.retry(exc=exc, countdown=60)
# Задача с priority
@shared_task(bind=True, priority=9) # High priority
def process_payment(self, payment_id):
"""Обработать платёж (high priority)"""
try:
payment = Payment.objects.select_for_update().get(id=payment_id)
if payment.status == 'pending':
result = charge_credit_card(payment)
payment.status = 'completed' if result else 'failed'
payment.save()
except Exception as exc:
raise self.retry(exc=exc, countdown=300, max_retries=5)
# Задача со сложной логикой
@shared_task(bind=True, rate_limit='100/m') # 100 тасок в минуту максимум
def generate_report(self, report_id):
"""Генерировать отчёт (может быть долгим)"""
from celery_progress.backend import ProgressRecorder
report = Report.objects.get(id=report_id)
progress_recorder = ProgressRecorder(self)
try:
total_items = report.count_items()
for i, item in enumerate(report.get_items()):
# Обновляем прогресс
progress = (i + 1) / total_items * 100
progress_recorder.set_progress(i + 1, total_items, description='Generating...')
process_item(item, report)
report.status = 'completed'
report.save()
logger.info(f"Report {report_id} generated successfully")
except Exception as exc:
report.status = 'failed'
report.error = str(exc)
report.save()
raise
# Задача которая вызывает другие задачи
@shared_task
def process_batch_orders(batch_id):
"""Обработать batch заказов (вызывает другие задачи)"""
orders = Order.objects.filter(batch_id=batch_id)
# Способ 1: Sequential
for order in orders:
send_order_confirmation.delay(order.id)
# Способ 2: Parallel
from celery import group
job = group([send_order_confirmation.s(order.id) for order in orders])
job.apply_async()
# Способ 3: С зависимостями (pipeline)
from celery import chain
workflow = chain(
validate_orders.s(batch_id),
process_payments.s(),
send_confirmations.s(),
generate_invoice.s()
)
workflow.apply_async()
3. Запуск задач из views
# views.py
from django.views import View
from django.http import JsonResponse
from myapp.tasks import send_order_confirmation, generate_report
class CreateOrderView(View):
def post(self, request):
order = Order.objects.create(**request.POST)
# Способ 1: Fire and forget (не ждём результата)
send_order_confirmation.delay(order.id)
# Способ 2: С таймаутом (задержка)
send_order_confirmation.apply_async(
args=(order.id,),
countdown=300 # Выполнить через 5 минут
)
# Способ 3: С приоритетом очереди
send_order_confirmation.apply_async(
args=(order.id,),
queue='emails', # Отправляем в специальную очередь
priority=10
)
return JsonResponse({'order_id': order.id})
class GenerateReportView(View):
def post(self, request):
report = Report.objects.create()
# Запускаем фоновую задачу и возвращаем task_id
task = generate_report.apply_async(args=(report.id,))
return JsonResponse({
'report_id': report.id,
'task_id': task.id # Клиент может проверять статус
})
class CheckProgressView(View):
def get(self, request, task_id):
from celery.result import AsyncResult
task_result = AsyncResult(task_id)
return JsonResponse({
'task_id': task_id,
'status': task_result.status,
'result': task_result.result if task_result.successful() else None,
'progress': task_result.info if task_result.status == 'PROGRESS' else None
})
4. Обработка ошибок и retries
# tasks.py - правильная обработка ошибок
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import logging
from requests.exceptions import RequestException
logger = logging.getLogger(__name__)
@shared_task(
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 5, 'countdown': 60},
default_retry_delay=60,
time_limit=300,
soft_time_limit=280
)
def call_external_api(self, data):
"""Вызвать внешнее API с автоматическими retries"""
try:
response = requests.post('https://api.example.com/process', json=data, timeout=10)
response.raise_for_status()
return response.json()
except SoftTimeLimitExceeded:
# Мягкий лимит времени - есть возможность очиститься
logger.warning(f"Task {self.request.id} exceeded soft time limit")
raise
except RequestException as exc:
# Сетевые ошибки - попробуем позже
logger.warning(f"API call failed: {exc}, retrying...")
# Exponential backoff: 60s, 120s, 240s, 480s, 960s
countdown = 60 * (2 ** self.request.retries)
raise self.retry(exc=exc, countdown=min(countdown, 3600))
except Exception as exc:
# Неожиданная ошибка
logger.error(f"Unexpected error: {exc}", exc_info=True)
raise
# Задача с custom retry logic
@shared_task
def process_with_smart_retry(task_id, attempt=0):
"""Умные retries на основе состояния"""
from myapp.models import Task
import random
try:
task = Task.objects.get(id=task_id)
result = do_work(task)
task.status = 'completed'
task.save()
return result
except TemporaryError as exc:
# Временная ошибка - повторяем с задержкой
if attempt < 5:
delay = 60 + random.randint(0, 30) # Jitter
process_with_smart_retry.apply_async(
args=(task_id, attempt + 1),
countdown=delay
)
else:
task = Task.objects.get(id=task_id)
task.status = 'failed'
task.error = str(exc)
task.save()
notify_admin(f"Task {task_id} failed after {attempt} attempts")
except PermanentError as exc:
# Постоянная ошибка - не повторяем
task = Task.objects.get(id=task_id)
task.status = 'failed'
task.error = str(exc)
task.save()
logger.error(f"Permanent error in task {task_id}: {exc}")
5. Мониторинг и отладка
# Проверить статус worker
import subprocess
def check_celery_workers():
# celery -A config inspect active # Активные задачи
# celery -A config inspect stats # Статистика worker'ов
# celery -A config events # Real-time события
pass
# Логирование
from celery.signals import task_prerun, task_postrun, task_failure
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
logger.info(f"Starting task {task.name} [{task_id}]")
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, result=None, **extra):
logger.info(f"Completed task {task.name} [{task_id}] in {extra['runtime']:.2f}s")
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, **extra):
logger.error(f"Task {sender.name} [{task_id}] failed: {exception}")
# Flower - веб-интерфейс для мониторинга
# celery -A config flower
# Открыть localhost:5555
6. Альтернативы Celery
RQ (Redis Queue) - проще
from django_rq import job
from django_rq import enqueue
@job
def send_email(to_address, subject, message):
send_mail(subject, message, 'noreply@example.com', [to_address])
# Использование
from django.shortcuts import enqueue
enqueue(send_email, 'user@example.com', 'Hi', 'Hello world')
APScheduler - для расписания
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
@scheduler.scheduled_job('interval', minutes=10)
def cleanup_old_files():
# Выполняется каждые 10 минут
pass
scheduler.start()
Django-Q - легче чем Celery
from django_q.tasks import async_task
async_task('myapp.tasks.send_email', user_id, 'subject')
7. Production setup
# workers могут быть на разных машинах
celery -A config worker -l info -c 4 -Q default,email,reports
celery -A config worker -l info -c 2 -Q critical
celery -A config beat -l info # Для периодических задач
# Docker Compose
version: '3.8'
services:
redis:
image: redis:7
postgres:
image: postgres:15
environment:
POSTGRES_PASSWORD: postgres
web:
build: .
command: python manage.py runserver 0.0.0.0:8000
depends_on:
- postgres
- redis
celery-worker:
build: .
command: celery -A config worker -l info -c 4
depends_on:
- postgres
- redis
celery-beat:
build: .
command: celery -A config beat -l info
depends_on:
- postgres
- redis
flower:
build: .
command: celery -A config flower
ports:
- "5555:5555"
depends_on:
- celery-worker
- redis
8. Best Practices
- Идемпотентность: Задача должна дать одинаковый результат даже если выполнится 2 раза
- Логирование: Логируй всё - помогает при отладке
- Таймауты: Всегда устанавливай time_limit и soft_time_limit
- Мониторинг: Используй Flower или Sentry
- Dead Letter Queue: Для задач которые не смогли выполниться
- Разные очереди: Для разных типов задач (email, reports, critical)
- Батчинг: Группируй однотипные задачи вместе
- Комплексные задачи: Использи chain, group, chord для сложных workflows
Фоновые задачи - это не опция, а требование для любого production приложения.