← Назад к вопросам

Могут ли быть проблемы с гарантиями доставки

1.3 Junior🔥 131 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Проблемы с гарантиями доставки сообщений в распределённых системах

Гарантии доставки — одна из ключевых проблем в распределённых системах, особенно при работе с очередями сообщений (RabbitMQ, Kafka, Redis, AWS SQS и т.д.). Это важно для обеспечения надёжности приложения.

Три уровня гарантий доставки

1. At-most-once (максимум один раз)

Сообщение может быть потеряно, но не дублировано.

# Пример: Redis pub/sub (без гарантий)
import redis

redis_client = redis.Redis()

# Отправитель
redis_client.publish('notifications', 'User registered')
# Если подписчика нет в момент отправки → сообщение потеряно

# Подписчик
pubsub = redis_client.pubsub()
pubsub.subscribe('notifications')

for message in pubsub.listen():
    print(f"Получено: {message['data']}")

Проблемы:

  • Сообщения могут потеряться
  • Нет гарантии доставки
  • Быстро, но ненадёжно

Когда использовать: метрики, логи, push-уведомления (не критичные)

2. At-least-once (минимум один раз)

Сообщение гарантированно доставится, но может прийти несколько раз.

# Пример: RabbitMQ с acknowledgement
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Отправитель
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='Process order #123',
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent  # Сохранить на диск
    )
)

# Потребитель
def callback(ch, method, properties, body):
    try:
        print(f"Обработка: {body.decode()}")
        # Если здесь выключиться, сообщение переотправится
    except Exception as e:
        print(f"Ошибка: {e}")
        # Без ack сообщение вернётся в очередь
    else:
        ch.basic_ack(delivery_tag=method.delivery_tag)  # Подтверждаем

channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback,
    auto_ack=False  # Ручное подтверждение!
)

channel.start_consuming()

Проблемы:

  • Дублирование сообщений
  • Нужна идемпотентность (обработка дважды = один результат)

Когда использовать: платежи, заказы, критичные операции

3. Exactly-once (ровно один раз)

Самый сложный, требует координации.

# Пример: Kafka с транзакциями
from kafka import KafkaProducer, KafkaConsumer
import json

# Отправитель с транзакциями
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    transactional_id='my-transaction-id'  # Для идемпотентности
)

transaction_manager = producer._get_transaction_manager()
transaction_manager.begin_transaction()

try:
    producer.send('orders', {'order_id': 123, 'amount': 99.99})
    # Сохраняем offset в БД (как часть транзакции)
    save_processed_offset(123)
    transaction_manager.commit_transaction()
except Exception as e:
    transaction_manager.abort_transaction()
    raise

# Потребитель с идемпотентностью
class OrderProcessor:
    def __init__(self, db):
        self.db = db
    
    def process(self, message):
        order_id = message['order_id']
        
        # Проверяем, не обработан ли уже
        if self.db.exists(f'processed:{order_id}'):
            print("Сообщение уже обработано")
            return
        
        # Обрабатываем как атомарную операцию
        try:
            charge_user(message['amount'])
            self.db.set(f'processed:{order_id}', True)
        except Exception:
            raise  # Откатится, переотправится

Конкретные проблемы и решения

Проблема 1: Дублирование при повторной отправке

# ПРОБЛЕМА
def send_payment(user_id, amount):
    try:
        result = api.charge(user_id, amount)
        queue.send(f"payment_completed:{user_id}")
    except TimeoutError:
        # Timeout! Может быть платёж был обработан?
        send_payment(user_id, amount)  # Переотправляем
        # ОШИБКА: может быть два платежа!

# РЕШЕНИЕ: идемпотентный ключ
def send_payment_safe(user_id, amount, idempotency_key):
    # Проверяем, не обработана ли уже
    if is_processed(idempotency_key):
        return get_result(idempotency_key)
    
    # Обрабатываем с сохранением результата
    result = api.charge(
        user_id, 
        amount,
        idempotency_key=idempotency_key  # API хранит результат
    )
    mark_processed(idempotency_key, result)
    return result

Проблема 2: Потеря при отключении

# ПРОБЛЕМА: In-memory очередь
queue = []

def add_task(task):
    queue.append(task)  # При перезагрузке потеря!

# РЕШЕНИЕ: persistent очередь
import redis

redis_client = redis.Redis()

def add_task(task):
    redis_client.rpush('task_queue', json.dumps(task))
    # Redis сохраняет на диск

def process_tasks():
    while True:
        task_json = redis_client.blpop('task_queue', timeout=0)
        if task_json:
            task = json.loads(task_json[1])
            process(task)

Проблема 3: Обработка в неправильном порядке

# ПРОБЛЕМА: параллельная обработка
queue = ['order_created', 'payment_processed', 'order_shipped']
# Если обработать в другом порядке → inconsistency

# РЕШЕНИЕ: партиция по ключу
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092']
)

# Сообщения одного заказа → один partition
producer.send(
    'orders',
    key=b'order_123',  # Один order → один partition → порядок!
    value=b'{"event": "payment_processed"}'
)

Проблема 4: Слабая сеть (retry storms)

# ПРОБЛЕМА: слишком частые retry
def process_with_retry(task, max_retries=1000):
    for attempt in range(max_retries):
        try:
            return execute(task)
        except Exception:
            time.sleep(0.1)  # Слишком быстро!
            # При слабой сети: DDoS на себя

# РЕШЕНИЕ: экспоненциальная задержка + deadletter queue
def process_with_backoff(task, max_retries=5):
    for attempt in range(max_retries):
        try:
            return execute(task)
        except Exception as e:
            if attempt == max_retries - 1:
                # Отправляем в deadletter queue
                deadletter_queue.send(task)
                logging.error(f"Task failed after {max_retries} attempts")
                raise
            
            delay = 2 ** attempt  # 1, 2, 4, 8, 16 секунд
            delay += random.uniform(0, 1)  # Jitter
            time.sleep(delay)

Проблема 5: Мониторинг недоставленных сообщений

import logging
from datetime import datetime, timedelta

class ReliableQueue:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def send(self, topic, message, ttl=86400):
        message_id = uuid.uuid4()
        
        # Отправляем сообщение
        self.redis.lpush(f'queue:{topic}', json.dumps({
            'id': str(message_id),
            'data': message,
            'created_at': datetime.now().isoformat(),
            'attempts': 0
        }))
        
        # Трекируем для мониторинга
        self.redis.setex(f'tracking:{message_id}', ttl, 'pending')
        
        return message_id
    
    def ack(self, message_id):
        """Подтверждаем доставку"""
        self.redis.set(f'tracking:{message_id}', 'delivered')
    
    def check_undelivered(self):
        """Найти недоставленные сообщения"""
        keys = self.redis.keys('tracking:*')
        for key in keys:
            status = self.redis.get(key)
            if status == b'pending':
                message_id = key.decode().split(':')[1]
                logging.warning(f"Undelivered: {message_id}")
                # Можно переотправить

Сравнение популярных систем

СистемаAt-most-onceAt-least-onceExactly-once
Redis Pub/Sub
Redis Queue✓ (с transactions)
RabbitMQ✓ (сложно)
Kafka
AWS SQS
AWS SNS+SQS

Рекомендации для Python разработчиков

# 1. Используй persistent очереди
from rq import Queue  # Redis Queue
queue = Queue(connection=redis_client)
queue.enqueue(process_payment, user_id, amount)

# 2. Реализуй идемпотентность
def safe_process(task_id, data):
    if cache.get(f'processed:{task_id}'):
        return cache.get(f'result:{task_id}')
    
    result = process(data)
    cache.set(f'processed:{task_id}', True, ttl=86400)
    cache.set(f'result:{task_id}', result, ttl=86400)
    return result

# 3. Логируй все попытки
logging.info(f"Processing task {task_id}, attempt {attempt}")

# 4. Мониторь deadletter queue
@app.route('/health/deadletter')
def check_deadletter():
    count = len(deadletter_queue.get_jobs())
    if count > 0:
        alert(f"Deadletter queue has {count} items")
    return {'deadletter_count': count}

Вывод

Проблемы с гарантиями доставки возникают из-за:

  • Отказоустойчивости сети (потеря пакетов, timeouts)
  • Отказов процессов (крах сервера при обработке)
  • Параллелизма (одновременная обработка разных потоков)
  • Порядка сообщений (нарушение FIFO при распределении)

Решение: выбрать правильный уровень гарантий (at-most, at-least, exactly-once), использовать идемпотентные операции и мониторить deadletter queue.

Могут ли быть проблемы с гарантиями доставки | PrepBro