← Назад к вопросам

Как взаимодействовал с очередями?

1.3 Junior🔥 171 комментариев
#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Работа с очередями сообщений в Python

Очереди — критический компонент масштабируемых систем. Расскажу о практическом опыте работы с основными системами.

Зачем нужны очереди

Проблема без очередей:
  Клиент → API → БД  (если БД перегружена, клиент ждёт)

Решение с очередями:
  Клиент → API → Очередь → Воркер → БД (клиент получает ответ сразу)

Способ 1: RabbitMQ + Celery (популярный выбор)

Celery — это распределённая система очередей для Python.

from celery import Celery
from kombu import Exchange, Queue
import time

# Инициализация
app = Celery('myapp')
app.conf.update(
    broker_url='amqp://guest:guest@localhost:5672/',
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    enable_utc=True,
)

# Определение задач
@app.task(bind=True, max_retries=3)
def send_email(self, recipient: str, subject: str, body: str):
    """Асинхронная отправка email"""
    try:
        # Имитируем отправку
        time.sleep(2)
        print(f"Email отправлен {recipient}")
        return {"status": "sent", "recipient": recipient}
    except Exception as exc:
        # Автоматический retry с экспоненциальной задержкой
        self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@app.task
def process_video(video_id: int):
    """Долгая обработка видео"""
    video = get_video(video_id)
    # Долгая операция
    transcoded = transcode_video(video)
    save_to_storage(transcoded)
    return {"video_id": video_id, "status": "ready"}

# Использование в Flask/FastAPI
from flask import Flask

app_flask = Flask(__name__)

@app_flask.route('/send-email', methods=['POST'])
def send_email_route():
    data = request.json
    # Отправляем задачу в очередь
    task = send_email.delay(
        recipient=data['email'],
        subject=data['subject'],
        body=data['body']
    )
    # Возвращаем ID задачи сразу
    return {"task_id": task.id, "status": "processing"}

@app_flask.route('/task/<task_id>')
def get_task_status(task_id):
    """Проверить статус задачи"""
    task = app.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {'status': 'Ожидание', 'progress': 0}
    elif task.state == 'SUCCESS':
        response = {'status': 'Готово', 'result': task.result}
    elif task.state == 'FAILURE':
        response = {'status': 'Ошибка', 'error': str(task.info)}
    else:
        response = {'status': task.state, 'progress': task.info.get('progress', 0)}
    return response

Запуск Celery воркеров:

# Запустить воркер
celery -A myapp worker --loglevel=info --concurrency=4

# Мониторинг задач (Flower)
celery -A myapp flower

Способ 2: Redis Queue (более простой вариант)

RQ — простая очередь на базе Redis, для простых сценариев.

from redis import Redis
from rq import Queue, Worker
from rq.job import JobStatus

# Подключение
redis_conn = Redis()
q = Queue(connection=redis_conn)

# Определение функции (обычная функция, не декоратор)
def send_email(recipient: str, subject: str):
    print(f"Отправляю email на {recipient}")
    time.sleep(2)
    return f"Email отправлен {recipient}"

def process_payment(user_id: int, amount: float):
    # Обработка платежа
    api_response = call_payment_api(user_id, amount)
    update_transaction_status(user_id, api_response)
    return api_response

# Добавление задачи в очередь
job = q.enqueue(send_email, 'user@example.com', 'Hello')
print(f"Task ID: {job.id}")

# Проверка статуса
if job.is_finished:
    print(f"Результат: {job.result}")
elif job.is_failed:
    print(f"Ошибка: {job.exc_info}")
else:
    print(f"Статус: {job.get_status()}")

# Запуск воркера
worker = Worker([q], connection=redis_conn)
worker.work(with_scheduler=True)

Способ 3: Kafka (для потоков данных и масштаба)

Kafka — это система потоков, для высоконагруженных систем.

from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime

# Продюсер — отправитель событий
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def log_user_action(user_id: int, action: str, details: dict):
    """Отправить событие в Kafka"""
    event = {
        "user_id": user_id,
        "action": action,
        "details": details,
        "timestamp": datetime.utcnow().isoformat()
    }
    # Отправляем в топик
    future = producer.send('user-actions', value=event)
    # Ждём подтверждения
    record_metadata = future.get(timeout=10)
    print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition}")

# Использование
log_user_action(123, 'login', {'ip': '192.168.1.1'})
log_user_action(456, 'purchase', {'product_id': 789, 'amount': 99.99})

# Консьюмер — получатель событий
consumer = KafkaConsumer(
    'user-actions',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='analytics-service'
)

def process_events():
    """Обработать события из Kafka"""
    for message in consumer:
        event = message.value
        print(f"Получено событие: {event['action']} от пользователя {event['user_id']}")
        
        # Обработка события
        if event['action'] == 'purchase':
            update_user_stats(event['user_id'], event['details']['amount'])
        elif event['action'] == 'login':
            log_analytics(event['user_id'], event['details']['ip'])

process_events()

Способ 4: Asyncio очередь (в процессе, для микрозадач)

asyncio.Queue — встроенная очередь для асинхронного кода.

import asyncio
from typing import List

class TaskQueue:
    def __init__(self, num_workers: int = 4):
        self.queue = asyncio.Queue()
        self.num_workers = num_workers
        self.workers = []
    
    async def worker(self, worker_id: int):
        """Воркер обрабатывает задачи из очереди"""
        while True:
            try:
                task = await asyncio.wait_for(self.queue.get(), timeout=5.0)
                print(f"Воркер {worker_id}: обрабатываю {task}")
                result = await process_task(task)
                print(f"Воркер {worker_id}: результат {result}")
            except asyncio.TimeoutError:
                break
            finally:
                self.queue.task_done()
    
    async def start(self):
        """Запустить воркеры"""
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.num_workers)
        ]
    
    async def add_task(self, task):
        """Добавить задачу в очередь"""
        await self.queue.put(task)
    
    async def wait_completion(self):
        """Ждать завершения всех задач"""
        await self.queue.join()
        for worker in self.workers:
            worker.cancel()

async def process_task(task: str) -> str:
    """Обработать задачу"""
    await asyncio.sleep(1)
    return f"Обработано: {task}"

async def main():
    queue = TaskQueue(num_workers=3)
    await queue.start()
    
    # Добавляем задачи
    for i in range(10):
        await queue.add_task(f"Task {i}")
    
    await queue.wait_completion()
    print("Все задачи завершены")

asyncio.run(main())

Способ 5: Amazon SQS (облачное решение)

import boto3
import json
from datetime import datetime

# Инициализация
sqs = boto3.client('sqs', region_name='us-east-1')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'

# Отправка сообщения
def send_message(message_data: dict):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message_data),
        DelaySeconds=0  # Отправить сразу
    )
    print(f"Message ID: {response['MessageId']}")
    return response['MessageId']

# Получение сообщений
def receive_messages(max_messages: int = 10):
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,
        WaitTimeSeconds=20  # Long polling
    )
    
    messages = response.get('Messages', [])
    for message in messages:
        body = json.loads(message['Body'])
        print(f"Обработка: {body}")
        
        # Удаляем обработанное сообщение
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle']
        )
    
    return messages

# Использование
send_message({"order_id": 123, "status": "pending"})
receive_messages()

Сравнение систем очередей

СистемаМасштабСкоростьСложностьЛучше для
RabbitMQ + CeleryСредний-ВысокийБыстроСредняяАсинхронные задачи, retry
RQСреднийБыстроНизкаяПростые очереди, Redis
KafkaВысокийОчень быстроВысокаяПотоки данных, аналитика
asyncio.QueueОдин серверОчень быстроНизкаяМикрозадачи в памяти
AWS SQSВысокийНормальноСредняяОблачные приложения

Практический совет

Выбирай систему по сценарию:

  1. Начни с RQ — если задачи просты и чисел не много
  2. Переходи на Celery — когда нужны retry, scheduling, сложная логика
  3. Используй Kafka — когда данные потоками и нужна история событий
  4. Облако (SQS) — когда хостишь на AWS и не хочешь управлять инфраструктурой

Итог

Очереди решают три проблемы:

  • Масштабируемость — долгие операции не блокируют API
  • Надёжность — retry, dead letter queue при ошибках
  • Асинхронность — сервис может обработать множество запросов параллельно
Как взаимодействовал с очередями? | PrepBro