Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Работа с очередями сообщений в Python
Очереди — критический компонент масштабируемых систем. Расскажу о практическом опыте работы с основными системами.
Зачем нужны очереди
Проблема без очередей:
Клиент → API → БД (если БД перегружена, клиент ждёт)
Решение с очередями:
Клиент → API → Очередь → Воркер → БД (клиент получает ответ сразу)
Способ 1: RabbitMQ + Celery (популярный выбор)
Celery — это распределённая система очередей для Python.
from celery import Celery
from kombu import Exchange, Queue
import time
# Инициализация
app = Celery('myapp')
app.conf.update(
broker_url='amqp://guest:guest@localhost:5672/',
result_backend='redis://localhost:6379/0',
task_serializer='json',
accept_content=['json'],
timezone='UTC',
enable_utc=True,
)
# Определение задач
@app.task(bind=True, max_retries=3)
def send_email(self, recipient: str, subject: str, body: str):
"""Асинхронная отправка email"""
try:
# Имитируем отправку
time.sleep(2)
print(f"Email отправлен {recipient}")
return {"status": "sent", "recipient": recipient}
except Exception as exc:
# Автоматический retry с экспоненциальной задержкой
self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
@app.task
def process_video(video_id: int):
"""Долгая обработка видео"""
video = get_video(video_id)
# Долгая операция
transcoded = transcode_video(video)
save_to_storage(transcoded)
return {"video_id": video_id, "status": "ready"}
# Использование в Flask/FastAPI
from flask import Flask
app_flask = Flask(__name__)
@app_flask.route('/send-email', methods=['POST'])
def send_email_route():
data = request.json
# Отправляем задачу в очередь
task = send_email.delay(
recipient=data['email'],
subject=data['subject'],
body=data['body']
)
# Возвращаем ID задачи сразу
return {"task_id": task.id, "status": "processing"}
@app_flask.route('/task/<task_id>')
def get_task_status(task_id):
"""Проверить статус задачи"""
task = app.AsyncResult(task_id)
if task.state == 'PENDING':
response = {'status': 'Ожидание', 'progress': 0}
elif task.state == 'SUCCESS':
response = {'status': 'Готово', 'result': task.result}
elif task.state == 'FAILURE':
response = {'status': 'Ошибка', 'error': str(task.info)}
else:
response = {'status': task.state, 'progress': task.info.get('progress', 0)}
return response
Запуск Celery воркеров:
# Запустить воркер
celery -A myapp worker --loglevel=info --concurrency=4
# Мониторинг задач (Flower)
celery -A myapp flower
Способ 2: Redis Queue (более простой вариант)
RQ — простая очередь на базе Redis, для простых сценариев.
from redis import Redis
from rq import Queue, Worker
from rq.job import JobStatus
# Подключение
redis_conn = Redis()
q = Queue(connection=redis_conn)
# Определение функции (обычная функция, не декоратор)
def send_email(recipient: str, subject: str):
print(f"Отправляю email на {recipient}")
time.sleep(2)
return f"Email отправлен {recipient}"
def process_payment(user_id: int, amount: float):
# Обработка платежа
api_response = call_payment_api(user_id, amount)
update_transaction_status(user_id, api_response)
return api_response
# Добавление задачи в очередь
job = q.enqueue(send_email, 'user@example.com', 'Hello')
print(f"Task ID: {job.id}")
# Проверка статуса
if job.is_finished:
print(f"Результат: {job.result}")
elif job.is_failed:
print(f"Ошибка: {job.exc_info}")
else:
print(f"Статус: {job.get_status()}")
# Запуск воркера
worker = Worker([q], connection=redis_conn)
worker.work(with_scheduler=True)
Способ 3: Kafka (для потоков данных и масштаба)
Kafka — это система потоков, для высоконагруженных систем.
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime
# Продюсер — отправитель событий
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def log_user_action(user_id: int, action: str, details: dict):
"""Отправить событие в Kafka"""
event = {
"user_id": user_id,
"action": action,
"details": details,
"timestamp": datetime.utcnow().isoformat()
}
# Отправляем в топик
future = producer.send('user-actions', value=event)
# Ждём подтверждения
record_metadata = future.get(timeout=10)
print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")
# Использование
log_user_action(123, 'login', {'ip': '192.168.1.1'})
log_user_action(456, 'purchase', {'product_id': 789, 'amount': 99.99})
# Консьюмер — получатель событий
consumer = KafkaConsumer(
'user-actions',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='analytics-service'
)
def process_events():
"""Обработать события из Kafka"""
for message in consumer:
event = message.value
print(f"Получено событие: {event['action']} от пользователя {event['user_id']}")
# Обработка события
if event['action'] == 'purchase':
update_user_stats(event['user_id'], event['details']['amount'])
elif event['action'] == 'login':
log_analytics(event['user_id'], event['details']['ip'])
process_events()
Способ 4: Asyncio очередь (в процессе, для микрозадач)
asyncio.Queue — встроенная очередь для асинхронного кода.
import asyncio
from typing import List
class TaskQueue:
def __init__(self, num_workers: int = 4):
self.queue = asyncio.Queue()
self.num_workers = num_workers
self.workers = []
async def worker(self, worker_id: int):
"""Воркер обрабатывает задачи из очереди"""
while True:
try:
task = await asyncio.wait_for(self.queue.get(), timeout=5.0)
print(f"Воркер {worker_id}: обрабатываю {task}")
result = await process_task(task)
print(f"Воркер {worker_id}: результат {result}")
except asyncio.TimeoutError:
break
finally:
self.queue.task_done()
async def start(self):
"""Запустить воркеры"""
self.workers = [
asyncio.create_task(self.worker(i))
for i in range(self.num_workers)
]
async def add_task(self, task):
"""Добавить задачу в очередь"""
await self.queue.put(task)
async def wait_completion(self):
"""Ждать завершения всех задач"""
await self.queue.join()
for worker in self.workers:
worker.cancel()
async def process_task(task: str) -> str:
"""Обработать задачу"""
await asyncio.sleep(1)
return f"Обработано: {task}"
async def main():
queue = TaskQueue(num_workers=3)
await queue.start()
# Добавляем задачи
for i in range(10):
await queue.add_task(f"Task {i}")
await queue.wait_completion()
print("Все задачи завершены")
asyncio.run(main())
Способ 5: Amazon SQS (облачное решение)
import boto3
import json
from datetime import datetime
# Инициализация
sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
# Отправка сообщения
def send_message(message_data: dict):
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message_data),
DelaySeconds=0 # Отправить сразу
)
print(f"Message ID: {response['MessageId']}")
return response['MessageId']
# Получение сообщений
def receive_messages(max_messages: int = 10):
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=max_messages,
WaitTimeSeconds=20 # Long polling
)
messages = response.get('Messages', [])
for message in messages:
body = json.loads(message['Body'])
print(f"Обработка: {body}")
# Удаляем обработанное сообщение
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
return messages
# Использование
send_message({"order_id": 123, "status": "pending"})
receive_messages()
Сравнение систем очередей
| Система | Масштаб | Скорость | Сложность | Лучше для |
|---|---|---|---|---|
| RabbitMQ + Celery | Средний-Высокий | Быстро | Средняя | Асинхронные задачи, retry |
| RQ | Средний | Быстро | Низкая | Простые очереди, Redis |
| Kafka | Высокий | Очень быстро | Высокая | Потоки данных, аналитика |
| asyncio.Queue | Один сервер | Очень быстро | Низкая | Микрозадачи в памяти |
| AWS SQS | Высокий | Нормально | Средняя | Облачные приложения |
Практический совет
Выбирай систему по сценарию:
- Начни с RQ — если задачи просты и чисел не много
- Переходи на Celery — когда нужны retry, scheduling, сложная логика
- Используй Kafka — когда данные потоками и нужна история событий
- Облако (SQS) — когда хостишь на AWS и не хочешь управлять инфраструктурой
Итог
Очереди решают три проблемы:
- Масштабируемость — долгие операции не блокируют API
- Надёжность — retry, dead letter queue при ошибках
- Асинхронность — сервис может обработать множество запросов параллельно