Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как я организую фоновые задачи
Для фоновых задач выбираю инструмент в зависимости от требований проекта и масштаба.
Celery — для сложных систем
В production-проектах с миллионами задач использую Celery с Redis/RabbitMQ. Это позволяет:
- Обрабатывать миллионы задач асинхронно
- Распределять нагрузку на много воркеров
- Переиспользовать результаты через кеш
- Ретраить падающие задачи с экспоненциальной задержкой
from celery import Celery
import time
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def send_email_task(self, email, subject):
try:
# логика отправки
time.sleep(1)
return f'Email sent to {email}'
except Exception as exc:
# ретрай с экспоненциальной задержкой
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
APScheduler — для периодических задач
Для регулярных задач (кроны) предпочитаю APScheduler вместо cron, так как всё в коде и легче масштабировать:
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
scheduler = BackgroundScheduler()
@scheduler.scheduled_job('cron', hour=0, minute=0)
def cleanup_old_files():
# очистка старых файлов в полночь
pass
scheduler.start()
atexit.register(lambda: scheduler.shutdown())
asyncio — для простых случаев
В современных проектах на FastAPI часто использую asyncio с фоновыми задачами:
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
async def log_request(request_id: str):
await asyncio.sleep(5)
print(f'Request {request_id} completed')
@app.post('/process')
async def process(background_tasks: BackgroundTasks):
background_tasks.add_task(log_request, 'req-123')
return {'status': 'processing'}
APScheduler + FastAPI для веб-приложений
Ача для веб-приложений комбинирую APScheduler с FastAPI и database задачами:
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
app = FastAPI()
scheduler = AsyncIOScheduler()
@app.on_event('startup')
async def startup():
scheduler.start()
@scheduler.scheduled_job('interval', minutes=30)
async def sync_data_from_api():
# запрос данных и сохранение в БД
pass
Ключевые практики
- Идемпотентность: фоновая задача должна безопасно повторяться
- Обработка ошибок: логирование, ретраи, dead letter queues
- Мониторинг: алерты на падение воркеров
- Сокращение времени: кешировать часто вычисляемые данные
Выбираю в зависимости от:
- Celery: миллионы задач, сложная логика ретраев
- APScheduler: периодические задачи, небольшой масштаб
- asyncio: простые задачи в FastAPI
- Threading: legacy коды, простые операции