← Назад к вопросам

Какие технологии использовал при работе с задачами с очередями?

1.2 Junior🔥 151 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Технологии для работы с очередями задач (Task Queues)

Что такое очередь задач?

Очередь задач — это система, которая позволяет:

  1. Отправить асинхронную задачу в очередь
  2. Обработать её в фоне без блокирования основного приложения
  3. Масштабировать обработку на несколько worker'ов

Примеры использования:

  • Отправка писем (долго)
  • Генерация отчётов (вычислительно тяжело)
  • Обработка видео (долго)
  • Аналитика и логирование
  • WebScraping и парсинг

1. Celery (самая популярная технология)

Celery — распределённая очередь задач, написанная на Python. Работает с различными message brokers.

# Установка
# pip install celery redis

from celery import Celery
import time

# Инициализация
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')

# Определение задачи
@app.task
def send_email(email: str, subject: str, body: str):
    """Отправить email асинхронно"""
    print(f'Отправляю письмо на {email}')
    time.sleep(2)  # Имитация отправки
    return f'Email отправлен на {email}'

@app.task(bind=True)  # bind=True даёт доступ к self (контексту задачи)
def long_task(self, duration: int):
    """Долгая задача с отслеживанием прогресса"""
    print(f'Начало задачи, длительность: {duration} сек')
    for i in range(duration):
        time.sleep(1)
        # Обновить прогресс
        self.update_state(
            state='PROGRESS',
            meta={'current': i, 'total': duration}
        )
    return f'Задача завершена за {duration} сек'

# Запуск задачи (асинхронно)
if __name__ == '__main__':
    # Отправить в очередь
    result = send_email.delay('user@example.com', 'Hello', 'Welcome!')
    print(f'Task ID: {result.id}')  # Вернёт ID задачи сразу
    
    # Проверить статус
    print(result.status)  # PENDING -> STARTED -> SUCCESS
    print(result.get(timeout=30))  # Получить результат
    
    # Запустить задачу с опциями
    result = send_email.apply_async(
        args=('user@example.com', 'Test', 'Message'),
        countdown=10,  # Запустить через 10 сек
        expires=3600,  # Задача истечёт через 1 час
        retry=3,  # Переоправить 3 раза при ошибке
    )
    
    # Повторяющиеся задачи (Celery Beat)
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'send-reports-every-morning': {
            'task': 'tasks.send_report',
            'schedule': crontab(hour=8, minute=0),  # 08:00 каждый день
        },
    }

Запуск worker'ов:

# Worker 1
celery -A tasks worker --loglevel=info

# Worker 2 (масштабирование)
celery -A tasks worker --loglevel=info

# Celery Beat (планировщик повторяющихся задач)
celery -A tasks beat --loglevel=info

# Flower (веб-интерфейс для мониторинга)
flower --port=5555

Плюсы Celery:

  • ✅ Очень гибкая и мощная
  • ✅ Работает с Redis, RabbitMQ, Amazon SQS
  • ✅ Встроенная поддержка повторных попыток и отката
  • ✅ Планирование задач (Celery Beat)
  • ✅ Отслеживание прогресса
  • ✅ Мониторинг через Flower

Минусы Celery:

  • ❌ Сложная конфигурация
  • ❌ Требует внешнего message broker (Redis/RabbitMQ)
  • ❌ Документация запутанная
  • ❌ Может быть overkill для простых задач

2. RQ (Redis Queue) — более простая альтернатива

# pip install rq

from redis import Redis
from rq import Queue
import time

# Подключение к Redis
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# Определение функций (обычные функции, не задачи)
def send_email(email: str):
    print(f'Отправляю письмо на {email}')
    time.sleep(2)
    return f'Email отправлен'

def generate_report(report_id: int):
    print(f'Генерирую отчёт {report_id}')
    time.sleep(5)
    return {'report_id': report_id, 'status': 'done'}

# Отправить в очередь
if __name__ == '__main__':
    # Добавить в очередь
    job = queue.enqueue(send_email, 'user@example.com')
    print(f'Job ID: {job.id}')
    print(f'Job Status: {job.get_status()}')  # queued, started, finished, failed
    
    # Получить результат
    result = job.result  # Блокирует до выполнения
    
    # Запланировать на конкретное время
    from datetime import datetime, timedelta
    job = queue.enqueue_at(
        datetime.now() + timedelta(minutes=5),
        send_email,
        'user@example.com'
    )
    
    # Запустить с timeout
    job = queue.enqueue(generate_report, 1, job_timeout=30)  # 30 сек максимум

Запуск worker'ов:

# Worker
rq worker

# Multiple workers
rq worker high default low  # Разные очереди с приоритетами

# Мониторинг
rq-dashboard

Плюсы RQ:

  • ✅ Очень простая и понятная
  • ✅ Меньше конфигурации
  • ✅ Встроенная поддержка приоритетов
  • ✅ Хороший вебинтерфейс (rq-dashboard)

Минусы RQ:

  • ❌ Только Redis
  • ❌ Меньше функций чем Celery
  • ❌ Нет встроенного планирования (нужна отдельная библиотека)
  • ❌ Менее гибкая обработка ошибок

3. APScheduler — планирование задач

# pip install apscheduler

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
import time

def my_task(param):
    print(f'Выполняю задачу с параметром: {param}')

# Создать планировщик
scheduler = BackgroundScheduler()

# Запланировать на определённое время
scheduler.add_job(
    my_task,
    trigger='interval',
    seconds=30,  # Повторять каждые 30 сек
    args=(42,),
    id='my_job'
)

# Cron-style расписание
scheduler.add_job(
    my_task,
    trigger=CronTrigger(hour=8, minute=0, day_of_week='mon-fri'),  # Будни в 8:00
    args=(10,)
)

# Одноразовая задача
scheduler.add_job(
    my_task,
    trigger='date',
    run_date=datetime.now() + timedelta(seconds=5),
    args=(5,)
)

scheduler.start()

# Использование в Flask
from flask import Flask

app = Flask(__name__)
scheduler = BackgroundScheduler()

@app.before_first_request
def init():
    scheduler.add_job(
        lambda: print('Очистка БД'),
        trigger='cron',
        hour=2, minute=0
    )
    scheduler.start()

@app.route('/schedule')
def schedule_task():
    scheduler.add_job(my_task, 'interval', seconds=60, args=(100,))
    return 'Задача запланирована'

Плюсы APScheduler:

  • ✅ Встроена прямо в приложение
  • ✅ Поддерживает различные триггеры
  • ✅ Простая интеграция с Flask/Django

Минусы APScheduler:

  • ❌ Не масштабируется на несколько машин
  • ❌ При перезагрузке приложения задачи теряются
  • ❌ Нет простого мониторинга

4. Message Brokers для очередей

Redis

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Простая очередь
r.rpush('task_queue', 'task1', 'task2', 'task3')  # Добавить в очередь
task = r.lpop('task_queue')  # Получить первую

# Pub/Sub
pubsub = r.pubsub()
pubsub.subscribe('notifications')
for message in pubsub.listen():
    print(message['data'])

# Worker
def worker():
    while True:
        task = r.blpop('task_queue', timeout=0)  # Блокирующий pop
        if task:
            process_task(task[1])

Плюсы Redis:

  • ✅ Очень быстрый
  • ✅ Встроенная поддержка Pub/Sub
  • ✅ Простой и лёгкий

Минусы Redis:

  • ❌ В памяти — данные теряются при перезагрузке
  • ❌ Не очень надёжен для критических задач

RabbitMQ

# pip install pika

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()

# Объявить очередь
channel.queue_declare(queue='task_queue', durable=True)

# Отправить сообщение
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='task_data',
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
    )
)

# Worker
def callback(ch, method, properties, body):
    print(f'Получена задача: {body}')
    # Обработка
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Подтвердить

channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # Обработка по одной задаче
channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback
)

print('Слушаю очередь...')
channel.start_consuming()

Плюсы RabbitMQ:

  • ✅ Очень надёжный
  • ✅ Гарантирует доставку сообщений
  • ✅ Поддерживает сложные маршруты (Exchange, Routing Keys)
  • ✅ Долговечный (дисковое хранилище)

Минусы RabbitMQ:

  • ❌ Сложнее в настройке чем Redis
  • ❌ Медленнее чем Redis
  • ❌ Требует больше ресурсов

Kafka

# pip install kafka-python

from kafka import KafkaProducer, KafkaConsumer
import json

# Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Отправить сообщение
producer.send('tasks', {'task': 'send_email', 'user': 'john'})

# Consumer
consumer = KafkaConsumer(
    'tasks',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest'  # С начала если нет offset
)

for message in consumer:
    print(f'Получена задача: {message.value}')
    # Обработка

Плюсы Kafka:

  • ✅ Масштабируется на петабайты данных
  • ✅ Поддерживает очень высокую пропускную способность
  • ✅ Долговечное хранилище
  • ✅ Потоковая обработка

Минусы Kafka:

  • ❌ Сложнее в настройке
  • ❌ Больше overhead чем Redis/RabbitMQ
  • ❌ Требует больше ресурсов
  • ❌ Для простых задач — overkill

Сравнение технологий

ТехнологияСложностьПроизводительностьНадёжностьМасштабируемостьКогда использовать
RQОчень простаяВысокаяСредняяОграниченнаяПростые приложения
CeleryСложнаяОчень высокаяВысокаяОтличнаяProduction приложения
APSchedulerПростаяСредняяНизкаяОчень ограниченнаяВстроенное планирование
Redis QueueПростаяВысокаяСредняяХорошаяI/O операции
RabbitMQСредняяХорошаяОчень высокаяОтличнаяКритичные задачи
KafkaСложнаяОчень высокаяОчень высокаяИсключительнаяBig Data, потоки

Практические рекомендации

Начни с простого:

  1. Малое приложение? → RQ + Redis
  2. Среднее приложение? → Celery + Redis
  3. Критичные задачи? → Celery + RabbitMQ
  4. Big Data потоки? → Kafka

Лучшие практики:

  • Используй идемпотентные задачи (выполнение дважды = выполнение один раз)
  • Обработай исключения и добавь логирование
  • Устанавливай timeout на задачи
  • Мониторь очередь и обработку
  • Используй DLQ (Dead Letter Queue) для неудачных задач
Какие технологии использовал при работе с задачами с очередями? | PrepBro