← Назад к вопросам

Приведи пример, когда на практике применяется очередь

2.0 Middle🔥 181 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Практические примеры очередей (Queues)

Очереди используются для асинхронной обработки задач и избежания блокирования. Вот практические примеры.

Пример 1: Отправка писем

Проблема: Отправка email долгая операция (1-10 секунд), не хочется блокировать пользователя.

Без очереди (ПЛОХО):

from flask import Flask, request

app = Flask(__name__)

@app.route('/register', methods=['POST'])
def register():
    user_data = request.json
    user = create_user(user_data)
    send_email(user['email'], "Welcome!")  # Ждём 5 секунд
    return {"status": "registered"}, 201

Решение с очередью (Celery):

from celery import Celery
from flask import Flask

app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379')

@celery.task
def send_email_task(email, subject, body):
    send_smtp_email(email, subject, body)

@app.route('/register', methods=['POST'])
def register():
    user_data = request.json
    user = create_user(user_data)
    send_email_task.delay(user['email'], "Welcome", "Thank you for registering")
    return {"status": "registered"}, 201

Пример 2: Обработка видео

Конвертирование видео может занимать часы. Используем RabbitMQ:

import pika
import json
import subprocess

def convert_video(ch, method, properties, body):
    data = json.loads(body)
    video_id = data['video_id']
    
    subprocess.run([
        'ffmpeg',
        '-i', data['path'],
        '-c:v', 'libx265',
        f'/output/{video_id}.mp4'
    ])
    
    update_video_status(video_id, 'completed')
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Очередь
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='video_conversion')
channel.basic_consume(
    queue='video_conversion',
    on_message_callback=convert_video
)
channel.start_consuming()

# Клиент добавляет
channel.basic_publish(
    exchange='',
    routing_key='video_conversion',
    body=json.dumps({'video_id': '123', 'path': '/uploads/video.mp4'})
)

Пример 3: Обработка заказов

Используем Kafka для обработки потока заказов:

from kafka import KafkaProducer, KafkaConsumer
import json

def create_order(order_data):
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    
    order = {
        'order_id': generate_uuid(),
        'user_id': order_data['user_id'],
        'items': order_data['items']
    }
    
    producer.send('orders', value=order)
    return {"status": "order_created"}

def process_orders():
    consumer = KafkaConsumer(
        'orders',
        bootstrap_servers=['localhost:9092'],
        group_id='order_processors',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    
    for message in consumer:
        order = message.value
        validate_inventory(order['items'])
        process_payment(order)
        create_shipment(order)

Пример 4: Уведомления в реальном времени

import aio_pika
import json

async def send_notifications():
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    queue = await channel.declare_queue('notifications')
    
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                notification = json.loads(message.body)
                if notification['type'] == 'push':
                    send_push_notification(
                        notification['user_id'],
                        notification['message']
                    )

Пример 5: Redis очередь

import redis
from rq import Queue

r = redis.Redis()
q = Queue(connection=r)

def process_analytics(user_id):
    events = get_user_events(user_id)
    calculate_statistics(events)

# Добавляем задачу
job = q.enqueue(process_analytics, 123)

# Проверяем статус
job.get_status()

Пример 6: Параллельная обработка

from concurrent.futures import ThreadPoolExecutor

class ResourcePool:
    def __init__(self, max_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    def process_batch(self, tasks):
        futures = [self.executor.submit(t['func'], *t['args']) for t in tasks]
        return [f.result() for f in futures]

pool = ResourcePool(max_workers=8)
tasks = [{'func': fetch_data, 'args': (url,)} for url in urls]
results = pool.process_batch(tasks)

Когда использовать очереди

  1. Долгие операции (Email, видео)
  2. Пиковые нагрузки
  3. Фоновая работа (аналитика)
  4. Деградированные сценарии
  5. Распределённые системы

Основные инструменты

  • Celery - распределённая обработка
  • RabbitMQ - очередь сообщений
  • Redis - быстрая очередь
  • Kafka - высокопроизводительная
  • AWS SQS - облачная
  • RQ - простая очередь на Redis

Преимущества

  1. Асинхронная обработка
  2. Масштабируемость
  3. Надёжность и персистентность
  4. Управление нагрузкой
  5. Разделение ответственности