← Назад к вопросам

Как в Django выполнять background job?

1.7 Middle🔥 241 комментариев
#Django

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Background Jobs в Django

Выполнение асинхронных задач в Django — важный аспект масштабируемых приложений. За свой опыт я работал с несколькими решениями и каждое имеет свои преимущества и недостатки.

Основные подходы

Для background jobs в Django существует несколько основных решений:

  1. Celery — самое популярное решение с поддержкой различных message brokers
  2. Django-RQ — упрощённая альтернатива на Redis
  3. Huey — лёгкий task queue
  4. Django Signals — встроенный механизм, но с ограничениями
  5. Django Async — встроенная поддержка асинхронного кода (3.1+)
  6. APScheduler — для scheduled tasks

Celery — самое мощное решение

Целери является industry standard и вот как я его обычно использую:

# settings.py
import os
from kombu import Exchange, Queue

CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# Определение очередей
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('priority'), routing_key='priority'),
    Queue('low_priority', Exchange('low'), routing_key='low'),
)
# celery.py (в корне проекта)
import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

app.conf.beat_schedule = {
    'send-report-every-hour': {
        'task': 'myapp.tasks.send_report',
        'schedule': crontab(minute=0),
    },
}
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User

@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id: int) -> str:
    """Отправить приветственное письмо пользователю."""
    try:
        user = User.objects.get(id=user_id)
        send_mail(
            subject='Welcome!',
            message=f'Hello, {user.name}',
            from_email='noreply@example.com',
            recipient_list=[user.email],
        )
        return f'Email sent to {user.email}'
    except User.DoesNotExist:
        return 'User not found'
    except Exception as exc:
        # Retry с exponential backoff
        raise self.retry(exc=exc, countdown=60)

# Вызов из представления
from .tasks import send_welcome_email

def create_user(request):
    user = User.objects.create(...)
    # Запустить асинхронно
    send_welcome_email.delay(user.id)
    return Response({'status': 'User created'})

Цепочки задач (Task Chains)

Целери позволяет создавать сложные сценарии с несколькими задачами:

from celery import chain, group, chord
from .tasks import process_data, send_notification, generate_report

# Цепь: одна задача за другой
workflow = chain(
    process_data.s(data_id),
    generate_report.s(),
    send_notification.s(user_id),
)
result = workflow.apply_async()

# Параллельное выполнение (group)
parallel_tasks = group(
    process_data.s(id1),
    process_data.s(id2),
    process_data.s(id3),
)
result = parallel_tasks.apply_async()

# Chord: параллельные задачи потом в финальную
project = chord([
    process_user.s(user_id) for user_id in users
])(generate_summary.s())

Django-RQ — более простое решение

Для небольших проектов я часто использую Django-RQ:

# settings.py
RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    },
    'high': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    },
}

# views.py
from django_rq import job

@job
def send_email(email, subject, message):
    """Функция с декоратором для background выполнения."""
    # Отправить письмо
    pass

# Использование
from django_rq import enqueue

def subscribe(request):
    email = request.POST.get('email')
    enqueue(send_email, email, 'Welcome', 'Hello!')
    return Response({'status': 'Subscribed'})

Django Signals — встроенное решение

Для простых случаев можно использовать сигналы, но они выполняются синхронно:

from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import User
from .tasks import send_welcome_email

@receiver(post_save, sender=User)
def on_user_created(sender, instance, created, **kwargs):
    """Сигнал при создании пользователя."""
    if created:
        # Отправить письмо асинхронно
        send_welcome_email.delay(instance.id)

# apps.py
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'myapp'
    
    def ready(self):
        import myapp.signals  # Подключить сигналы

Асинхронные функции в Django 3.1+

Джанго встроенно поддерживает асинхронный код:

import asyncio
from django.views.decorators.http import require_http_methods

async def fetch_data_async(url):
    """Асинхронная функция (требует async runtime)."""
    # Использовать aiohttp или httpx
    pass

async def process_request_async(request):
    """Асинхронное представление."""
    result = await fetch_data_async('https://api.example.com')
    return JsonResponse({'result': result})

APScheduler для периодических задач

Для scheduled tasks я использую APScheduler:

from apscheduler.schedulers.background import BackgroundScheduler
from django.apps import AppConfig

def cleanup_old_files():
    """Очистить старые файлы."""
    pass

class MyAppConfig(AppConfig):
    def ready(self):
        scheduler = BackgroundScheduler()
        scheduler.add_job(
            cleanup_old_files,
            'cron',
            hour=2,  # Каждый день в 2 часа ночи
            id='cleanup_job',
        )
        if not scheduler.running:
            scheduler.start()

Сравнение решений

РешениеСложностьПроизводительностьМасштабируемостьЛучший случай
CeleryВысокаяОтличнаяОтличнаяБольшие проекты
Django-RQСредняяХорошаяХорошаяСредние проекты
HueyНизкаяХорошаяПриемлемаяНебольшие проекты
Django SignalsОчень низкаяСинхронноПлохаяПростые case'ы
Django AsyncНизкаяХорошаяСредняяAsync приложения

Лучшие практики

  1. Используй retry логику — задачи могут упасть, нужна обработка ошибок
  2. Идемпотентные задачи — одна и та же задача может выполниться несколько раз
  3. Мониторинг и логирование — отслеживай выполнение задач
  4. Timeout для задач — предотвращай зависание
  5. Разделение очередей — разные приоритеты на разных очередях
  6. Testing — мокируй задачи в тестах
# Тестирование с Celery
from django.test import TestCase
from celery import current_app

class TaskTests(TestCase):
    def setUp(self):
        # Отключить async для тестов
        current_app.conf.task_always_eager = True
        current_app.conf.task_eager_propagates = True
    
    def test_send_email_task(self):
        from .tasks import send_welcome_email
        result = send_welcome_email.delay(1)
        self.assertEqual(result.status, 'SUCCESS')

Заключение

Для большинства Django проектов Celery + Redis — оптимальный выбор. Но для небольших приложений Django-RQ или встроенные решения могут быть достаточными. Главное — выбрать решение, которое масштабируется с растущим проектом.