← Назад к вопросам
Приведи пример, когда на практике применяется очередь
2.0 Middle🔥 181 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Практические примеры очередей (Queues)
Очереди используются для асинхронной обработки задач и избежания блокирования. Вот практические примеры.
Пример 1: Отправка писем
Проблема: Отправка email долгая операция (1-10 секунд), не хочется блокировать пользователя.
Без очереди (ПЛОХО):
from flask import Flask, request
app = Flask(__name__)
@app.route('/register', methods=['POST'])
def register():
user_data = request.json
user = create_user(user_data)
send_email(user['email'], "Welcome!") # Ждём 5 секунд
return {"status": "registered"}, 201
Решение с очередью (Celery):
from celery import Celery
from flask import Flask
app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379')
@celery.task
def send_email_task(email, subject, body):
send_smtp_email(email, subject, body)
@app.route('/register', methods=['POST'])
def register():
user_data = request.json
user = create_user(user_data)
send_email_task.delay(user['email'], "Welcome", "Thank you for registering")
return {"status": "registered"}, 201
Пример 2: Обработка видео
Конвертирование видео может занимать часы. Используем RabbitMQ:
import pika
import json
import subprocess
def convert_video(ch, method, properties, body):
data = json.loads(body)
video_id = data['video_id']
subprocess.run([
'ffmpeg',
'-i', data['path'],
'-c:v', 'libx265',
f'/output/{video_id}.mp4'
])
update_video_status(video_id, 'completed')
ch.basic_ack(delivery_tag=method.delivery_tag)
# Очередь
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='video_conversion')
channel.basic_consume(
queue='video_conversion',
on_message_callback=convert_video
)
channel.start_consuming()
# Клиент добавляет
channel.basic_publish(
exchange='',
routing_key='video_conversion',
body=json.dumps({'video_id': '123', 'path': '/uploads/video.mp4'})
)
Пример 3: Обработка заказов
Используем Kafka для обработки потока заказов:
from kafka import KafkaProducer, KafkaConsumer
import json
def create_order(order_data):
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
order = {
'order_id': generate_uuid(),
'user_id': order_data['user_id'],
'items': order_data['items']
}
producer.send('orders', value=order)
return {"status": "order_created"}
def process_orders():
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
group_id='order_processors',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
order = message.value
validate_inventory(order['items'])
process_payment(order)
create_shipment(order)
Пример 4: Уведомления в реальном времени
import aio_pika
import json
async def send_notifications():
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
channel = await connection.channel()
queue = await channel.declare_queue('notifications')
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
notification = json.loads(message.body)
if notification['type'] == 'push':
send_push_notification(
notification['user_id'],
notification['message']
)
Пример 5: Redis очередь
import redis
from rq import Queue
r = redis.Redis()
q = Queue(connection=r)
def process_analytics(user_id):
events = get_user_events(user_id)
calculate_statistics(events)
# Добавляем задачу
job = q.enqueue(process_analytics, 123)
# Проверяем статус
job.get_status()
Пример 6: Параллельная обработка
from concurrent.futures import ThreadPoolExecutor
class ResourcePool:
def __init__(self, max_workers=4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
def process_batch(self, tasks):
futures = [self.executor.submit(t['func'], *t['args']) for t in tasks]
return [f.result() for f in futures]
pool = ResourcePool(max_workers=8)
tasks = [{'func': fetch_data, 'args': (url,)} for url in urls]
results = pool.process_batch(tasks)
Когда использовать очереди
- Долгие операции (Email, видео)
- Пиковые нагрузки
- Фоновая работа (аналитика)
- Деградированные сценарии
- Распределённые системы
Основные инструменты
- Celery - распределённая обработка
- RabbitMQ - очередь сообщений
- Redis - быстрая очередь
- Kafka - высокопроизводительная
- AWS SQS - облачная
- RQ - простая очередь на Redis
Преимущества
- Асинхронная обработка
- Масштабируемость
- Надёжность и персистентность
- Управление нагрузкой
- Разделение ответственности