← Назад к вопросам

Что произойдет при сбое Leader replica в Kafka?

3.0 Senior🔥 101 комментариев
#DevOps и инфраструктура#Django

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Сбой Leader Replica в Kafka

Что такое Leader Replica

В Kafka каждый partition имеет несколько replicas (копии) данных:

  • Leader Replica — основная копия, отвечает за чтение и запись
  • Follower Replicas — копии данных, которые реплицируют данные с Leader
Partition 0:
  Leader (Broker 1)     ← Здесь пишутся и читаются данные
  Follower (Broker 2)   ← Копирует данные с Leader
  Follower (Broker 3)   ← Копирует данные с Leader

Что происходит при сбое Leader Replica

1. Инициирующий сценарий

У нас есть topic с 3 replicas:
- Broker 1: Leader
- Broker 2: Follower (ISR — In-Sync Replica)
- Broker 3: Follower (ISR)

⚡ СБОЙ: Broker 1 падает

2. Первые 10 секунд (детектирование сбоя)

# ZooKeeper или KRaft контроллер
heartbeat_timeout = 10_seconds  # Дефолтно

if not received_heartbeat_from_broker_1:
    controller.mark_broker_as_down(broker_1)
    # Но Leader replica всё ещё активна в памяти других брокеров

3. Выбор нового Leader (Leader Election)

def elect_new_leader():
    """
    Kafka выбирает нового Leader из ISR (In-Sync Replicas)
    """
    # ISR = [Broker 2, Broker 3]
    # (только те, кто был синхронизирован с Leader)
    
    # Выбирается первый в ISR
    new_leader = ISR[0]  # Broker 2
    
    # Результат:
    # Broker 2: NEW Leader
    # Broker 3: Follower
    # Broker 1: Down

elect_new_leader()

Пример в виде сообщений:

До сбоя:
Topic: users, Partition: 0
Leader: 1, ISR: [1, 2, 3]

В момент сбоя Broker 1:
⚠️ Broker 1 не отвечает

После 10 секунд timeout:
Leader: 2 (elected), ISR: [2, 3]

✅ Система восстановилась

Что происходит с данными

Scenario A: Все replicas были синхронизированы (zero data loss)

# До сбоя Leader Broker 1 имел:
data = [
    Message(offset=0, value="user_1 created"),
    Message(offset=1, value="user_1 updated"),
    Message(offset=2, value="user_2 created")
]

# Broker 2 (Follower) имел ту же копию:
follower_data = [
    Message(offset=0, value="user_1 created"),
    Message(offset=1, value="user_1 updated"),
    Message(offset=2, value="user_2 created")
]

# Когда Broker 1 падает, Broker 2 становится Leader
# И имеет все данные ✅

Scenario B: Leader был впереди Followers (potential data loss)

# Leader Broker 1 (перед сбоем):
leader_data = [
    Message(offset=0),
    Message(offset=1),
    Message(offset=2),
    Message(offset=3),  ← Только в Leader!
    Message(offset=4)   ← Только в Leader!
]

# Broker 2 (Follower):
follower_data = [
    Message(offset=0),
    Message(offset=1),
    Message(offset=2)   ← Не в синхронизации
]

# Broker 1 падает! 
# Broker 2 становится Leader
# Messages 3 и 4 ПОТЕРЯНЫ ❌

# Причина:
# min.insync.replicas = 1 (по умолчанию)
# Leader не ждёт, пока Follower подтвердит запись

Как Kafka предотвращает потерю данных

1. min.insync.replicas конфиг

# ❌ Небезопасно
min.insync.replicas = 1
# Leader пишет, не ждёт подтверждения от Followers
# Если Leader упадёт, можешь потерять данные

# ✅ Безопаснее
min.insync.replicas = 2  # (часто используется)
# Leader пишет только если минимум 1 Follower подтвердил
# Значит: минимум 2 копии данных перед подтверждением

# ✅ Максимально безопасно
min.insync.replicas = 3
# Leader пишет только если минимум 2 Followers подтвердили
# Значит: 3 копии перед подтверждением

2. acks конфиг в Producer

from kafka import KafkaProducer

# ❌ Небезопасно
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks=0  # Producer не ждёт подтверждения
    # Может потеряться даже на отправке
)

# ⚠️ Средне
producer = KafkaProducer(
    acks=1  # Ждёт подтверждения только от Leader
    # Если Leader падает сразу после, данные могут потеряться
)

# ✅ Безопасно
producer = KafkaProducer(
    acks='all'  # Ждёт подтверждения от всех ISR replicas
    # Гарантирует, что данные на нескольких копиях
)

producer.send('users', b'user_1 created')
producer.flush()  # Ждёт подтверждения

Процесс восстановления после сбоя Leader

Timeline:

Т=0:00 — Broker 1 (Leader) работает
         Clients пишут и читают

Т=0:05 — ⚡ Сбой: Broker 1 перестаёт отвечать
         Clients получают ошибку
         Broker 1 перестаёт отправлять heartbeats

Т=0:10 — ZooKeeper/KRaft контроллер детектирует Broker 1 down
         Инициируется Leader Election

Т=0:12 — ✅ Новый Leader elected (Broker 2)
         Metadata обновляется
         Clients получают обновленный metadata

Т=0:15 — Clients начинают писать в новый Leader (Broker 2)
         ✅ Система восстановилась

Т=1:00 — Broker 1 приходит в себя, перезагружается
         Broker 2 (текущий Leader) отправляет данные
         Broker 1 становится Follower и синхронизируется

Практический пример с кодом

from kafka import KafkaProducer, KafkaConsumer
import json
from time import sleep

# Безопасный Producer
producer = KafkaProducer(
    bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
    acks='all',  # Ждёт подтверждения от всех ISR
    retries=3,  # Повторяет при ошибке
    max_in_flight_requests_per_connection=1  # Гарантирует порядок
)

# Отправляем сообщение
try:
    future = producer.send(
        'orders',
        json.dumps({"order_id": 123, "amount": 99.99}).encode()
    )
    
    # Блокируем до подтверждения
    record_metadata = future.get(timeout=10)
    print(f"Message sent to partition {record_metadata.partition}")
    print(f"At offset {record_metadata.offset}")
    
except Exception as e:
    # Если Leader сбоится во время этого, получим исключение
    print(f"Failed to send: {e}")
    # Код должен переотправить или обработать ошибку

producer.close()

# Надёжный Consumer
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
    auto_offset_reset='earliest',  # Читаем с начала
    enable_auto_commit=False,  # Сами коммитим
    group_id='order_processor'
)

for message in consumer:
    print(f"Received: {message.value}")
    # Обрабатываем сообщение
    process_order(message.value)
    # Коммитим только после обработки
    consumer.commit()
    
consumer.close()

Мониторинг сбоев Leader

# Что смотреть:
metrics = {
    "UnderReplicatedPartitions": "Партиции без достаточного количества replicas",
    "OfflinePartitions": "Партиции без Leader",
    "LeaderElectionRateAndTimeMs": "Как часто меняется Leader и как быстро",
    "ProducerFailureRate": "Процент неудачных отправок",
    "ConsumerLagSumPerBroker": "Отставание потребителей"
}

# Alerts
alerts = {
    "UnderReplicatedPartitions > 0": "КРИТИЧНО — потенциальная потеря данных",
    "OfflinePartitions > 0": "КРИТИЧНО — сбой",
    "LeaderElectionTime > 30s": "ПРЕДУПРЕЖДЕНИЕ — медленное восстановление"
}

Best Practices для production

kafka_config = {
    # Topic level
    "replication_factor": 3,  # 3 копии (Leader + 2 Followers)
    "min.insync.replicas": 2,  # Требуем 2 копии перед подтверждением
    
    # Producer level
    "acks": "all",  # Ждём всех ISR
    "retries": 3,  # Повторяем при ошибке
    "max_in_flight_requests_per_connection": 1,  # Гарантируем порядок
    
    # Broker level (server.properties)
    "unclean.leader.election.enable": False,  # Не выбираем лидера из non-ISR
    "min.insync.replicas": 2,  # Подтверждаем только ISR
    
    # Network
    "connections.max.idle.ms": 540000,  # 9 минут
    "replica.lag.time.max.ms": 30000  # 30 секунд для отставания
}

Заключение

При сбое Leader Replica:

  1. Kafka автоматически выбирает новый Leader из In-Sync Replicas
  2. Данные не теряются, если правильно настроить min.insync.replicas и acks
  3. Восстановление занимает 10-30 секунд (зависит от конфигурации)
  4. Producers и Consumers автоматически переключаются на новый Leader
  5. Мониторинг критичен для раннего обнаружения проблем

Главное правило: Для критичных данных используй acks='all' и min.insync.replicas >= 2. Это гарантирует, что потеря Leader не приведёт к потере данных.