← Назад к вопросам

Как гарантировать доставку в Redis Pub/Sub?

3.0 Senior🔥 111 комментариев
#Базы данных (NoSQL)#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ограничения Redis Pub/Sub

Redis Pub/Sub — это паттерн обмена сообщениями, который НЕ ГАРАНТИРУЕТ доставку сообщений. Это фундаментальная особенность его архитектуры:

  • Если подписчик отключён в момент публикации — сообщение потеряется
  • Redis хранит сообщения в памяти без персистентности (по умолчанию)
  • Нет механизма подтверждения (ACK) доставки
  • Нет очереди переповторов при сбое доставки

Это сделано по дизайну для максимальной производительности и минимальной задержки.

Решения для гарантии доставки

1. Redis Streams (современный подход)

Это рекомендуемое решение, если нужна гарантия доставки:

import redis
from datetime import datetime

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Публикация сообщения
message_id = redis_client.xadd(
    'events',  # название потока
    {'user_id': '123', 'action': 'login'},
    id='*'  # Redis генерирует ID автоматически
)
print(f'Message ID: {message_id}')

# Чтение с подтверждением (ACK)
group_name = 'my_group'

# Создать группу потребителей
try:
    redis_client.xgroup_create('events', group_name, id='0', mkstream=True)
except redis.ResponseError:
    pass  # Группа уже существует

# Чтать сообщения
messages = redis_client.xreadgroup(
    group_name,
    'consumer_1',
    {'events': '>'},  # '>' читает только новые сообщения
    count=10,
    block=0  # блокирующее чтение
)

for stream_key, message_list in messages:
    for message_id, data in message_list:
        try:
            print(f'Processing {data}')
            # Обработка сообщения
        finally:
            # Подтверждаем доставку
            redis_client.xack('events', group_name, message_id)

Преимущества Streams:

  • Персистентность в рамках сессии Redis
  • Consumer groups с подтверждением (ACK)
  • Отслеживание необработанных сообщений (pending list)
  • История сообщений с TTL

2. Комбинация с внешней базой данных

Для критических систем, где нельзя потерять сообщения:

import json
from typing import Any
from datetime import datetime

class ReliablePublisher:
    def __init__(self, redis_client, db_connection):
        self.redis = redis_client
        self.db = db_connection
    
    def publish_with_persistence(self, channel: str, message: dict) -> str:
        """Публикует сообщение с сохранением в БД"""
        message_id = str(datetime.utcnow().timestamp())
        message['id'] = message_id
        message['status'] = 'pending'  # pending -> processing -> delivered
        
        # Шаг 1: Сохраняем в БД (источник истины)
        self.db.execute(
            "INSERT INTO messages (id, channel, content, status) VALUES (%s, %s, %s, %s)",
            (message_id, channel, json.dumps(message), 'pending')
        )
        
        # Шаг 2: Публикуем в Redis (доставка подписчикам)
        self.redis.publish(channel, json.dumps(message))
        
        return message_id
    
    def mark_delivered(self, message_id: str):
        """Отмечаем сообщение как доставленное"""
        self.db.execute(
            "UPDATE messages SET status = %s WHERE id = %s",
            ('delivered', message_id)
        )

class ReliableSubscriber:
    def __init__(self, redis_client, db_connection):
        self.redis = redis_client
        self.db = db_connection
    
    def subscribe_and_process(self, channel: str):
        """Подписывается и обрабатывает с гарантией доставки"""
        pubsub = self.redis.pubsub()
        pubsub.subscribe(channel)
        
        for message in pubsub.listen():
            if message['type'] == 'message':
                try:
                    data = json.loads(message['data'])
                    message_id = data.get('id')
                    
                    # Обрабатываем
                    self.process_message(data)
                    
                    # Отмечаем как доставленное
                    if message_id:
                        self.db.execute(
                            "UPDATE messages SET status = %s WHERE id = %s",
                            ('delivered', message_id)
                        )
                except Exception as e:
                    print(f'Error processing message: {e}')
    
    def process_message(self, data: dict):
        """Переопределить в подклассе"""
        print(f'Processing: {data}')

# Использование
# publisher = ReliablePublisher(redis_client, db_connection)
# message_id = publisher.publish_with_persistence('orders', {'order_id': '123'})

3. Retry logic с exponential backoff

Для подписчиков, которые могут временно быть недоступны:

import time
from functools import wraps
from typing import Callable

def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
    """Декоратор для повтора с экспоненциальной задержкой"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    print(f'Attempt {attempt + 1} failed. Retrying in {delay}s...')
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=5, base_delay=0.5)
def send_message_to_service(message: dict):
    """Отправляет сообщение во внешний сервис с переповторами"""
    # HTTP запрос, API call и т.д.
    pass

Рекомендации

Используй Pub/Sub только для:

  • Некритичных уведомлений
  • Real-time данных (курсы валют, котировки)
  • Broadcast сообщений, когда потеря приемлема

Используй Streams/RabbitMQ/Kafka для:

  • Критичных операций (платежи, заказы)
  • Когда нужна гарантия доставки
  • Когда нужна история сообщений
  • Когда нужно масштабирование потребителей

Dead Letter Queue (DLQ) паттерн:

class MessageQueue:
    DLQ_QUEUE = 'dlq:messages'
    MAX_RETRIES = 3
    
    def process_with_dlq(self, message_id: str, message: dict, retry_count: int = 0):
        try:
            self.process(message)
        except Exception as e:
            retry_count += 1
            if retry_count >= self.MAX_RETRIES:
                # Отправляем в Dead Letter Queue
                self.redis.lpush(
                    self.DLQ_QUEUE,
                    json.dumps({'message_id': message_id, 'error': str(e), 'message': message})
                )
            else:
                # Переповтор
                self.redis.lpush(f'retry:{retry_count}', json.dumps(message))

Итог

Redis Pub/Sub — инструмент без гарантий доставки. Для надёжности используй:

  1. Redis Streams (встроенное в Redis)
  2. Внешняя БД (источник истины)
  3. Retry механизм (переповторы и DLQ)
  4. Message Queue (RabbitMQ, Kafka для критичных систем)
Как гарантировать доставку в Redis Pub/Sub? | PrepBro