← Назад к вопросам

Какие риски выполнения задач знаешь?

1.6 Junior🔥 61 комментариев
#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Риски выполнения задач (асинхронных задач, фоновых работ)

Когда запускаешь задачи асинхронно (Celery, RQ, APScheduler и т.д.), появляется много потенциальных проблем. Разберу основные риски.

1. Дублирование задач

Риск: одна и та же задача запустится несколько раз.

# Сценарий:
# Приложение пытается отправить email
# timeout → retry → отправился дважды (пользователь получит 2 письма)

from celery import Celery, Task

app = Celery('tasks')

@app.task(bind=True, max_retries=3)
def send_email(self, email, subject):
    try:
        # Отправляем email
        send_mail(email, subject)
    except SMTPException as exc:
        # Если ошибка, retry
        self.retry(exc=exc, countdown=60)  # Повторить через 60 сек

# Проблема: если email отправился, но timeout перед return → retry отправит еще раз!

Решение: Idempotent операции

# Делай задачу идемпотентной (можно выполнить дважды без проблем)

@app.task(bind=True, max_retries=3)
def send_email_safe(self, email, subject):
    try:
        # Проверь, не отправили ли уже
        if EmailLog.objects.filter(email=email, subject=subject).exists():
            return  # Уже отправили, пропускаем
        
        send_mail(email, subject)
        
        # Логируем отправку (atomic)
        EmailLog.objects.create(email=email, subject=subject, status='sent')
    except SMTPException as exc:
        self.retry(exc=exc, countdown=60)

2. Потеря задач

Риск: задача попала в очередь, но никогда не выполнилась.

# Сценарий 1: worker упал
celery_worker.py crashed → задачи в очереди потеряны (в памяти)

# Сценарий 2: очередь в памяти (не сохраняется)
from celery import Celery

app = Celery('tasks')
app.conf.update(
    broker_url='memory://'  # ❌ В памяти! При перезагрузке — пропадут
)

# Сценарий 3: задача выполнилась, но не было ack
task_executed = True
# worker упал перед ack → брокер пересылает задачу
# Результат: двойное выполнение!

Решение: Persistent broker

app = Celery('tasks')
app.conf.update(
    broker_url='redis://localhost:6379/0',  # ✅ Redis сохраняет на диск
    result_backend='redis://localhost:6379/1'
)

# Или RabbitMQ с persistent очередями
app.conf.broker_url = 'amqp://guest:guest@localhost//'
app.conf.task_acks_late = True  # Ack после выполнения
app.conf.task_reject_on_worker_lost = True  # Вернуть в очередь, если worker умер

3. Бесконечные retry'и

Риск: задача повторяется бесконечно, забивая очередь.

# ❌ Плохо: будет retry до конца времен
@app.task()
def bad_task():
    try:
        result = external_api.call()  # Всегда падает
    except APIError:
        bad_task.apply_async(countdown=10)  # Бесконечный retry!

# После часа: очередь переполнена, worker не успевает

Решение: Ограничить retry'и и использовать exponential backoff

from celery import Celery
from celery.exceptions import MaxRetriesExceededError

app = Celery('tasks')

@app.task(bind=True, max_retries=5, default_retry_delay=60)
def smart_task(self):
    try:
        result = external_api.call()
    except APIError as exc:
        # Exponential backoff: 2^retry_count * base
        countdown = 2 ** self.request.retries * 60
        
        if self.request.retries >= self.max_retries:
            raise MaxRetriesExceededError(f"Failed after {self.max_retries} attempts")
        
        raise self.retry(exc=exc, countdown=countdown)

# Retry's: 1 min → 2 min → 4 min → 8 min → 16 min → fail

4. Deadlock'и и зависания

Риск: задачи ждут друг друга и блокируют друг друга.

# Сценарий: Circular dependency

@app.task
def task_a():
    result_b = task_b.apply_async().get()  # Ждем task_b
    return f"A+{result_b}"

@app.task
def task_b():
    result_a = task_a.apply_async().get()  # Ждем task_a
    return f"B+{result_a}"

# Результат: DEADLOCK!
# task_a ждет task_b
# task_b ждет task_a
# Обе зависнут навечно (или до timeout)

Решение: Избегай синхронных .get() внутри задач

# ✅ Правильно: асинхронная цепь
from celery import chain, group

# Последовательно
workflow = chain(task_a.s(), task_b.s(), task_c.s())
result = workflow.apply_async()

# Параллельно
workflow = group(task_a.s(), task_b.s(), task_c.s())
result = workflow.apply_async()

# Не делай это (синхронные ожидания):
# result_b = task_b.apply_async().get()  # ❌ Blocking!

5. Race condition'ы в обновлении данных

Риск: две задачи обновляют один объект одновременно.

# Сценарий: инкремент счетчика

@app.task
def increment_counter(user_id):
    user = User.objects.get(id=user_id)
    user.score += 1  # Read → Modify → Write
    user.save()

# Task 1: score = 0 → read → 0+1 → write → 1
# Task 2: score = 0 → read → 0+1 → write → 1
# Result: score = 1 (должно быть 2!)

# Потеря обновления!

Решение: Atomic операции или Lock'и

# ✅ Atomic в БД
from django.db.models import F

@app.task
def increment_counter_safe(user_id):
    User.objects.filter(id=user_id).update(score=F('score') + 1)
    # F выражение вычисляется в БД, гарантированно atomic

# ✅ Или используй lock
from django.db import transaction

@app.task
def increment_counter_locked(user_id):
    with transaction.atomic():
        user = User.objects.select_for_update().get(id=user_id)
        user.score += 1
        user.save()

6. Утечка памяти в долгоживущих worker'ах

Риск: worker постепенно требует больше памяти, пока не упадет.

# Сценарий: задача кэширует данные в памяти

from functools import lru_cache

@lru_cache(maxsize=None)  # ❌ Бесконечный кэш!
 def get_large_file(filename):
    return open(filename).read()  # Каждый уникальный файл = новая запись в кэш

@app.task
def process_files(filenames):
    for f in filenames:
        data = get_large_file(f)  # Кэш растет
        # 1000 уникальных файлов × 100MB = 100GB RAM!

# Worker исчерпывает RAM → OOM kill

Решение: Кэш с лимитом + worker restart

# ✅ Ограничить кэш
@lru_cache(maxsize=128)
def get_large_file(filename):
    return open(filename).read()

# ✅ Перезагружать worker'а периодически
app.conf.worker_max_tasks_per_child = 1000  # Перезагрузка после 1000 задач

# ✅ Явно очищать кэш
@app.task
def process_files(filenames):
    for f in filenames:
        data = get_large_file(f)
        # ...
        get_large_file.cache_clear()  # Очистить между итерациями

7. Отсутствие таймаутов

Риск: задача зависает, worker становится недоступен.

# ❌ Задача без таймаута
@app.task
def download_file(url):
    response = requests.get(url)  # Может зависнуть
    # Если сервер медленный → worker занят навечно

# Worker pools = 16
# 16 зависших задач → все worker'ы заняты
# Новые задачи ждут (или падают)

Решение: Таймауты везде

@app.task(time_limit=300)  # Task убьется через 5 минут
def download_file(url):
    response = requests.get(url, timeout=30)  # HTTP timeout
    # ...

# В конфиге
app.conf.task_soft_time_limit = 300  # Soft limit
app.conf.task_time_limit = 330  # Hard limit (убить через 330 сек)

8. Отсутствие мониторинга

Риск: задачи падают незаметно, никто не узнает.

# ❌ Плохо: задача падает, но никто не знает
@app.task
def critical_task():
    try:
        important_operation()
    except Exception:
        pass  # Скрываем ошибку!

# Задача "успешна" в логах Celery, но ничего не сделала

Решение: Logging + Alerting

import logging
from sentry_sdk import capture_exception

logger = logging.getLogger(__name__)

@app.task(bind=True)
def critical_task(self):
    try:
        important_operation()
        logger.info(f"Task {self.request.id} completed successfully")
    except Exception as e:
        logger.error(f"Task {self.request.id} failed: {e}")
        capture_exception(e)  # Отправить в Sentry
        raise  # Re-raise для retry

# Мониторим queue length
from celery_beat import SchedulingError

@app.task
def monitor_queue():
    queue_size = len(app.control.inspect().active())
    if queue_size > 1000:
        alert("Queue is backing up!")

9. Несогласованность между версиями

Риск: новый код изменил сигнатуру задачи, старые задачи в очереди падают.

# Version 1 (старая):
@app.task
def send_email(email):
    mail.send(email)

# Version 2 (новая):
@app.task
def send_email(email, subject):  # Добавили параметр!
    mail.send(email, subject)

# В очереди: 100 старых задач без subject
# Все падают: missing required argument 'subject'

Решение: Backward compatibility

# ✅ Сделай параметры опциональными
@app.task
def send_email(email, subject=None):
    if subject is None:
        subject = "Default Subject"
    mail.send(email, subject)

# ✅ Или используй json с версионированием
@app.task
def send_email(payload):
    version = payload.get('version', 1)
    if version == 1:
        # Старый формат: {"email": "..."}
        handle_v1(payload)
    elif version == 2:
        # Новый формат: {"email": "...", "subject": "..."}
        handle_v2(payload)

10. Состояние результатов в результат-хранилище

Риск: результаты задач заполняют Redis/DB, растут бесконечно.

# Каждая задача сохраняет результат
result = my_task.delay()
result.get()  # Результат остается в Redis

# После миллиона задач: Redis = 10 GB (дорого!)

Решение: Expire results + cleanup

app.conf.update(
    result_expires=3600,  # Результаты живут 1 час
    result_backend_transport_options={
        'master_name': 'mymaster'
    }
)

# Или явно удаляй
result = my_task.delay()
result.forget()  # Удалить результат из хранилища

Чеклист: Безопасная работа с задачами

  1. ✅ Идемпотентные операции — можно выполнить дважды безопасно
  2. ✅ Persistent broker — Redis/RabbitMQ с сохранением
  3. ✅ Ограненные retry'и — max_retries + exponential backoff
  4. ✅ Таймауты везде — task_time_limit + timeout в операциях
  5. ✅ Избегай синхронных .get() — используй цепи (chain, group)
  6. ✅ Atomic операции — F() выражения или SELECT FOR UPDATE
  7. ✅ Мониторинг — logging + alerting на ошибки
  8. ✅ Dead letter queue — для ошибочных задач
  9. ✅ Версионирование — payload с версией, backward compatibility
  10. ✅ Cleanup результатов — expire results через время
# Пример: безопасная конфигурация
app = Celery('tasks')
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    result_expires=3600,
    worker_max_tasks_per_child=1000,
    task_soft_time_limit=300,
    task_time_limit=330,
)

Вывод: обработай задачи как critical infrastructure — они могут потеряться, упасть, зависнуть.