← Назад к вопросам
Что произойдет при сбое Leader replica в Kafka?
3.0 Senior🔥 101 комментариев
#DevOps и инфраструктура#Django
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Сбой Leader Replica в Kafka
Что такое Leader Replica
В Kafka каждый partition имеет несколько replicas (копии) данных:
- Leader Replica — основная копия, отвечает за чтение и запись
- Follower Replicas — копии данных, которые реплицируют данные с Leader
Partition 0:
Leader (Broker 1) ← Здесь пишутся и читаются данные
Follower (Broker 2) ← Копирует данные с Leader
Follower (Broker 3) ← Копирует данные с Leader
Что происходит при сбое Leader Replica
1. Инициирующий сценарий
У нас есть topic с 3 replicas:
- Broker 1: Leader
- Broker 2: Follower (ISR — In-Sync Replica)
- Broker 3: Follower (ISR)
⚡ СБОЙ: Broker 1 падает
2. Первые 10 секунд (детектирование сбоя)
# ZooKeeper или KRaft контроллер
heartbeat_timeout = 10_seconds # Дефолтно
if not received_heartbeat_from_broker_1:
controller.mark_broker_as_down(broker_1)
# Но Leader replica всё ещё активна в памяти других брокеров
3. Выбор нового Leader (Leader Election)
def elect_new_leader():
"""
Kafka выбирает нового Leader из ISR (In-Sync Replicas)
"""
# ISR = [Broker 2, Broker 3]
# (только те, кто был синхронизирован с Leader)
# Выбирается первый в ISR
new_leader = ISR[0] # Broker 2
# Результат:
# Broker 2: NEW Leader
# Broker 3: Follower
# Broker 1: Down
elect_new_leader()
Пример в виде сообщений:
До сбоя:
Topic: users, Partition: 0
Leader: 1, ISR: [1, 2, 3]
В момент сбоя Broker 1:
⚠️ Broker 1 не отвечает
После 10 секунд timeout:
Leader: 2 (elected), ISR: [2, 3]
✅ Система восстановилась
Что происходит с данными
Scenario A: Все replicas были синхронизированы (zero data loss)
# До сбоя Leader Broker 1 имел:
data = [
Message(offset=0, value="user_1 created"),
Message(offset=1, value="user_1 updated"),
Message(offset=2, value="user_2 created")
]
# Broker 2 (Follower) имел ту же копию:
follower_data = [
Message(offset=0, value="user_1 created"),
Message(offset=1, value="user_1 updated"),
Message(offset=2, value="user_2 created")
]
# Когда Broker 1 падает, Broker 2 становится Leader
# И имеет все данные ✅
Scenario B: Leader был впереди Followers (potential data loss)
# Leader Broker 1 (перед сбоем):
leader_data = [
Message(offset=0),
Message(offset=1),
Message(offset=2),
Message(offset=3), ← Только в Leader!
Message(offset=4) ← Только в Leader!
]
# Broker 2 (Follower):
follower_data = [
Message(offset=0),
Message(offset=1),
Message(offset=2) ← Не в синхронизации
]
# Broker 1 падает!
# Broker 2 становится Leader
# Messages 3 и 4 ПОТЕРЯНЫ ❌
# Причина:
# min.insync.replicas = 1 (по умолчанию)
# Leader не ждёт, пока Follower подтвердит запись
Как Kafka предотвращает потерю данных
1. min.insync.replicas конфиг
# ❌ Небезопасно
min.insync.replicas = 1
# Leader пишет, не ждёт подтверждения от Followers
# Если Leader упадёт, можешь потерять данные
# ✅ Безопаснее
min.insync.replicas = 2 # (часто используется)
# Leader пишет только если минимум 1 Follower подтвердил
# Значит: минимум 2 копии данных перед подтверждением
# ✅ Максимально безопасно
min.insync.replicas = 3
# Leader пишет только если минимум 2 Followers подтвердили
# Значит: 3 копии перед подтверждением
2. acks конфиг в Producer
from kafka import KafkaProducer
# ❌ Небезопасно
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
acks=0 # Producer не ждёт подтверждения
# Может потеряться даже на отправке
)
# ⚠️ Средне
producer = KafkaProducer(
acks=1 # Ждёт подтверждения только от Leader
# Если Leader падает сразу после, данные могут потеряться
)
# ✅ Безопасно
producer = KafkaProducer(
acks='all' # Ждёт подтверждения от всех ISR replicas
# Гарантирует, что данные на нескольких копиях
)
producer.send('users', b'user_1 created')
producer.flush() # Ждёт подтверждения
Процесс восстановления после сбоя Leader
Timeline:
Т=0:00 — Broker 1 (Leader) работает
Clients пишут и читают
Т=0:05 — ⚡ Сбой: Broker 1 перестаёт отвечать
Clients получают ошибку
Broker 1 перестаёт отправлять heartbeats
Т=0:10 — ZooKeeper/KRaft контроллер детектирует Broker 1 down
Инициируется Leader Election
Т=0:12 — ✅ Новый Leader elected (Broker 2)
Metadata обновляется
Clients получают обновленный metadata
Т=0:15 — Clients начинают писать в новый Leader (Broker 2)
✅ Система восстановилась
Т=1:00 — Broker 1 приходит в себя, перезагружается
Broker 2 (текущий Leader) отправляет данные
Broker 1 становится Follower и синхронизируется
Практический пример с кодом
from kafka import KafkaProducer, KafkaConsumer
import json
from time import sleep
# Безопасный Producer
producer = KafkaProducer(
bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
acks='all', # Ждёт подтверждения от всех ISR
retries=3, # Повторяет при ошибке
max_in_flight_requests_per_connection=1 # Гарантирует порядок
)
# Отправляем сообщение
try:
future = producer.send(
'orders',
json.dumps({"order_id": 123, "amount": 99.99}).encode()
)
# Блокируем до подтверждения
record_metadata = future.get(timeout=10)
print(f"Message sent to partition {record_metadata.partition}")
print(f"At offset {record_metadata.offset}")
except Exception as e:
# Если Leader сбоится во время этого, получим исключение
print(f"Failed to send: {e}")
# Код должен переотправить или обработать ошибку
producer.close()
# Надёжный Consumer
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
auto_offset_reset='earliest', # Читаем с начала
enable_auto_commit=False, # Сами коммитим
group_id='order_processor'
)
for message in consumer:
print(f"Received: {message.value}")
# Обрабатываем сообщение
process_order(message.value)
# Коммитим только после обработки
consumer.commit()
consumer.close()
Мониторинг сбоев Leader
# Что смотреть:
metrics = {
"UnderReplicatedPartitions": "Партиции без достаточного количества replicas",
"OfflinePartitions": "Партиции без Leader",
"LeaderElectionRateAndTimeMs": "Как часто меняется Leader и как быстро",
"ProducerFailureRate": "Процент неудачных отправок",
"ConsumerLagSumPerBroker": "Отставание потребителей"
}
# Alerts
alerts = {
"UnderReplicatedPartitions > 0": "КРИТИЧНО — потенциальная потеря данных",
"OfflinePartitions > 0": "КРИТИЧНО — сбой",
"LeaderElectionTime > 30s": "ПРЕДУПРЕЖДЕНИЕ — медленное восстановление"
}
Best Practices для production
kafka_config = {
# Topic level
"replication_factor": 3, # 3 копии (Leader + 2 Followers)
"min.insync.replicas": 2, # Требуем 2 копии перед подтверждением
# Producer level
"acks": "all", # Ждём всех ISR
"retries": 3, # Повторяем при ошибке
"max_in_flight_requests_per_connection": 1, # Гарантируем порядок
# Broker level (server.properties)
"unclean.leader.election.enable": False, # Не выбираем лидера из non-ISR
"min.insync.replicas": 2, # Подтверждаем только ISR
# Network
"connections.max.idle.ms": 540000, # 9 минут
"replica.lag.time.max.ms": 30000 # 30 секунд для отставания
}
Заключение
При сбое Leader Replica:
- Kafka автоматически выбирает новый Leader из In-Sync Replicas
- Данные не теряются, если правильно настроить min.insync.replicas и acks
- Восстановление занимает 10-30 секунд (зависит от конфигурации)
- Producers и Consumers автоматически переключаются на новый Leader
- Мониторинг критичен для раннего обнаружения проблем
Главное правило: Для критичных данных используй acks='all' и min.insync.replicas >= 2. Это гарантирует, что потеря Leader не приведёт к потере данных.