← Назад к вопросам

Как реализован брокер сообщений в Redis?

2.7 Senior🔥 181 комментариев
#Базы данных (NoSQL)#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как реализован брокер сообщений в Redis?

Redis — не специализированный брокер сообщений вроде RabbitMQ или Kafka, но он предоставляет встроенные структуры данных и механизмы для реализации систем обмена сообщениями. Давайте разберёмся в деталях.

Основные структуры данных Redis для сообщений

1. Pub/Sub (публикация/подписка)

Redis Pub/Sub — это паттерн распределённого обмена сообщениями на основе наблюдателей (observer pattern). Это не длительное хранилище, сообщения теряются, если нет активных подписчиков.

import redis

# Инициализация Redis клиента
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# ИЗДАТЕЛЬ (Publisher)
def publisher():
    for i in range(5):
        message = f"Message {i}"
        # Отправить сообщение в канал 'news'
        r.publish('news', message)
        print(f"Published: {message}")

# ПОДПИСЧИК (Subscriber)
pubsub = r.pubsub()
pubsub.subscribe('news')

print("Waiting for messages...")
for message in pubsub.listen():
    print(f"Received: {message}")

Внутренняя реализация Pub/Sub:

Publication in Channel 'news'
         |
    ┌────┴─────────┬──────────┐
    |              |          |
┌───▼───┐  ┌──────▼───┐  ┌───▼──────┐
│ Sub 1 │  │  Sub 2   │  │  Sub 3   │
└───────┘  └──────────┘  └──────────┘

2. Списки (Lists) для очередей сообщений

Более надёжный подход — использовать списки как очередь:

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# PRODUCER - добавить сообщение в очередь
def producer(queue_name, message):
    r.lpush(queue_name, json.dumps(message))
    print(f"Queued: {message}")

# CONSUMER - получить сообщение из очереди
def consumer(queue_name):
    while True:
        # RPOP блокирует до появления сообщения (с таймаутом)
        message = r.brpop(queue_name, timeout=0)
        if message:
            key, value = message
            data = json.loads(value)
            print(f"Processing: {data}")
            # Обработка сообщения

# Использование
producer('task_queue', {'task_id': 1, 'action': 'send_email'})
consumer('task_queue')

Внутренняя структура очереди:

HEAD (Left)                    TAIL (Right)
  |                              |
  v                              v
───────────────────────────────
 msg5 | msg4 | msg3 | msg2 | msg1
───────────────────────────────
  ^                              ^
LPUSH (producer)           RPOP (consumer)

3. Sorted Sets для приоритетных очередей

Использование отсортированных множеств (sorted sets) для очередей с приоритетом:

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# PRODUCER с приоритетом (score = приоритет)
def producer_with_priority(queue_name, message, priority=0):
    # Используем текущее время как score (для FIFO внутри одного приоритета)
    score = -priority * 1e9 + time.time()
    r.zadd(queue_name, {json.dumps(message): score})
    print(f"Queued with priority {priority}: {message}")

# CONSUMER - получить сообщение с наивысшим приоритетом
def consumer_priority(queue_name):
    while True:
        # Получить первый элемент (с наименьшей score)
        messages = r.zrange(queue_name, 0, 0)
        if messages:
            message = json.loads(messages[0])
            r.zrem(queue_name, messages[0])
            print(f"Processing priority message: {message}")
        time.sleep(1)

# Использование
producer_with_priority('priority_queue', {'action': 'critical'}, priority=10)
producer_with_priority('priority_queue', {'action': 'normal'}, priority=1)
consumer_priority('priority_queue')

4. Streams (Redis 5.0+) - встроенный брокер

Redis Streams — это полноценная структура данных для реализации брокера сообщений с гарантиями доставки:

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# PRODUCER - добавить сообщение в stream
def stream_producer(stream_name, message):
    # XADD автоматически генерирует ID сообщения
    message_id = r.xadd(stream_name, {'data': json.dumps(message)})
    print(f"Published with ID: {message_id}")

# CONSUMER - читать сообщения из stream
def stream_consumer(stream_name, consumer_group, consumer_id):
    # Создать группу потребителей (если её нет)
    try:
        r.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
    except redis.ResponseError:
        pass  # Группа уже существует
    
    while True:
        # Прочитать сообщения, на которые потребитель не подписан
        messages = r.xreadgroup(
            groupname=consumer_group,
            consumername=consumer_id,
            streams={stream_name: '>'},
            count=1,
            block=0
        )
        
        if messages:
            stream, msg_list = messages[0]
            for msg_id, msg_data in msg_list:
                print(f"Received: {msg_data}")
                # Отметить сообщение как обработанное
                r.xack(stream_name, consumer_group, msg_id)

# Использование
stream_producer('orders', {'order_id': 123, 'amount': 99.99})
stream_consumer('orders', 'order_processors', 'processor-1')

Внутренняя структура Stream:

Stream: 'orders'
─────────────────────────────────
ID: 1234567890-0  | data: {...}
ID: 1234567891-0  | data: {...}
ID: 1234567892-0  | data: {...}
─────────────────────────────────

Consumer Group: 'order_processors'
- Last Delivered ID: 1234567891-0
- Pending Entries List (PEL):
  - processor-1: [1234567892-0]

Сравнение методов

"""
МЕТОД              ДОСТАВКА   ПОДПИСЧИКИ   ХРАНЕНИЕ   ИСПОЛЬЗОВАНИЕ
Pub/Sub            Потеря     Множество    Нет        Real-time: уведомления
Lists (FIFO)       Гарантия   Один         Есть       Простые очереди
Sorted Sets        Гарантия   Один         Есть       Приоритетные очереди
Streams (5.0+)     Гарантия   Группы      Есть       Полноценный брокер
"""

Практический пример: реализация работника (worker) с retry логикой

import redis
import json
import time
from datetime import datetime, timedelta

class RedisJobQueue:
    def __init__(self, redis_client, queue_name):
        self.r = redis_client
        self.queue = queue_name
        self.retry_queue = f"{queue_name}:retry"
        self.dead_letter = f"{queue_name}:dead_letter"
    
    def enqueue_job(self, job_data, delay=0):
        """Добавить задачу в очередь"""
        job = {
            'data': job_data,
            'created_at': datetime.now().isoformat(),
            'attempts': 0,
        }
        
        if delay > 0:
            # Задача с задержкой - используем Sorted Set
            score = time.time() + delay
            self.r.zadd(self.queue, {json.dumps(job): score})
        else:
            # Немедленная задача
            self.r.lpush(self.queue, json.dumps(job))
    
    def process_jobs(self, worker_id, max_retries=3):
        """Обработать задачи из очереди"""
        while True:
            # Получить задачу из основной очереди
            job_str = self.r.brpop(self.queue, timeout=1)
            
            if job_str:
                job = json.loads(job_str[1])
                try:
                    # Обработать задачу
                    self._execute_job(job['data'], worker_id)
                    print(f"[{worker_id}] Job completed: {job['data']}")
                except Exception as e:
                    # Обработка ошибки - retry
                    job['attempts'] += 1
                    if job['attempts'] < max_retries:
                        delay = 2 ** job['attempts']  # Exponential backoff
                        print(f"[{worker_id}] Retrying in {delay}s: {str(e)}")
                        self.enqueue_job(job['data'], delay=delay)
                    else:
                        # Превышено количество попыток - dead letter queue
                        job['error'] = str(e)
                        self.r.lpush(self.dead_letter, json.dumps(job))
                        print(f"[{worker_id}] Job failed (max retries): {job['data']}")
    
    def _execute_job(self, job_data, worker_id):
        """Выполнить задачу (может выбросить исключение)"""
        print(f"[{worker_id}] Processing: {job_data}")
        # Симуляция обработки
        if job_data.get('fail'):
            raise ValueError("Job failed intentionally")
        time.sleep(0.1)

# Использование
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
queue = RedisJobQueue(r, 'task_queue')

# Добавить задачи
for i in range(5):
    queue.enqueue_job({'id': i, 'name': f'task_{i}'})

# Запустить worker в отдельном потоке
worker = threading.Thread(target=queue.process_jobs, args=('worker-1',))
worker.daemon = True
worker.start()

time.sleep(2)

Сетевой протокол Redis для сообщений

Redis использует RESP (REdis Serialization Protocol) для обмена данными:

Клиент к Redis:
*3\r\n
$5\r\n
XADD\r\n
$6\r\n
orders\r\n
$1\r\n
*\r\n
$4\r\n
data\r\n
$9\r\n
message\r\n

Redis к Клиенту:
$19\r\n
1234567890-0\r\n

Когда использовать каждый подход

"""
1. Pub/Sub - когда нужна REAL-TIME доставка, потеря ОК:
   - Уведомления об изменении статуса
   - Live updates для фронтенда
   - Broadcasting команд рабочим

2. Lists (FIFO) - простая очередь с гарантией доставки:
   - Email queue
   - Image processing
   - Небольшие системы

3. Sorted Sets - приоритетные задачи:
   - Уведомления VIP пользователей
   - Критические операции
   - Планировщик задач

4. Streams - полноценный брокер с группами потребителей:
   - Системы логирования в реальном времени
   - Аналитика событий
   - Масштабируемые микросервисы
   - Consumer group pattern
"""

Таким образом, Redis предоставляет гибкий набор структур для реализации систем обмена сообщениями, от простых очередей до полноценных брокеров с потреблением группами (Redis Streams).