Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Риски выполнения задач (асинхронных задач, фоновых работ)
Когда запускаешь задачи асинхронно (Celery, RQ, APScheduler и т.д.), появляется много потенциальных проблем. Разберу основные риски.
1. Дублирование задач
Риск: одна и та же задача запустится несколько раз.
# Сценарий:
# Приложение пытается отправить email
# timeout → retry → отправился дважды (пользователь получит 2 письма)
from celery import Celery, Task
app = Celery('tasks')
@app.task(bind=True, max_retries=3)
def send_email(self, email, subject):
try:
# Отправляем email
send_mail(email, subject)
except SMTPException as exc:
# Если ошибка, retry
self.retry(exc=exc, countdown=60) # Повторить через 60 сек
# Проблема: если email отправился, но timeout перед return → retry отправит еще раз!
Решение: Idempotent операции
# Делай задачу идемпотентной (можно выполнить дважды без проблем)
@app.task(bind=True, max_retries=3)
def send_email_safe(self, email, subject):
try:
# Проверь, не отправили ли уже
if EmailLog.objects.filter(email=email, subject=subject).exists():
return # Уже отправили, пропускаем
send_mail(email, subject)
# Логируем отправку (atomic)
EmailLog.objects.create(email=email, subject=subject, status='sent')
except SMTPException as exc:
self.retry(exc=exc, countdown=60)
2. Потеря задач
Риск: задача попала в очередь, но никогда не выполнилась.
# Сценарий 1: worker упал
celery_worker.py crashed → задачи в очереди потеряны (в памяти)
# Сценарий 2: очередь в памяти (не сохраняется)
from celery import Celery
app = Celery('tasks')
app.conf.update(
broker_url='memory://' # ❌ В памяти! При перезагрузке — пропадут
)
# Сценарий 3: задача выполнилась, но не было ack
task_executed = True
# worker упал перед ack → брокер пересылает задачу
# Результат: двойное выполнение!
Решение: Persistent broker
app = Celery('tasks')
app.conf.update(
broker_url='redis://localhost:6379/0', # ✅ Redis сохраняет на диск
result_backend='redis://localhost:6379/1'
)
# Или RabbitMQ с persistent очередями
app.conf.broker_url = 'amqp://guest:guest@localhost//'
app.conf.task_acks_late = True # Ack после выполнения
app.conf.task_reject_on_worker_lost = True # Вернуть в очередь, если worker умер
3. Бесконечные retry'и
Риск: задача повторяется бесконечно, забивая очередь.
# ❌ Плохо: будет retry до конца времен
@app.task()
def bad_task():
try:
result = external_api.call() # Всегда падает
except APIError:
bad_task.apply_async(countdown=10) # Бесконечный retry!
# После часа: очередь переполнена, worker не успевает
Решение: Ограничить retry'и и использовать exponential backoff
from celery import Celery
from celery.exceptions import MaxRetriesExceededError
app = Celery('tasks')
@app.task(bind=True, max_retries=5, default_retry_delay=60)
def smart_task(self):
try:
result = external_api.call()
except APIError as exc:
# Exponential backoff: 2^retry_count * base
countdown = 2 ** self.request.retries * 60
if self.request.retries >= self.max_retries:
raise MaxRetriesExceededError(f"Failed after {self.max_retries} attempts")
raise self.retry(exc=exc, countdown=countdown)
# Retry's: 1 min → 2 min → 4 min → 8 min → 16 min → fail
4. Deadlock'и и зависания
Риск: задачи ждут друг друга и блокируют друг друга.
# Сценарий: Circular dependency
@app.task
def task_a():
result_b = task_b.apply_async().get() # Ждем task_b
return f"A+{result_b}"
@app.task
def task_b():
result_a = task_a.apply_async().get() # Ждем task_a
return f"B+{result_a}"
# Результат: DEADLOCK!
# task_a ждет task_b
# task_b ждет task_a
# Обе зависнут навечно (или до timeout)
Решение: Избегай синхронных .get() внутри задач
# ✅ Правильно: асинхронная цепь
from celery import chain, group
# Последовательно
workflow = chain(task_a.s(), task_b.s(), task_c.s())
result = workflow.apply_async()
# Параллельно
workflow = group(task_a.s(), task_b.s(), task_c.s())
result = workflow.apply_async()
# Не делай это (синхронные ожидания):
# result_b = task_b.apply_async().get() # ❌ Blocking!
5. Race condition'ы в обновлении данных
Риск: две задачи обновляют один объект одновременно.
# Сценарий: инкремент счетчика
@app.task
def increment_counter(user_id):
user = User.objects.get(id=user_id)
user.score += 1 # Read → Modify → Write
user.save()
# Task 1: score = 0 → read → 0+1 → write → 1
# Task 2: score = 0 → read → 0+1 → write → 1
# Result: score = 1 (должно быть 2!)
# Потеря обновления!
Решение: Atomic операции или Lock'и
# ✅ Atomic в БД
from django.db.models import F
@app.task
def increment_counter_safe(user_id):
User.objects.filter(id=user_id).update(score=F('score') + 1)
# F выражение вычисляется в БД, гарантированно atomic
# ✅ Или используй lock
from django.db import transaction
@app.task
def increment_counter_locked(user_id):
with transaction.atomic():
user = User.objects.select_for_update().get(id=user_id)
user.score += 1
user.save()
6. Утечка памяти в долгоживущих worker'ах
Риск: worker постепенно требует больше памяти, пока не упадет.
# Сценарий: задача кэширует данные в памяти
from functools import lru_cache
@lru_cache(maxsize=None) # ❌ Бесконечный кэш!
def get_large_file(filename):
return open(filename).read() # Каждый уникальный файл = новая запись в кэш
@app.task
def process_files(filenames):
for f in filenames:
data = get_large_file(f) # Кэш растет
# 1000 уникальных файлов × 100MB = 100GB RAM!
# Worker исчерпывает RAM → OOM kill
Решение: Кэш с лимитом + worker restart
# ✅ Ограничить кэш
@lru_cache(maxsize=128)
def get_large_file(filename):
return open(filename).read()
# ✅ Перезагружать worker'а периодически
app.conf.worker_max_tasks_per_child = 1000 # Перезагрузка после 1000 задач
# ✅ Явно очищать кэш
@app.task
def process_files(filenames):
for f in filenames:
data = get_large_file(f)
# ...
get_large_file.cache_clear() # Очистить между итерациями
7. Отсутствие таймаутов
Риск: задача зависает, worker становится недоступен.
# ❌ Задача без таймаута
@app.task
def download_file(url):
response = requests.get(url) # Может зависнуть
# Если сервер медленный → worker занят навечно
# Worker pools = 16
# 16 зависших задач → все worker'ы заняты
# Новые задачи ждут (или падают)
Решение: Таймауты везде
@app.task(time_limit=300) # Task убьется через 5 минут
def download_file(url):
response = requests.get(url, timeout=30) # HTTP timeout
# ...
# В конфиге
app.conf.task_soft_time_limit = 300 # Soft limit
app.conf.task_time_limit = 330 # Hard limit (убить через 330 сек)
8. Отсутствие мониторинга
Риск: задачи падают незаметно, никто не узнает.
# ❌ Плохо: задача падает, но никто не знает
@app.task
def critical_task():
try:
important_operation()
except Exception:
pass # Скрываем ошибку!
# Задача "успешна" в логах Celery, но ничего не сделала
Решение: Logging + Alerting
import logging
from sentry_sdk import capture_exception
logger = logging.getLogger(__name__)
@app.task(bind=True)
def critical_task(self):
try:
important_operation()
logger.info(f"Task {self.request.id} completed successfully")
except Exception as e:
logger.error(f"Task {self.request.id} failed: {e}")
capture_exception(e) # Отправить в Sentry
raise # Re-raise для retry
# Мониторим queue length
from celery_beat import SchedulingError
@app.task
def monitor_queue():
queue_size = len(app.control.inspect().active())
if queue_size > 1000:
alert("Queue is backing up!")
9. Несогласованность между версиями
Риск: новый код изменил сигнатуру задачи, старые задачи в очереди падают.
# Version 1 (старая):
@app.task
def send_email(email):
mail.send(email)
# Version 2 (новая):
@app.task
def send_email(email, subject): # Добавили параметр!
mail.send(email, subject)
# В очереди: 100 старых задач без subject
# Все падают: missing required argument 'subject'
Решение: Backward compatibility
# ✅ Сделай параметры опциональными
@app.task
def send_email(email, subject=None):
if subject is None:
subject = "Default Subject"
mail.send(email, subject)
# ✅ Или используй json с версионированием
@app.task
def send_email(payload):
version = payload.get('version', 1)
if version == 1:
# Старый формат: {"email": "..."}
handle_v1(payload)
elif version == 2:
# Новый формат: {"email": "...", "subject": "..."}
handle_v2(payload)
10. Состояние результатов в результат-хранилище
Риск: результаты задач заполняют Redis/DB, растут бесконечно.
# Каждая задача сохраняет результат
result = my_task.delay()
result.get() # Результат остается в Redis
# После миллиона задач: Redis = 10 GB (дорого!)
Решение: Expire results + cleanup
app.conf.update(
result_expires=3600, # Результаты живут 1 час
result_backend_transport_options={
'master_name': 'mymaster'
}
)
# Или явно удаляй
result = my_task.delay()
result.forget() # Удалить результат из хранилища
Чеклист: Безопасная работа с задачами
- ✅ Идемпотентные операции — можно выполнить дважды безопасно
- ✅ Persistent broker — Redis/RabbitMQ с сохранением
- ✅ Ограненные retry'и — max_retries + exponential backoff
- ✅ Таймауты везде — task_time_limit + timeout в операциях
- ✅ Избегай синхронных .get() — используй цепи (chain, group)
- ✅ Atomic операции — F() выражения или SELECT FOR UPDATE
- ✅ Мониторинг — logging + alerting на ошибки
- ✅ Dead letter queue — для ошибочных задач
- ✅ Версионирование — payload с версией, backward compatibility
- ✅ Cleanup результатов — expire results через время
# Пример: безопасная конфигурация
app = Celery('tasks')
app.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/1',
task_acks_late=True,
task_reject_on_worker_lost=True,
result_expires=3600,
worker_max_tasks_per_child=1000,
task_soft_time_limit=300,
task_time_limit=330,
)
Вывод: обработай задачи как critical infrastructure — они могут потеряться, упасть, зависнуть.