← Назад к вопросам

Как исполнишь фоновую задачу, если нет Celery?

2.0 Middle🔥 161 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Фоновые задачи без Celery: практические альтернативы

В современной разработке есть много способов выполнить задачи асинхронно без Celery. Выбор зависит от объёма нагрузки, критичности задачи и инфраструктуры.

1. Threading (многопоточность)

Когда использовать: Задачи с блокирующим I/O (запросы к API, чтение файлов).

import threading

def send_email(user_id, email):
    # Отправка письма
    print(f"Отправляю письмо {email}")

def create_user():
    # В основном потоке возвращаем быстро
    user_id = save_to_db()
    
    # Запускаем отправку в фоне
    thread = threading.Thread(
        target=send_email,
        args=(user_id, "user@example.com"),
        daemon=True  # Поток завершится вместе с приложением
    )
    thread.start()
    
    return {"user_id": user_id, "status": "created"}

Плюсы: Просто, встроено в Python. Минусы: Потоки общие для всех задач, нет очереди, нет контроля отказоустойчивости.

2. asyncio (асинхронность)

Когда использовать: Асинхронный фреймворк (FastAPI, aiohttp).

import asyncio
from fastapi import FastAPI

app = FastAPI()

async def send_notification(user_id: int):
    await asyncio.sleep(1)  # I/O операция
    print(f"Уведомление отправлено пользователю {user_id}")

@app.post("/users")
async def create_user():
    user_id = save_to_db()
    
    # Создаём задачу, но не ждём её
    asyncio.create_task(send_notification(user_id))
    
    return {"user_id": user_id}

Плюсы: Эффективен, встроен в FastAPI, работает с await. Минусы: Все задачи теряются если упадёт процесс, нет очереди.

3. APScheduler (планировщик)

Когда использовать: Периодические задачи (очистка БД, отправка рассылок).

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

def cleanup_old_sessions():
    print("Удаляю старые сессии...")
    # db.sessions.delete_where(created_at < now() - 7 days)

scheduler = BackgroundScheduler()
scheduler.add_job(
    cleanup_old_sessions,
    cron,
    hour=2,
    minute=0  # Ежедневно в 2:00
)
scheduler.start()

if __name__ == "__main__":
    try:
        # Ваше приложение работает
        while True:
            pass
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

Плюсы: Для периодических задач идеален. Минусы: Не для разовых задач, сложно масштабировать на несколько серверов.

4. RQ (Redis Queue)

Когда использовать: Нужна очередь, но Celery кажется излишним. Redis должен быть.

from redis import Redis
from rq import Queue
import time

redis_conn = Redis()
queue = Queue(connection=redis_conn)

def send_email_task(email: str, subject: str):
    time.sleep(2)
    print(f"Email отправлен на {email}")
    return "success"

# В обработчике запроса
@app.post("/send-email")
def request_email(email: str):
    # Добавляем в очередь, ответ сразу
    job = queue.enqueue(send_email_task, email, "Hello")
    return {"job_id": job.id, "status": "queued"}

# Запуск worker в отдельном процессе
# python -m rq worker

Плюсы: Просто, persistence в Redis, горизонтальное масштабирование. Минусы: Redis обязателен, меньше функций чем Celery.

5. Subprocess (системные процессы)

Когда использовать: Нужно изолировать тяжёлую операцию.

import subprocess
import json

def process_video(video_id: str):
    # Запускаем скрипт в отдельном процессе
    subprocess.Popen(
        ["python", "-m", "tasks.video_processor", video_id],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )

@app.post("/upload-video")
def upload_video(video_id: str):
    save_to_db(video_id)
    process_video(video_id)  # Не ждём
    return {"video_id": video_id, "status": "processing"}

Плюсы: Полная изоляция процесса, свой интерпретатор Python. Минусы: Сложно контролировать, нет встроенного мониторинга.

6. Используем встроенные возможности БД (например, PostgreSQL)

# В функции создания записи
def create_post(user_id: int, content: str):
    post = db.posts.insert(user_id, content)
    
    # Используем триггер БД для обновления счётчиков
    # Или SELECT уведомления через pg_notify
    db.execute(
        "SELECT pg_notify(new_post, %s)",
        json.dumps({"post_id": post.id, "user_id": user_id})
    )
    return post

Плюсы: Надёжно, гарантированная доставка. Минусы: Привязано к конкретной БД.

Рекомендации по выбору

СценарийРешение
Простые разовые задачиasyncio.create_task или threading
Периодические задачиAPScheduler
Очередь с persistenceRQ или переход на Celery
I/O в FastAPIasyncio + create_task
Тяжёлые вычисленияmultiprocessing или Celery

Практический пример: FastAPI + asyncio + RQ

from fastapi import FastAPI
from rq import Queue
from redis import Redis

app = FastAPI()
queue = Queue(connection=Redis())

async def light_task():
    """Быстрая задача — в asyncio"""
    await asyncio.sleep(0.1)
    return "done"

def heavy_task(user_id: int):
    """Тяжёлая задача — в очередь"""
    time.sleep(10)  # Долгая операция
    return f"processed {user_id}"

@app.post("/light")
async def light_endpoint():
    result = await light_task()
    return {"result": result}

@app.post("/heavy")
async def heavy_endpoint(user_id: int):
    job = queue.enqueue(heavy_task, user_id)
    return {"job_id": job.id}

Выбирайте инструмент в зависимости от требований вашей системы.