Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое очередь?
Очередь (queue) — это структура данных, которая работает по принципу FIFO (First In, First Out): первый добавленный элемент извлекается первым. В контексте приложений очередь часто означает систему асинхронной обработки сообщений между компонентами или сервисами.
Типы очередей
1. Очередь как структура данных В Python это встроенная структура данных из модуля collections:
from collections import deque
queue = deque()
# Добавление элемента в конец
queue.append(1)
queue.append(2)
queue.append(3)
# Извлечение элемента с начала
first = queue.popleft() # 1
second = queue.popleft() # 2
third = queue.popleft() # 3
2. Очередь сообщений (Message Queue) Это система для асинхронной передачи сообщений между приложениями. Примеры: RabbitMQ, Redis, Apache Kafka, AWS SQS.
Зачем нужны очереди сообщений
1. Асинхронная обработка Вместо синхронного ожидания выполнения задачи можно добавить её в очередь и сразу вернуть результат пользователю.
# Плохо: синхронно, блокирует пользователя
@app.post("/send-email")
def send_email(email: str, subject: str):
# Это может занять 5-10 секунд
send_email_via_smtp(email, subject)
return {"status": "sent"}
# Хорошо: асинхронно через очередь
import celery
app = celery.Celery()
@app.task
def send_email_async(email: str, subject: str):
send_email_via_smtp(email, subject)
@app.post("/send-email")
def send_email(email: str, subject: str):
# Добавляем задачу в очередь и сразу возвращаем результат
send_email_async.delay(email, subject)
return {"status": "queued"}
2. Развязывание компонентов (decoupling) Компоненты не нужно знать друг о друге. Они просто отправляют сообщения в очередь.
# Сервис 1: создание заказа
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='orders')
def create_order(user_id, products):
order = Order.objects.create(user_id=user_id, products=products)
# Отправляем сообщение в очередь
message = json.dumps({
"order_id": order.id,
"user_id": user_id,
"products": products
})
channel.basic_publish(exchange='', routing_key='orders', body=message)
return order
# Сервис 2: отправка email
def process_orders():
def callback(ch, method, properties, body):
data = json.loads(body)
send_order_confirmation_email(data['user_id'])
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.queue_declare(queue='orders')
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()
# Сервис 3: инвентарь
def process_inventory():
def callback(ch, method, properties, body):
data = json.loads(body)
update_inventory(data['products'])
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.queue_declare(queue='orders')
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()
3. Масштабируемость Можно добавить больше обработчиков (workers) для обработки сообщений параллельно.
# Один worker обрабатывает 10 сообщений в минуту
# Если добавить 5 workers, будут обрабатываться 50 сообщений в минуту
# Celery: несколько workers
# worker1.py
celery_app.start()
# worker2.py
celery_app.start()
# worker3.py
celery_app.start()
# Все workers обрабатывают задачи из одной очереди параллельно
4. Надёжность Если приложение упало, сообщения остаются в очереди и обрабатываются после восстановления.
5. Балансировка нагрузки Эко 30 помогает распределить нагрузку — задачи обрабатываются по мере возможности.
Примеры очередей в Python
Celery + Redis Популярное решение для асинхронной обработки задач в Django/Flask.
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379')
@app.task
def long_running_task(data):
# Долгая операция
result = process_data(data)
return result
# Запуск задачи
task = long_running_task.delay({"key": "value"})
# Проверка статуса
print(task.status) # 'PENDING', 'STARTED', 'SUCCESS', 'FAILURE'
print(task.result) # Результат выполнения
RabbitMQ + Pika Мощное решение для высоконагруженных систем.
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
# Объявляем очередь
channel.queue_declare(queue='task_queue', durable=True)
# Отправка сообщения
def send_task(task_data):
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=json.dumps(task_data),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
)
)
# Обработка сообщений
def process_messages():
def callback(ch, method, properties, body):
task_data = json.loads(body)
print(f"Обработка задачи: {task_data}")
# Выполняем задачу
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
channel.start_consuming()
Redis Queue (RQ) Симплое решение с использованием Redis.
from redis import Redis
from rq import Queue
redis_conn = Redis()
q = Queue(connection=redis_conn)
def slow_function(x):
time.sleep(10)
return x * 2
# Добавление задачи в очередь
job = q.enqueue(slow_function, 5)
# Проверка статуса
print(job.get_status()) # 'queued', 'started', 'finished'
print(job.result) # 10
Когда использовать очереди
- Долгие операции (отправка email, обработка видео, анализ данных)
- Внешние API вызовы (нестабильные сервисы)
- Массовая обработка данных
- Асинхронные уведомления
- Расстановка приоритетов задач
Альтернативы
- Pub/Sub — для broadcast сообщений (один отправитель, много получателей)
- Event Streaming (Kafka) — для потока событий с историей
- Direct API calls — для синхронных операций
Заключение
Очереди — это критическая часть современной архитектуры. Они позволяют создавать масштабируемые, надёжные системы, где компоненты могут работать независимо друг от друга. Понимание очередей необходимо для backend-разработчика.