← Назад к вопросам

Что такое Dead Letter Queue?

2.0 Middle🔥 201 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Dead Letter Queue (DLQ) — Очередь недоставленных сообщений

Dead Letter Queue — это отдельная очередь обмена сообщениями (message queue), предназначенная для хранения сообщений, которые не могли быть обработаны основной системой. DLQ — критический компонент надёжной архитектуры асинхронной обработки данных.

Зачем нужна DLQ?

В системах с асинхронной обработкой сообщений не все сообщения можно обработать успешно:

  • Повреждённые данные — сообщение содержит невалидные данные
  • Сбои сервиса — целевой сервис недоступен или перегружен
  • Непредвиденные исключения — ошибки в коде обработчика
  • Истечение ttl — сообщение устарело и больше не актуально

Без DLQ такие сообщения либо теряются (потеря данных), либо зависают в очереди (бесконечный retry), либо блокируют обработку других сообщений.

Как работает DLQ?

from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError

app = Celery("myapp")
app.conf.update(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
)

# Обработчик с автоматической отправкой в DLQ
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_order(self, order_id: int):
    try:
        # Основная логика обработки
        order = fetch_order(order_id)
        validate_order(order)
        send_to_warehouse(order)
    except ValueError as exc:
        # Невалидные данные — отправляем в DLQ без retry
        send_to_dlq(order_id, str(exc), "invalid_data")
        raise
    except ConnectionError as exc:
        # Сбой сервиса — retry с экспоненциальной задержкой
        try:
            raise self.retry(exc=exc, countdown=2 ** self.request.retries)
        except MaxRetriesExceededError:
            # Исчерпаны все попытки — в DLQ
            send_to_dlq(order_id, str(exc), "max_retries")
            raise

RabbitMQ с Dead Letter Exchange (DLX)

RabbitMQ имеет встроенный механизм DLQ через Dead Letter Exchange:

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Основная очередь с DLX
channel.exchange_declare(
    exchange="orders_dlx",
    exchange_type="direct",
    durable=True
)
channel.queue_declare(queue="orders_dlq", durable=True)
channel.queue_bind(
    exchange="orders_dlx",
    queue="orders_dlq",
    routing_key="orders"
)

# Основная очередь с ссылкой на DLX
channel.exchange_declare(
    exchange="orders",
    exchange_type="direct",
    durable=True
)
channel.queue_declare(
    queue="orders",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "orders_dlx",
        "x-dead-letter-routing-key": "orders",
        "x-message-ttl": 3600000,  # 1 час
        "x-max-length": 100000     # Макс сообщений
    }
)
channel.queue_bind(
    exchange="orders",
    queue="orders",
    routing_key="orders"
)

# Обработчик с обработкой ошибок
def process_message(ch, method, properties, body):
    try:
        data = json.loads(body)
        # Обработка...
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except json.JSONDecodeError as e:
        # Невалидный JSON — отправляем в DLQ без retry
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception as e:
        # Другие ошибки — пытаемся повторить
        if method.delivery_tag % 10 > 3:  # Макс 3 попытки
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue="orders", on_message_callback=process_message)
channel.start_consuming()

Стратегии обработки DLQ

1. Мониторинг и алерты

import logging

def monitor_dlq():
    dlq_size = get_queue_length("orders_dlq")
    if dlq_size > 100:
        logger.error(f"DLQ переполнена: {dlq_size} сообщений")
        send_alert("ops-team", f"DLQ alert: {dlq_size} messages")

2. Ручное переопубликование (replay)

def replay_from_dlq(queue_name: str, limit: int = 10):
    for message in get_messages(f"{queue_name}_dlq", limit):
        try:
            # Переопубликуем в основную очередь
            publish_to_queue(queue_name, message)
            delete_from_dlq(message.id)
        except Exception as e:
            logger.error(f"Replay failed: {e}")

3. Экспорт в хранилище

def export_dlq_to_storage():
    messages = get_all_dlq_messages()
    for msg in messages:
        save_to_database("failed_messages", {
            "original_queue": msg.queue,
            "payload": msg.body,
            "error": msg.error,
            "timestamp": msg.created_at,
            "retry_count": msg.retry_count
        })

Лучшие практики

✓ Сделай:

  • Устанавливай TTL на сообщения в очереди (чтобы старые не скапливались)
  • Логируй причину отправки в DLQ (невалидные данные, таймауты и т.д.)
  • Мониторь размер DLQ и настраивай алерты
  • Реализуй replay механизм для обработки сообщений из DLQ после fixes
  • Сохраняй контекст (headers, metadata) вместе с сообщением

✗ Не делай:

  • Игнорируй DLQ — это признак проблем в системе
  • Не перезагружай сообщения без анализа ошибки
  • Не удаляй сообщения из DLQ без бэкапа

AWS SQS Dead Letter Queue

import boto3

sqs = boto3.client("sqs")

# Создание основной очереди со ссылкой на DLQ
response = sqs.create_queue(
    QueueName="orders-queue",
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:orders-dlq",
            "maxReceiveCount": "3"  # Макс 3 попытки
        })
    }
)

Dead Letter Queue — это не "мусорка", а важный компонент reliability инженерии. Правильная обработка DLQ предотвращает потерю данных и помогает быстро диагностировать проблемы в production.

Что такое Dead Letter Queue? | PrepBro