Как исполнишь фоновую задачу, если нет Celery?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Фоновые задачи без Celery: практические альтернативы
В современной разработке есть много способов выполнить задачи асинхронно без Celery. Выбор зависит от объёма нагрузки, критичности задачи и инфраструктуры.
1. Threading (многопоточность)
Когда использовать: Задачи с блокирующим I/O (запросы к API, чтение файлов).
import threading
def send_email(user_id, email):
# Отправка письма
print(f"Отправляю письмо {email}")
def create_user():
# В основном потоке возвращаем быстро
user_id = save_to_db()
# Запускаем отправку в фоне
thread = threading.Thread(
target=send_email,
args=(user_id, "user@example.com"),
daemon=True # Поток завершится вместе с приложением
)
thread.start()
return {"user_id": user_id, "status": "created"}
Плюсы: Просто, встроено в Python. Минусы: Потоки общие для всех задач, нет очереди, нет контроля отказоустойчивости.
2. asyncio (асинхронность)
Когда использовать: Асинхронный фреймворк (FastAPI, aiohttp).
import asyncio
from fastapi import FastAPI
app = FastAPI()
async def send_notification(user_id: int):
await asyncio.sleep(1) # I/O операция
print(f"Уведомление отправлено пользователю {user_id}")
@app.post("/users")
async def create_user():
user_id = save_to_db()
# Создаём задачу, но не ждём её
asyncio.create_task(send_notification(user_id))
return {"user_id": user_id}
Плюсы: Эффективен, встроен в FastAPI, работает с await. Минусы: Все задачи теряются если упадёт процесс, нет очереди.
3. APScheduler (планировщик)
Когда использовать: Периодические задачи (очистка БД, отправка рассылок).
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
def cleanup_old_sessions():
print("Удаляю старые сессии...")
# db.sessions.delete_where(created_at < now() - 7 days)
scheduler = BackgroundScheduler()
scheduler.add_job(
cleanup_old_sessions,
cron,
hour=2,
minute=0 # Ежедневно в 2:00
)
scheduler.start()
if __name__ == "__main__":
try:
# Ваше приложение работает
while True:
pass
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
Плюсы: Для периодических задач идеален. Минусы: Не для разовых задач, сложно масштабировать на несколько серверов.
4. RQ (Redis Queue)
Когда использовать: Нужна очередь, но Celery кажется излишним. Redis должен быть.
from redis import Redis
from rq import Queue
import time
redis_conn = Redis()
queue = Queue(connection=redis_conn)
def send_email_task(email: str, subject: str):
time.sleep(2)
print(f"Email отправлен на {email}")
return "success"
# В обработчике запроса
@app.post("/send-email")
def request_email(email: str):
# Добавляем в очередь, ответ сразу
job = queue.enqueue(send_email_task, email, "Hello")
return {"job_id": job.id, "status": "queued"}
# Запуск worker в отдельном процессе
# python -m rq worker
Плюсы: Просто, persistence в Redis, горизонтальное масштабирование. Минусы: Redis обязателен, меньше функций чем Celery.
5. Subprocess (системные процессы)
Когда использовать: Нужно изолировать тяжёлую операцию.
import subprocess
import json
def process_video(video_id: str):
# Запускаем скрипт в отдельном процессе
subprocess.Popen(
["python", "-m", "tasks.video_processor", video_id],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
@app.post("/upload-video")
def upload_video(video_id: str):
save_to_db(video_id)
process_video(video_id) # Не ждём
return {"video_id": video_id, "status": "processing"}
Плюсы: Полная изоляция процесса, свой интерпретатор Python. Минусы: Сложно контролировать, нет встроенного мониторинга.
6. Используем встроенные возможности БД (например, PostgreSQL)
# В функции создания записи
def create_post(user_id: int, content: str):
post = db.posts.insert(user_id, content)
# Используем триггер БД для обновления счётчиков
# Или SELECT уведомления через pg_notify
db.execute(
"SELECT pg_notify(new_post, %s)",
json.dumps({"post_id": post.id, "user_id": user_id})
)
return post
Плюсы: Надёжно, гарантированная доставка. Минусы: Привязано к конкретной БД.
Рекомендации по выбору
| Сценарий | Решение |
|---|---|
| Простые разовые задачи | asyncio.create_task или threading |
| Периодические задачи | APScheduler |
| Очередь с persistence | RQ или переход на Celery |
| I/O в FastAPI | asyncio + create_task |
| Тяжёлые вычисления | multiprocessing или Celery |
Практический пример: FastAPI + asyncio + RQ
from fastapi import FastAPI
from rq import Queue
from redis import Redis
app = FastAPI()
queue = Queue(connection=Redis())
async def light_task():
"""Быстрая задача — в asyncio"""
await asyncio.sleep(0.1)
return "done"
def heavy_task(user_id: int):
"""Тяжёлая задача — в очередь"""
time.sleep(10) # Долгая операция
return f"processed {user_id}"
@app.post("/light")
async def light_endpoint():
result = await light_task()
return {"result": result}
@app.post("/heavy")
async def heavy_endpoint(user_id: int):
job = queue.enqueue(heavy_task, user_id)
return {"job_id": job.id}
Выбирайте инструмент в зависимости от требований вашей системы.