← Назад к вопросам

Как работает консьюмер?

2.0 Middle🔥 111 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как работает консьюмер

Консьюмер (Consumer) — это компонент архитектуры, который потребляет данные из источника, обычно из очереди сообщений (message queue). Консьюмеры наиболее часто используются в системах асинхронной обработки, Event-driven архитектуре и распределённых системах. Рассмотрим основные механизмы их работы.

Консьюмер в Kafka

Kafka — одна из самых популярных систем для обработки потоков данных. Консьюмер в Kafka работает следующим образом:

from kafka import KafkaConsumer
import json

# Создание консьюмера
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',  # Начать с самого начала
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Полученных сообщений
for message in consumer:
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Value: {message.value}")
    # Обработка сообщения

Ключевые концепции Kafka Consumer

Consumer Group — множество консьюмеров с одним group_id. Разделение данных:

consumer1 = KafkaConsumer('topic', group_id='group1')
consumer2 = KafkaConsumer('topic', group_id='group1')

# consumer1 получит одни партиции, consumer2 — другие
# Данные распределяются автоматически

Offset — позиция в потоке сообщений. Консьюмер отслеживает, какое сообщение было обработано:

consumer = KafkaConsumer(
    'topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',   # earliest, latest, none
    enable_auto_commit=True,        # Автоматически сохранять offset
    auto_commit_interval_ms=5000    # Каждые 5 секунд
)

Rebalancing — перераспределение партиций при добавлении/удалении консьюмеров:

# Когда консьюмеры подключаются/отключаются, Kafka перераспределяет работу
# Это может вызвать небольшую задержку в обработке

Консьюмер в RabbitMQ

RabbitMQ использует другой подход с очередями и acknowledgments:

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()

# Объявить очередь
channel.queue_declare(queue='my_queue', durable=True)

def callback(ch, method, properties, body):
    print(f"Получено сообщение: {body}")
    # Обработка
    try:
        process_message(body)
        # Подтверждение обработки
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Отклонение и возврат в очередь
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# Подписка на очередь
channel.basic_consume(
    queue='my_queue',
    on_message_callback=callback
)

channel.start_consuming()

Асинхронные консьюмеры

Для обработки больших объёмов данных используются асинхронные консьюмеры:

import asyncio
from aiokafka import AIOKafkaConsumer
import json

async def consume():
    consumer = AIOKafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    
    await consumer.start()
    try:
        async for message in consumer:
            print(f"Message: {message.value}")
            await process_message(message)
    finally:
        await consumer.stop()

async def process_message(message):
    # Асинхронная обработка
    await asyncio.sleep(1)
    print(f"Processed: {message.value}")

asyncio.run(consume())

Жизненный цикл обработки сообщения

  1. Получение (Fetch) — консьюмер запрашивает новые сообщения
  2. Десериализация — преобразование байтов в объекты
  3. Обработка (Processing) — бизнес-логика
  4. Подтверждение (Commit) — сохранение offset
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False,  # Ручное подтверждение
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    try:
        # 1. Получили сообщение
        data = message.value
        
        # 2. Обработали
        result = process_data(data)
        save_to_database(result)
        
        # 3. Подтвердили
        consumer.commit()  # Сохранить offset
    except Exception as e:
        print(f"Error: {e}")
        # Не коммитим — сообщение будет переобработано

Гарантии доставки

At-most-once — сообщение может быть потеряно, но обработано не более одного раза:

consumer.commit()  # Сохраняем offset ДО обработки
# Если упадём, сообщение потеряется

At-least-once — сообщение обработано минимум один раз, может быть дубль:

try:
    process_message(data)
    consumer.commit()  # Сохраняем ПОСЛЕ обработки
except:
    pass  # Переобработаем при перезагрузке

Exactly-once — сложная схема с идемпотентностью:

try:
    # Обработка с идентификатором
    message_id = data['id']
    if not is_processed(message_id):
        process_message(data)
        mark_as_processed(message_id)
    consumer.commit()
except:
    pass

Масштабирование консьюмеров

# Вертикальное масштабирование: увеличиваем мощность одного консьюмера
consumer = KafkaConsumer(
    'topic',
    max_poll_records=1000,      # Больше сообщений за раз
    fetch_min_bytes=1024 * 100  # Ждём больше данных
)

# Горизонтальное масштабирование: больше консьюмеров
# Все консьюмеры в одной group_id автоматически распределяют нагрузку
for i in range(4):
    consumer = KafkaConsumer(
        'topic',
        group_id='my-group',  # Один group_id для всех
        bootstrap_servers=['localhost:9092']
    )

Заключение

Консьюмер — это ключевой компонент для обработки асинхронных данных. Основные моменты:

  • Kafka: распределённая система с партициями и offset tracking
  • RabbitMQ: очереди с acknowledgments
  • Гарантии доставки: at-most-once, at-least-once, exactly-once
  • Масштабирование: горизонтальное через Consumer Groups
  • Асинхронность: для высокопроизводительных систем
Как работает консьюмер? | PrepBro