← Назад к вопросам

Что произойдет в случае двух Partition и одного Consumer в Kafka?

2.7 Senior🔥 21 комментариев
#Архитектура и паттерны#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Архитектура: 2 Partition и 1 Consumer в Kafka

Это классический вопрос о параллелизме в Kafka. Ответ зависит от количества потребителей в consumer group.

Сценарий 1: 1 Consumer в Consumer Group (одна группа)

Если у нас есть одна partition группа с одним потребителем, то этот consumer получит обе partition:

from kafka import KafkaConsumer

# Consumer из группы "my-group"
consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest'
)

# Этот consumer обработает сообщения из обеих партиций
# Partition 0 -> Consumer 1
# Partition 1 -> Consumer 1
for message in consumer:
    print(f"Partition: {message.partition}, Value: {message.value}")

Результат: Один consumer обрабатывает обе partition последовательно или чередуясь.

Сценарий 2: 2 Consumer в одной Consumer Group

Если добавить второго consumer в ту же group, Kafka переприсвоит partition:

# Consumer 1
consumer1 = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092']
)
# Получит Partition 0

# Consumer 2  
consumer2 = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092']
)
# Получит Partition 1

Результат: Каждый consumer получает по одной partition — максимальный параллелизм.

Важная деталь: Rebalancing

Когда в group добавляется/удаляется consumer, происходит rebalancing — переприсвоение partition:

def on_partitions_revoked(revoked_partitions):
    print(f"Потеряли партиции: {revoked_partitions}")
    consumer.commit()  # Сохраняем offset перед отключением

def on_partitions_assigned(assigned_partitions):
    print(f"Получили партиции: {assigned_partitions}")

consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092'],
    on_partitions_revoked=on_partitions_revoked,
    on_partitions_assigned=on_partitions_assigned,
    session_timeout_ms=30000,  # Timeout для обнаружения падения
)

Потенциальные проблемы

Проблема 1: Дублирование сообщений Если consumer падает, а offset не был сохранён, он переобработает сообщения:

# ❌ Плохо: ручное управление offset
for message in consumer:
    process(message)
    consumer.commit()  # Если упадёт до сюда — переобработаем

# ✅ Хорошо: enable_auto_commit (по умолчанию True)
consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    enable_auto_commit=True,  # Автоматически сохраняет offset
    auto_commit_interval_ms=5000  # Каждые 5 секунд
)

# ✅ Ещё лучше: атомарная обработка
for message in consumer:
    if process_with_idempotency(message):
        consumer.commit()  # Только если успешно

Проблема 2: Повторная обработка при rebalancing Если rebalancing происходит часто (из-за таймаутов), это замораживает обработку:

# Оптимальные параметры
consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092'],
    session_timeout_ms=30000,      # 30 сек до объявления consumer мёртвым
    heartbeat_interval_ms=10000,   # Отправляем heartbeat каждые 10 сек
    max_poll_interval_ms=300000,   # 5 минут на обработку batch'а
    fetch_max_bytes=52428800,      # 50MB за раз
)

Визуализация

Topic: my-topic
├── Partition 0
│   └── Consumer 1 (читает сообщения 0, 1, 2, 3...)
├── Partition 1
│   └── Consumer 1 (читает сообщения 0, 1, 2, 3...)
└── Consumer Group: my-group
    └── 1 member

После добавления второго consumer:

Topic: my-topic  
├── Partition 0
│   └── Consumer 1
├── Partition 1
│   └── Consumer 2
└── Consumer Group: my-group
    ├── 1 member
    └── 2 member

Правило распределения

Правило Kafka: количество partition должно быть >= количество consumer для полного параллелизма:

✅ 4 partitions + 2 consumers = 2 partition на consumer (хорошо)
✅ 2 partitions + 2 consumers = 1 partition на consumer (идеально)
⚠️ 2 partitions + 4 consumers = 2 consumer будут неактивны (пустая трата ресурсов)

Практический пример обработки

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=100  # Обрабатываем батч по 100 сообщений
)

for message in consumer:
    print(f"Partition: {message.partition}, "
          f"Offset: {message.offset}, "
          f"Value: {message.value}")
    
    # Обработка
    process(message.value)
    
    # Сохранение offset
    consumer.commit()

Итоги

  • 1 Consumer + 2 Partition: Consumer обрабатывает обе partition
  • 2 Consumer + 2 Partition: Каждый consumer получает по partition
  • Rebalancing: Автоматическое переприсвоение при изменении группы
  • Offset management: Сохраняй offset для избежания дублирования
  • Лучшая практика: партиций >= потребителей