← Назад к вопросам

В чем различия task от shared_task?

1.8 Middle🔥 121 комментариев
#Асинхронность и многопоточность#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# @task vs @shared_task в Celery

Это одна из фундаментальных тем при работе с Celery. Оба декоратора создают асинхронные задачи, но они различаются по области применения и поведению.

@task — основной декоратор Celery

@task привязывает задачу к конкретному экземпляру Celery приложения.

from celery import Celery, Task

app = Celery('myapp', broker='redis://localhost')

# Способ 1: Базовое использование
@app.task
def send_email(to_email: str, subject: str):
    # Это задача привязана к экземпляру 'app'
    import smtplib
    # ...
    return f"Email sent to {to_email}"

# Способ 2: С опциями
@app.task(
    bind=True,  # Получить самого себя как первый аргумент
    name='custom_task_name',
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3},
    time_limit=300,  # 5 минут максимум
    soft_time_limit=250  # Предупреждение за 50 сек до лимита
)
def process_video(self, video_id: int):
    # self это экземпляр Task
    print(f"Processing video {video_id}")
    
    if self.request.retries > 2:
        # Логирование retry попыток
        print(f"Retry attempt {self.request.retries}")
    
    return {"status": "completed", "video_id": video_id}

# Использование
result = send_email.delay('user@example.com', 'Hello')
print(result.id)  # UUID задачи

@shared_task — для переиспользуемого кода

@shared_task не привязывает задачу к конкретному экземпляру Celery. Это полезно в переиспользуемых приложениях (package'ах).

# mypackage/tasks.py
from celery import shared_task

@shared_task
def send_notification(user_id: int, message: str):
    # Эта задача не привязана к конкретному app экземпляру
    # Она будет работать с любым Celery экземпляром
    user = User.objects.get(id=user_id)
    user.notify(message)
    return f"Notified user {user_id}"

# Эту функцию можно импортировать в разные проекты

Основные различия

1. Привязка к экземпляру

from celery import Celery, shared_task

# Проект 1: myapp/celery.py
app1 = Celery('project1', broker='redis://localhost:6379')

@app1.task
def task_with_app():
    return "Task привязана к app1"

@shared_task
def shared_task_func():
    return "Может работать с любым app"

# Проект 2: другой проект импортирует этот код
from myapp.tasks import shared_task_func, task_with_app

# shared_task_func будет работать с Celery экземпляром проекта 2
# task_with_app будет искать app1 (может не найти)

2. Использование в Django приложении

Остановимся на классическом Django + Celery примере:

# myapp/celery.py (конфигурация Celery)
from celery import Celery
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

app = Celery('myapp')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()  # Автоматически найдёт все tasks.py

# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User

@shared_task
def send_email_to_user(user_id: int):
    user = User.objects.get(id=user_id)
    send_mail(
        subject='Hello',
        message='Welcome',
        from_email='noreply@example.com',
        recipient_list=[user.email]
    )
    return f"Email sent to {user.email}"

# myapp/views.py
from django.http import JsonResponse
from .tasks import send_email_to_user

def register_user(request):
    user = User.objects.create(...)
    
    # Отправить email в фоне
    send_email_to_user.delay(user.id)
    
    return JsonResponse({"status": "registered"})

3. Bind параметр — доступ к self

from celery import Celery

app = Celery()

# С bind=True получаем доступ к self (Task экземпляр)
@app.task(bind=True, max_retries=3)
def my_task(self, x, y):
    try:
        # Выполняем операцию
        result = x / y
    except ZeroDivisionError as exc:
        # retry с экспоненциальной задержкой
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    
    return result

# Информация о задаче через self.request
@app.task(bind=True)
def logging_task(self):
    print(f"Task ID: {self.request.id}")
    print(f"Retries: {self.request.retries}")
    print(f"Is eager (synchronous): {self.request.is_eager}")
    return "done"

С @shared_task не получится использовать bind=True напрямую:

from celery import shared_task

# Это НЕ работает
@shared_task(bind=True)
def my_shared_task(self):
    # shared_task не может быть bound
    pass

Практические примеры

Пример 1: Использовать @shared_task в переиспользуемом приложении

# notifications/tasks.py
# Это приложение можно переиспользовать в разных проектах

from celery import shared_task
from .models import Notification

@shared_task
def send_notification(user_id: int, message: str):
    notification = Notification.objects.create(
        user_id=user_id,
        message=message
    )
    # Отправляем через WebSocket или email
    return {"notification_id": notification.id}

@shared_task
def cleanup_old_notifications(days: int = 30):
    from datetime import timedelta
    from django.utils import timezone
    
    cutoff = timezone.now() - timedelta(days=days)
    Notification.objects.filter(created_at__lt=cutoff).delete()
    return f"Deleted old notifications"

# Может использоваться в различных проектах

Пример 2: Использовать @app.task с bind=True для retry логики

from celery import Celery
import requests

app = Celery('api_integration', broker='redis://localhost')

@app.task(bind=True, max_retries=3)
def fetch_external_api(self, url: str):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        # Retry с растущей задержкой
        countdown = 2 ** self.request.retries  # 2, 4, 8 сек
        raise self.retry(exc=exc, countdown=countdown)

# Использование
task = fetch_external_api.delay('https://api.example.com/data')
print(task.id)

Пример 3: Комбинирование в Django проекте

# config/celery.py
from celery import Celery

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# users/tasks.py
from celery import shared_task
from app.celery import app
from .models import User

# Переиспользуемая задача
@shared_task
def send_welcome_email(user_id: int):
    user = User.objects.get(id=user_id)
    # Отправляем email
    return f"Email sent to {user.email}"

# Специфичная для этого app задача
@app.task(bind=True, max_retries=5)
def process_bulk_user_data(self, file_path: str):
    try:
        # Обработка файла
        import csv
        with open(file_path) as f:
            reader = csv.DictReader(f)
            for row in reader:
                user = User.objects.create(**row)
                # Отправляем welcome email
                send_welcome_email.delay(user.id)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# users/views.py
from django.http import JsonResponse
from .tasks import send_welcome_email, process_bulk_user_data

def register(request):
    user = User.objects.create(**request.data)
    send_welcome_email.delay(user.id)  # Асинхронно
    return JsonResponse({"user_id": user.id})

def bulk_import(request):
    file_path = request.FILES['file']
    process_bulk_user_data.delay(file_path)  # В фоне
    return JsonResponse({"status": "importing"})

Сравнительная таблица

┌────────────────────────┬──────────────────────┬──────────────────────┐
│ Критерий               │ @app.task            │ @shared_task         │
├────────────────────────┼──────────────────────┼──────────────────────┤
│ Привязка к app         │ Да, к конкретному    │ Нет, универсально    │
│ Использование bind     │ Да: bind=True        │ Нет, или через wrapper│
│ Переиспользуемость     │ Низкая (привязана)   │ Высокая (независимо) │
│ Retry логика           │ Встроенная с self    │ Через try/except     │
│ Django интеграция      │ В admin              │ В admin + миграции   │
│ Конфигурация           │ В celery.py          │ Автоматическая       │
│ Для package'а          │ Не рекомендуется     │ Рекомендуется        │
└────────────────────────┴──────────────────────┴──────────────────────┘

Когда что использовать

Используйте @app.task если:

  • Задача специфична для вашего проекта
  • Нужна продвинутая retry логика (bind=True)
  • Нужна интеграция с другими компонентами app'а
  • Нужны кастомные опции (time_limit, rate_limit и т.д.)

Используйте @shared_task если:

  • Пишете переиспользуемое Django приложение (package)
  • Задача универсальна и не зависит от конкретного app
  • Собираетесь делиться кодом в разных проектах
  • Простая задача без продвинутой retry логики

Ключевой вывод

  • @task — для app-специфичных задач, полный контроль
  • @shared_task — для переиспользуемого, универсального кода