← Назад к вопросам

Что использовать, чтобы реализовать очередь фоновых задач в Python?

2.0 Middle🔥 81 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Реализация очередей фоновых задач в Python

Существует несколько подходов для создания очередей фоновых задач в Python. Выбор зависит от требований вашего проекта: простота, масштабируемость, надёжность.

1. Celery + RabbitMQ/Redis (Production)

Celery — это самый популярный и мощный инструмент для распределённых асинхронных задач.

from celery import Celery
import time

# Настройка Celery с Redis broker
app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def send_email(email_address):
    time.sleep(2)  # Имитация отправки email
    return f"Email sent to {email_address}"

@app.task(bind=True)
def long_running_task(self):
    for i in range(10):
        self.update_state(state='PROGRESS', meta={'current': i, 'total': 10})
        time.sleep(1)
    return "Task completed"

# Запуск задач из вашего кода
if __name__ == "__main__":
    # Асинхронный запуск
    result = send_email.delay("user@example.com")
    
    # Получение результата
    print(result.get(timeout=30))
    
    # Проверка статуса
    print(result.status)

Преимущества Celery:

  • Масштабируемость — множество worker процессов
  • Надёжность — повторные попытки, dead letter queues
  • Мониторинг — встроенные инструменты отслеживания
  • Гибкость — различные broker'ы (RabbitMQ, Redis, SQS)

Недостатки:

  • Сложность настройки
  • Требует внешних сервисов (Redis/RabbitMQ)
  • Оверхед для простых задач

2. RQ (Redis Queue)

RQ — это более лёгкая альтернатива Celery, также использующая Redis.

from redis import Redis
from rq import Queue
from rq.job import JobStatus
import time

# Определение функции для фона
def process_image(image_url):
    time.sleep(5)  # Обработка изображения
    return f"Processed {image_url}"

# Создание очереди
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# Добавление задачи в очередь
job = queue.enqueue(process_image, "https://example.com/image.jpg")

# Проверка статуса
print(job.get_status())  # queued, started, finished, failed

# Получение результата
if job.is_finished:
    print(job.result)

# Запуск worker процесса (отдельная команда)
# rq worker

Преимущества RQ:

  • Простота использования
  • Меньше boilerplate кода
  • Хорошо подходит для средних приложений

Недостатки:

  • Менее функциональнее чем Celery
  • Зависит от Redis
  • Меньше конфигурации для сложных сценариев

3. APScheduler

APScheduler используется для планирования периодических задач.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import time

scheduler = BackgroundScheduler()

def my_scheduled_task():
    print("Task executed at", time.time())

# Добавление задачи на каждый час
scheduler.add_job(
    my_scheduled_task,
    CronTrigger(hour="*")  # Каждый час
)

# Запуск scheduler в фоне
scheduler.start()

try:
    print("Press Ctrl+C to exit")
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()

Использование:

  • Периодические задачи (очистка БД, отправка отчётов)
  • Простые расписания в одном процессе
  • Backup и maintenance операции

4. Встроенная multiprocessing Queue

Для простых случаев можно использовать стандартную библиотеку.

from multiprocessing import Queue, Process
import time

# Функция-worker
def worker(task_queue):
    while True:
        task = task_queue.get()
        if task is None:  # Сигнал завершения
            break
        
        function, args = task
        print(f"Processing: {function.__name__}{args}")
        result = function(*args)
        print(f"Result: {result}")

def slow_function(x):
    time.sleep(2)
    return x ** 2

# Создание очереди и worker процесса
task_queue = Queue()
worker_process = Process(target=worker, args=(task_queue,))
worker_process.start()

# Добавление задач
task_queue.put((slow_function, (5,)))
task_queue.put((slow_function, (10,)))
task_queue.put(None)  # Сигнал завершения

worker_process.join()

Недостатки:

  • Нет межпроцессной коммуникации
  • Нет мониторинга
  • Плохо масштабируется

5. asyncio для конкурентных задач

Если вы работаете с I/O операциями, используйте asyncio.

import asyncio

async def fetch_data(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Имитация сетевого запроса
    print(f"Done fetching {url}")
    return f"Data from {url}"

async def main():
    # Запуск задач конкурентно
    tasks = [
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
        fetch_data("https://api.example.com/3"),
    ]
    
    results = await asyncio.gather(*tasks)
    return results

# Запуск
results = asyncio.run(main())
print(results)

Лучше использовать для:

  • I/O-bound операций (сеть, файлы)
  • Веб-приложений (FastAPI, aiohttp)
  • Конкурентной обработки

6. Comparison: какой выбрать?

# Простая задача (отправка уведомления)
# -> asyncio или встроенная threading

# Средний проект (обработка данных в фоне)
# -> RQ с Redis

# Большой проект (масштабируемость, надёжность)
# -> Celery с RabbitMQ

# Периодические задачи (daily reports)
# -> APScheduler

# Микросервисы (асинхронные операции)
# -> asyncio + aiohttp

# Очень простой прототип
# -> multiprocessing.Queue

Практический пример: комбинированный подход

from celery import Celery
from apscheduler.schedulers.background import BackgroundScheduler
import asyncio

app = Celery('myapp', broker='redis://localhost:6379')
scheduler = BackgroundScheduler()

@app.task
def send_batch_emails(emails):
    return f"Sent {len(emails)} emails"

async def async_data_processing():
    await asyncio.sleep(1)
    return "Data processed"

def periodic_maintenance():
    # Запускается ежечасно
    print("Running maintenance...")
    send_batch_emails.delay(['user1@example.com', 'user2@example.com'])

# Добавление периодической задачи
scheduler.add_job(periodic_maintenance, 'cron', hour='*')
scheduler.start()

if __name__ == "__main__":
    # Запуск Celery worker
    # celery -A myapp worker --loglevel=info
    pass

Рекомендации

  • Начните с простого — используйте asyncio или RQ
  • Масштабируйте при необходимости — переходите на Celery
  • Мониторьте — используйте Flower (веб-интерфейс для Celery)
  • Обрабатывайте ошибки — предусмотрите retry логику
  • Учитывайте порядок — если очередь должна быть в порядке FIFO

Выбор инструмента зависит от сложности вашего проекта и требований к надёжности.