← Назад к вопросам

Каким образом работал с фоновыми задачами в Django?

2.0 Middle🔥 191 комментариев
#Django#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Фоновые задачи в Django: практический опыт и best practices

Фоновые задачи (background jobs) - критичны для production приложений. Разберём как я работал с ними и какие решения рекомендую.

1. Celery - король фоновых задач

Использую Celery с Redis как message broker и PostgreSQL для результатов.

# celery.py - конфигурация
from celery import Celery
from celery.schedules import crontab
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 минут
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True

# Периодические задачи
CELERY_BEAT_SCHEDULE = {
    'check-status-every-10-seconds': {
        'task': 'myapp.tasks.check_order_status',
        'schedule': 10.0,
    },
    'daily-report': {
        'task': 'myapp.tasks.send_daily_report',
        'schedule': crontab(hour=0, minute=0),
    },
    'cleanup-old-sessions': {
        'task': 'django.core.management.call_command',
        'args': ('clearsessions',),
        'schedule': crontab(hour='*/6'),
    },
}

2. Определение задач

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from myapp.models import Order, User
import logging

logger = logging.getLogger(__name__)

# Простая задача
@shared_task(bind=True, max_retries=3)
def send_order_confirmation(self, order_id):
    """Отправить подтверждение заказа по email"""
    try:
        order = Order.objects.get(id=order_id)
        send_mail(
            subject='Order Confirmation',
            message=f'Your order {order.id} is confirmed',
            from_email='noreply@example.com',
            recipient_list=[order.user.email],
        )
        logger.info(f"Confirmation sent for order {order_id}")
    except Order.DoesNotExist:
        logger.error(f"Order {order_id} not found")
    except Exception as exc:
        logger.error(f"Error sending confirmation: {exc}")
        # Retry с exponential backoff
        raise self.retry(exc=exc, countdown=60)

# Задача с priority
@shared_task(bind=True, priority=9)  # High priority
def process_payment(self, payment_id):
    """Обработать платёж (high priority)"""
    try:
        payment = Payment.objects.select_for_update().get(id=payment_id)
        if payment.status == 'pending':
            result = charge_credit_card(payment)
            payment.status = 'completed' if result else 'failed'
            payment.save()
    except Exception as exc:
        raise self.retry(exc=exc, countdown=300, max_retries=5)

# Задача со сложной логикой
@shared_task(bind=True, rate_limit='100/m')  # 100 тасок в минуту максимум
def generate_report(self, report_id):
    """Генерировать отчёт (может быть долгим)"""
    from celery_progress.backend import ProgressRecorder
    
    report = Report.objects.get(id=report_id)
    progress_recorder = ProgressRecorder(self)
    
    try:
        total_items = report.count_items()
        for i, item in enumerate(report.get_items()):
            # Обновляем прогресс
            progress = (i + 1) / total_items * 100
            progress_recorder.set_progress(i + 1, total_items, description='Generating...')
            
            process_item(item, report)
        
        report.status = 'completed'
        report.save()
        logger.info(f"Report {report_id} generated successfully")
    except Exception as exc:
        report.status = 'failed'
        report.error = str(exc)
        report.save()
        raise

# Задача которая вызывает другие задачи
@shared_task
def process_batch_orders(batch_id):
    """Обработать batch заказов (вызывает другие задачи)"""
    orders = Order.objects.filter(batch_id=batch_id)
    
    # Способ 1: Sequential
    for order in orders:
        send_order_confirmation.delay(order.id)
    
    # Способ 2: Parallel
    from celery import group
    job = group([send_order_confirmation.s(order.id) for order in orders])
    job.apply_async()
    
    # Способ 3: С зависимостями (pipeline)
    from celery import chain
    workflow = chain(
        validate_orders.s(batch_id),
        process_payments.s(),
        send_confirmations.s(),
        generate_invoice.s()
    )
    workflow.apply_async()

3. Запуск задач из views

# views.py
from django.views import View
from django.http import JsonResponse
from myapp.tasks import send_order_confirmation, generate_report

class CreateOrderView(View):
    def post(self, request):
        order = Order.objects.create(**request.POST)
        
        # Способ 1: Fire and forget (не ждём результата)
        send_order_confirmation.delay(order.id)
        
        # Способ 2: С таймаутом (задержка)
        send_order_confirmation.apply_async(
            args=(order.id,),
            countdown=300  # Выполнить через 5 минут
        )
        
        # Способ 3: С приоритетом очереди
        send_order_confirmation.apply_async(
            args=(order.id,),
            queue='emails',  # Отправляем в специальную очередь
            priority=10
        )
        
        return JsonResponse({'order_id': order.id})

class GenerateReportView(View):
    def post(self, request):
        report = Report.objects.create()
        
        # Запускаем фоновую задачу и возвращаем task_id
        task = generate_report.apply_async(args=(report.id,))
        
        return JsonResponse({
            'report_id': report.id,
            'task_id': task.id  # Клиент может проверять статус
        })

class CheckProgressView(View):
    def get(self, request, task_id):
        from celery.result import AsyncResult
        
        task_result = AsyncResult(task_id)
        
        return JsonResponse({
            'task_id': task_id,
            'status': task_result.status,
            'result': task_result.result if task_result.successful() else None,
            'progress': task_result.info if task_result.status == 'PROGRESS' else None
        })

4. Обработка ошибок и retries

# tasks.py - правильная обработка ошибок
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import logging
from requests.exceptions import RequestException

logger = logging.getLogger(__name__)

@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 5, 'countdown': 60},
    default_retry_delay=60,
    time_limit=300,
    soft_time_limit=280
)
def call_external_api(self, data):
    """Вызвать внешнее API с автоматическими retries"""
    try:
        response = requests.post('https://api.example.com/process', json=data, timeout=10)
        response.raise_for_status()
        return response.json()
    
    except SoftTimeLimitExceeded:
        # Мягкий лимит времени - есть возможность очиститься
        logger.warning(f"Task {self.request.id} exceeded soft time limit")
        raise
    
    except RequestException as exc:
        # Сетевые ошибки - попробуем позже
        logger.warning(f"API call failed: {exc}, retrying...")
        # Exponential backoff: 60s, 120s, 240s, 480s, 960s
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=min(countdown, 3600))
    
    except Exception as exc:
        # Неожиданная ошибка
        logger.error(f"Unexpected error: {exc}", exc_info=True)
        raise

# Задача с custom retry logic
@shared_task
def process_with_smart_retry(task_id, attempt=0):
    """Умные retries на основе состояния"""
    from myapp.models import Task
    import random
    
    try:
        task = Task.objects.get(id=task_id)
        result = do_work(task)
        task.status = 'completed'
        task.save()
        return result
    
    except TemporaryError as exc:
        # Временная ошибка - повторяем с задержкой
        if attempt < 5:
            delay = 60 + random.randint(0, 30)  # Jitter
            process_with_smart_retry.apply_async(
                args=(task_id, attempt + 1),
                countdown=delay
            )
        else:
            task = Task.objects.get(id=task_id)
            task.status = 'failed'
            task.error = str(exc)
            task.save()
            notify_admin(f"Task {task_id} failed after {attempt} attempts")
    
    except PermanentError as exc:
        # Постоянная ошибка - не повторяем
        task = Task.objects.get(id=task_id)
        task.status = 'failed'
        task.error = str(exc)
        task.save()
        logger.error(f"Permanent error in task {task_id}: {exc}")

5. Мониторинг и отладка

# Проверить статус worker
import subprocess

def check_celery_workers():
    # celery -A config inspect active  # Активные задачи
    # celery -A config inspect stats   # Статистика worker'ов
    # celery -A config events          # Real-time события
    pass

# Логирование
from celery.signals import task_prerun, task_postrun, task_failure

@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **extra):
    logger.info(f"Starting task {task.name} [{task_id}]")

@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, result=None, **extra):
    logger.info(f"Completed task {task.name} [{task_id}] in {extra['runtime']:.2f}s")

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, **extra):
    logger.error(f"Task {sender.name} [{task_id}] failed: {exception}")

# Flower - веб-интерфейс для мониторинга
# celery -A config flower
# Открыть localhost:5555

6. Альтернативы Celery

RQ (Redis Queue) - проще

from django_rq import job
from django_rq import enqueue

@job
def send_email(to_address, subject, message):
    send_mail(subject, message, 'noreply@example.com', [to_address])

# Использование
from django.shortcuts import enqueue
enqueue(send_email, 'user@example.com', 'Hi', 'Hello world')

APScheduler - для расписания

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

@scheduler.scheduled_job('interval', minutes=10)
def cleanup_old_files():
    # Выполняется каждые 10 минут
    pass

scheduler.start()

Django-Q - легче чем Celery

from django_q.tasks import async_task

async_task('myapp.tasks.send_email', user_id, 'subject')

7. Production setup

# workers могут быть на разных машинах
celery -A config worker -l info -c 4 -Q default,email,reports
celery -A config worker -l info -c 2 -Q critical
celery -A config beat -l info  # Для периодических задач

# Docker Compose
version: '3.8'

services:
  redis:
    image: redis:7
  
  postgres:
    image: postgres:15
    environment:
      POSTGRES_PASSWORD: postgres
  
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    depends_on:
      - postgres
      - redis
  
  celery-worker:
    build: .
    command: celery -A config worker -l info -c 4
    depends_on:
      - postgres
      - redis
  
  celery-beat:
    build: .
    command: celery -A config beat -l info
    depends_on:
      - postgres
      - redis
  
  flower:
    build: .
    command: celery -A config flower
    ports:
      - "5555:5555"
    depends_on:
      - celery-worker
      - redis

8. Best Practices

  1. Идемпотентность: Задача должна дать одинаковый результат даже если выполнится 2 раза
  2. Логирование: Логируй всё - помогает при отладке
  3. Таймауты: Всегда устанавливай time_limit и soft_time_limit
  4. Мониторинг: Используй Flower или Sentry
  5. Dead Letter Queue: Для задач которые не смогли выполниться
  6. Разные очереди: Для разных типов задач (email, reports, critical)
  7. Батчинг: Группируй однотипные задачи вместе
  8. Комплексные задачи: Использи chain, group, chord для сложных workflows

Фоновые задачи - это не опция, а требование для любого production приложения.

Каким образом работал с фоновыми задачами в Django? | PrepBro