← Назад к вопросам
Как гарантировать доставку в Redis Pub/Sub?
3.0 Senior🔥 111 комментариев
#Базы данных (NoSQL)#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Ограничения Redis Pub/Sub
Redis Pub/Sub — это паттерн обмена сообщениями, который НЕ ГАРАНТИРУЕТ доставку сообщений. Это фундаментальная особенность его архитектуры:
- Если подписчик отключён в момент публикации — сообщение потеряется
- Redis хранит сообщения в памяти без персистентности (по умолчанию)
- Нет механизма подтверждения (ACK) доставки
- Нет очереди переповторов при сбое доставки
Это сделано по дизайну для максимальной производительности и минимальной задержки.
Решения для гарантии доставки
1. Redis Streams (современный подход)
Это рекомендуемое решение, если нужна гарантия доставки:
import redis
from datetime import datetime
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Публикация сообщения
message_id = redis_client.xadd(
'events', # название потока
{'user_id': '123', 'action': 'login'},
id='*' # Redis генерирует ID автоматически
)
print(f'Message ID: {message_id}')
# Чтение с подтверждением (ACK)
group_name = 'my_group'
# Создать группу потребителей
try:
redis_client.xgroup_create('events', group_name, id='0', mkstream=True)
except redis.ResponseError:
pass # Группа уже существует
# Чтать сообщения
messages = redis_client.xreadgroup(
group_name,
'consumer_1',
{'events': '>'}, # '>' читает только новые сообщения
count=10,
block=0 # блокирующее чтение
)
for stream_key, message_list in messages:
for message_id, data in message_list:
try:
print(f'Processing {data}')
# Обработка сообщения
finally:
# Подтверждаем доставку
redis_client.xack('events', group_name, message_id)
Преимущества Streams:
- Персистентность в рамках сессии Redis
- Consumer groups с подтверждением (ACK)
- Отслеживание необработанных сообщений (pending list)
- История сообщений с TTL
2. Комбинация с внешней базой данных
Для критических систем, где нельзя потерять сообщения:
import json
from typing import Any
from datetime import datetime
class ReliablePublisher:
def __init__(self, redis_client, db_connection):
self.redis = redis_client
self.db = db_connection
def publish_with_persistence(self, channel: str, message: dict) -> str:
"""Публикует сообщение с сохранением в БД"""
message_id = str(datetime.utcnow().timestamp())
message['id'] = message_id
message['status'] = 'pending' # pending -> processing -> delivered
# Шаг 1: Сохраняем в БД (источник истины)
self.db.execute(
"INSERT INTO messages (id, channel, content, status) VALUES (%s, %s, %s, %s)",
(message_id, channel, json.dumps(message), 'pending')
)
# Шаг 2: Публикуем в Redis (доставка подписчикам)
self.redis.publish(channel, json.dumps(message))
return message_id
def mark_delivered(self, message_id: str):
"""Отмечаем сообщение как доставленное"""
self.db.execute(
"UPDATE messages SET status = %s WHERE id = %s",
('delivered', message_id)
)
class ReliableSubscriber:
def __init__(self, redis_client, db_connection):
self.redis = redis_client
self.db = db_connection
def subscribe_and_process(self, channel: str):
"""Подписывается и обрабатывает с гарантией доставки"""
pubsub = self.redis.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
try:
data = json.loads(message['data'])
message_id = data.get('id')
# Обрабатываем
self.process_message(data)
# Отмечаем как доставленное
if message_id:
self.db.execute(
"UPDATE messages SET status = %s WHERE id = %s",
('delivered', message_id)
)
except Exception as e:
print(f'Error processing message: {e}')
def process_message(self, data: dict):
"""Переопределить в подклассе"""
print(f'Processing: {data}')
# Использование
# publisher = ReliablePublisher(redis_client, db_connection)
# message_id = publisher.publish_with_persistence('orders', {'order_id': '123'})
3. Retry logic с exponential backoff
Для подписчиков, которые могут временно быть недоступны:
import time
from functools import wraps
from typing import Callable
def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
"""Декоратор для повтора с экспоненциальной задержкой"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
print(f'Attempt {attempt + 1} failed. Retrying in {delay}s...')
time.sleep(delay)
return wrapper
return decorator
@retry_with_backoff(max_retries=5, base_delay=0.5)
def send_message_to_service(message: dict):
"""Отправляет сообщение во внешний сервис с переповторами"""
# HTTP запрос, API call и т.д.
pass
Рекомендации
Используй Pub/Sub только для:
- Некритичных уведомлений
- Real-time данных (курсы валют, котировки)
- Broadcast сообщений, когда потеря приемлема
Используй Streams/RabbitMQ/Kafka для:
- Критичных операций (платежи, заказы)
- Когда нужна гарантия доставки
- Когда нужна история сообщений
- Когда нужно масштабирование потребителей
Dead Letter Queue (DLQ) паттерн:
class MessageQueue:
DLQ_QUEUE = 'dlq:messages'
MAX_RETRIES = 3
def process_with_dlq(self, message_id: str, message: dict, retry_count: int = 0):
try:
self.process(message)
except Exception as e:
retry_count += 1
if retry_count >= self.MAX_RETRIES:
# Отправляем в Dead Letter Queue
self.redis.lpush(
self.DLQ_QUEUE,
json.dumps({'message_id': message_id, 'error': str(e), 'message': message})
)
else:
# Переповтор
self.redis.lpush(f'retry:{retry_count}', json.dumps(message))
Итог
Redis Pub/Sub — инструмент без гарантий доставки. Для надёжности используй:
- Redis Streams (встроенное в Redis)
- Внешняя БД (источник истины)
- Retry механизм (переповторы и DLQ)
- Message Queue (RabbitMQ, Kafka для критичных систем)