← Назад к вопросам
Какие технологии использовал при работе с задачами с очередями?
1.2 Junior🔥 151 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Технологии для работы с очередями задач (Task Queues)
Что такое очередь задач?
Очередь задач — это система, которая позволяет:
- Отправить асинхронную задачу в очередь
- Обработать её в фоне без блокирования основного приложения
- Масштабировать обработку на несколько worker'ов
Примеры использования:
- Отправка писем (долго)
- Генерация отчётов (вычислительно тяжело)
- Обработка видео (долго)
- Аналитика и логирование
- WebScraping и парсинг
1. Celery (самая популярная технология)
Celery — распределённая очередь задач, написанная на Python. Работает с различными message brokers.
# Установка
# pip install celery redis
from celery import Celery
import time
# Инициализация
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')
# Определение задачи
@app.task
def send_email(email: str, subject: str, body: str):
"""Отправить email асинхронно"""
print(f'Отправляю письмо на {email}')
time.sleep(2) # Имитация отправки
return f'Email отправлен на {email}'
@app.task(bind=True) # bind=True даёт доступ к self (контексту задачи)
def long_task(self, duration: int):
"""Долгая задача с отслеживанием прогресса"""
print(f'Начало задачи, длительность: {duration} сек')
for i in range(duration):
time.sleep(1)
# Обновить прогресс
self.update_state(
state='PROGRESS',
meta={'current': i, 'total': duration}
)
return f'Задача завершена за {duration} сек'
# Запуск задачи (асинхронно)
if __name__ == '__main__':
# Отправить в очередь
result = send_email.delay('user@example.com', 'Hello', 'Welcome!')
print(f'Task ID: {result.id}') # Вернёт ID задачи сразу
# Проверить статус
print(result.status) # PENDING -> STARTED -> SUCCESS
print(result.get(timeout=30)) # Получить результат
# Запустить задачу с опциями
result = send_email.apply_async(
args=('user@example.com', 'Test', 'Message'),
countdown=10, # Запустить через 10 сек
expires=3600, # Задача истечёт через 1 час
retry=3, # Переоправить 3 раза при ошибке
)
# Повторяющиеся задачи (Celery Beat)
from celery.schedules import crontab
app.conf.beat_schedule = {
'send-reports-every-morning': {
'task': 'tasks.send_report',
'schedule': crontab(hour=8, minute=0), # 08:00 каждый день
},
}
Запуск worker'ов:
# Worker 1
celery -A tasks worker --loglevel=info
# Worker 2 (масштабирование)
celery -A tasks worker --loglevel=info
# Celery Beat (планировщик повторяющихся задач)
celery -A tasks beat --loglevel=info
# Flower (веб-интерфейс для мониторинга)
flower --port=5555
Плюсы Celery:
- ✅ Очень гибкая и мощная
- ✅ Работает с Redis, RabbitMQ, Amazon SQS
- ✅ Встроенная поддержка повторных попыток и отката
- ✅ Планирование задач (Celery Beat)
- ✅ Отслеживание прогресса
- ✅ Мониторинг через Flower
Минусы Celery:
- ❌ Сложная конфигурация
- ❌ Требует внешнего message broker (Redis/RabbitMQ)
- ❌ Документация запутанная
- ❌ Может быть overkill для простых задач
2. RQ (Redis Queue) — более простая альтернатива
# pip install rq
from redis import Redis
from rq import Queue
import time
# Подключение к Redis
redis_conn = Redis()
queue = Queue(connection=redis_conn)
# Определение функций (обычные функции, не задачи)
def send_email(email: str):
print(f'Отправляю письмо на {email}')
time.sleep(2)
return f'Email отправлен'
def generate_report(report_id: int):
print(f'Генерирую отчёт {report_id}')
time.sleep(5)
return {'report_id': report_id, 'status': 'done'}
# Отправить в очередь
if __name__ == '__main__':
# Добавить в очередь
job = queue.enqueue(send_email, 'user@example.com')
print(f'Job ID: {job.id}')
print(f'Job Status: {job.get_status()}') # queued, started, finished, failed
# Получить результат
result = job.result # Блокирует до выполнения
# Запланировать на конкретное время
from datetime import datetime, timedelta
job = queue.enqueue_at(
datetime.now() + timedelta(minutes=5),
send_email,
'user@example.com'
)
# Запустить с timeout
job = queue.enqueue(generate_report, 1, job_timeout=30) # 30 сек максимум
Запуск worker'ов:
# Worker
rq worker
# Multiple workers
rq worker high default low # Разные очереди с приоритетами
# Мониторинг
rq-dashboard
Плюсы RQ:
- ✅ Очень простая и понятная
- ✅ Меньше конфигурации
- ✅ Встроенная поддержка приоритетов
- ✅ Хороший вебинтерфейс (rq-dashboard)
Минусы RQ:
- ❌ Только Redis
- ❌ Меньше функций чем Celery
- ❌ Нет встроенного планирования (нужна отдельная библиотека)
- ❌ Менее гибкая обработка ошибок
3. APScheduler — планирование задач
# pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
import time
def my_task(param):
print(f'Выполняю задачу с параметром: {param}')
# Создать планировщик
scheduler = BackgroundScheduler()
# Запланировать на определённое время
scheduler.add_job(
my_task,
trigger='interval',
seconds=30, # Повторять каждые 30 сек
args=(42,),
id='my_job'
)
# Cron-style расписание
scheduler.add_job(
my_task,
trigger=CronTrigger(hour=8, minute=0, day_of_week='mon-fri'), # Будни в 8:00
args=(10,)
)
# Одноразовая задача
scheduler.add_job(
my_task,
trigger='date',
run_date=datetime.now() + timedelta(seconds=5),
args=(5,)
)
scheduler.start()
# Использование в Flask
from flask import Flask
app = Flask(__name__)
scheduler = BackgroundScheduler()
@app.before_first_request
def init():
scheduler.add_job(
lambda: print('Очистка БД'),
trigger='cron',
hour=2, minute=0
)
scheduler.start()
@app.route('/schedule')
def schedule_task():
scheduler.add_job(my_task, 'interval', seconds=60, args=(100,))
return 'Задача запланирована'
Плюсы APScheduler:
- ✅ Встроена прямо в приложение
- ✅ Поддерживает различные триггеры
- ✅ Простая интеграция с Flask/Django
Минусы APScheduler:
- ❌ Не масштабируется на несколько машин
- ❌ При перезагрузке приложения задачи теряются
- ❌ Нет простого мониторинга
4. Message Brokers для очередей
Redis
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
# Простая очередь
r.rpush('task_queue', 'task1', 'task2', 'task3') # Добавить в очередь
task = r.lpop('task_queue') # Получить первую
# Pub/Sub
pubsub = r.pubsub()
pubsub.subscribe('notifications')
for message in pubsub.listen():
print(message['data'])
# Worker
def worker():
while True:
task = r.blpop('task_queue', timeout=0) # Блокирующий pop
if task:
process_task(task[1])
Плюсы Redis:
- ✅ Очень быстрый
- ✅ Встроенная поддержка Pub/Sub
- ✅ Простой и лёгкий
Минусы Redis:
- ❌ В памяти — данные теряются при перезагрузке
- ❌ Не очень надёжен для критических задач
RabbitMQ
# pip install pika
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Объявить очередь
channel.queue_declare(queue='task_queue', durable=True)
# Отправить сообщение
channel.basic_publish(
exchange='',
routing_key='task_queue',
body='task_data',
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
# Worker
def callback(ch, method, properties, body):
print(f'Получена задача: {body}')
# Обработка
ch.basic_ack(delivery_tag=method.delivery_tag) # Подтвердить
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1) # Обработка по одной задаче
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
print('Слушаю очередь...')
channel.start_consuming()
Плюсы RabbitMQ:
- ✅ Очень надёжный
- ✅ Гарантирует доставку сообщений
- ✅ Поддерживает сложные маршруты (Exchange, Routing Keys)
- ✅ Долговечный (дисковое хранилище)
Минусы RabbitMQ:
- ❌ Сложнее в настройке чем Redis
- ❌ Медленнее чем Redis
- ❌ Требует больше ресурсов
Kafka
# pip install kafka-python
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Отправить сообщение
producer.send('tasks', {'task': 'send_email', 'user': 'john'})
# Consumer
consumer = KafkaConsumer(
'tasks',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest' # С начала если нет offset
)
for message in consumer:
print(f'Получена задача: {message.value}')
# Обработка
Плюсы Kafka:
- ✅ Масштабируется на петабайты данных
- ✅ Поддерживает очень высокую пропускную способность
- ✅ Долговечное хранилище
- ✅ Потоковая обработка
Минусы Kafka:
- ❌ Сложнее в настройке
- ❌ Больше overhead чем Redis/RabbitMQ
- ❌ Требует больше ресурсов
- ❌ Для простых задач — overkill
Сравнение технологий
| Технология | Сложность | Производительность | Надёжность | Масштабируемость | Когда использовать |
|---|---|---|---|---|---|
| RQ | Очень простая | Высокая | Средняя | Ограниченная | Простые приложения |
| Celery | Сложная | Очень высокая | Высокая | Отличная | Production приложения |
| APScheduler | Простая | Средняя | Низкая | Очень ограниченная | Встроенное планирование |
| Redis Queue | Простая | Высокая | Средняя | Хорошая | I/O операции |
| RabbitMQ | Средняя | Хорошая | Очень высокая | Отличная | Критичные задачи |
| Kafka | Сложная | Очень высокая | Очень высокая | Исключительная | Big Data, потоки |
Практические рекомендации
Начни с простого:
- Малое приложение? → RQ + Redis
- Среднее приложение? → Celery + Redis
- Критичные задачи? → Celery + RabbitMQ
- Big Data потоки? → Kafka
Лучшие практики:
- Используй идемпотентные задачи (выполнение дважды = выполнение один раз)
- Обработай исключения и добавь логирование
- Устанавливай timeout на задачи
- Мониторь очередь и обработку
- Используй DLQ (Dead Letter Queue) для неудачных задач