← Назад к вопросам

Какая очередность сообщений в Kafka?

1.8 Middle🔥 111 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Очередность сообщений в Kafka

Одна из ключевых особенностей Apache Kafka — это гарантии на упорядоченность сообщений. Эта тема более сложная, чем кажется, потому что очередность зависит от многих факторов. Давайте разберёмся детально.

Базовая гарантия: очередность в рамках партиции

Kafka гарантирует упорядоченность сообщений ТОЛЬКО в рамках одной партиции.

Ключевой момент:

from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Отправляем сообщения
messages = [
    {"id": 1, "event": "user_signup"},
    {"id": 2, "event": "email_verified"},
    {"id": 3, "event": "profile_updated"},
    {"id": 4, "event": "payment_made"}
]

for msg in messages:
    # По умолчанию без ключа — случайная партиция
    producer.send('events', value=msg)
    
producer.flush()

Проблема: без явного управления партициями, сообщения могут попасть в разные партиции и потребитель получит их в произвольном порядке!

Правильный подход: использование ключа

Чтобы гарантировать порядок, используй ключ — все сообщения с одинаковым ключом идут в одну партицию:

# ✅ ПРАВИЛЬНО: используем ключ
for msg in messages:
    producer.send(
        'events',
        key=str(msg['id']).encode(),  # Ключ определяет партицию
        value=msg
    )

# Все сообщения с key="1" -> партиция 0
# Все сообщения с key="2" -> партиция 1
# Все сообщения с key="3" -> партиция 2
# Порядок ГАРАНТИРОВАН в каждой партиции!

Распределение по партициям

Как Kafka выбирает партицию?

Это зависит от partitioner — алгоритм, который определяет, в какую партицию отправить сообщение:

# Kafka использует hash(key) % num_partitions

# Пример: тема с 3 партициями
key_to_partition = {}
for user_id in ["user_1", "user_2", "user_3", "user_4", "user_5"]:
    partition = hash(user_id) % 3  # Всегда одна и та же партиция для одного user_id
    key_to_partition[user_id] = partition

print(key_to_partition)
# Примерно: {'user_1': 1, 'user_2': 0, 'user_3': 2, 'user_4': 1, 'user_5': 0}

# user_1 ВСЕГДА идёт в партицию 1
# user_2 ВСЕГДА идёт в партицию 0
# Порядок внутри партиции ГАРАНТИРОВАН

Практический пример: события пользователя

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import time

# PRODUCER
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',  # Подтверждение от всех реплик
    retries=3,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# События одного пользователя
user_id = "user_123"
events = [
    {"action": "login", "timestamp": 1000},
    {"action": "browse_product", "product_id": 42, "timestamp": 1001},
    {"action": "add_to_cart", "product_id": 42, "timestamp": 1002},
    {"action": "checkout", "total": 99.99, "timestamp": 1003},
    {"action": "payment_confirmed", "timestamp": 1004},
    {"action": "logout", "timestamp": 1005}
]

# Отправляем с ключом = user_id
for event in events:
    future = producer.send(
        'user_events',
        key=user_id.encode(),  # ВАЖНО: один user всегда в одну партицию
        value=event
    )
    # Ждём отправки
    try:
        record_metadata = future.get(timeout=10)
        print(f"Сообщение отправлено в партицию {record_metadata.partition}")
    except KafkaError:
        print(f"Ошибка отправки: {event}")

producer.flush()

# CONSUMER
consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print("\nПолученные события (в правильном порядке):")
for message in consumer:
    event = message.value
    print(f"  {event['action']} (timestamp: {event['timestamp']})")
    
# Вывод:
# login (timestamp: 1000)
# browse_product (timestamp: 1001)
# add_to_cart (timestamp: 1002)
# checkout (timestamp: 1003)
# payment_confirmed (timestamp: 1004)
# logout (timestamp: 1005)

Очередность при множественных партициях

Что происходит, когда несколько партиций?

# Topic: "user_events" с 3 партициями

# Producer отправляет события разных пользователей:
for user_id in ["user_1", "user_2", "user_3", "user_1", "user_2"]:
    for event in events:
        producer.send('user_events', key=user_id.encode(), value=event)

# Распределение по партициям (примерно):
# Партиция 0: user_2 events...
# Партиция 1: user_1 events...
# Партиция 2: user_3 events...

# Consumer читает В ГЛОБАЛЬНОМ ПОРЯДКЕ (из всех партиций одновременно):
# user_2[0], user_1[0], user_3[0], user_2[1], user_1[1], ...

# Порядок ГАРАНТИРОВАН только внутри партиции:
# user_1 события всегда в одном порядке
# user_2 события всегда в одном порядке
# но между разными пользователями порядок может быть любым

Сценарии и гарантии

Сценарий 1: Без ключа (bad practice)

# Сообщения без ключа идут случайно в партиции
for event in events:
    producer.send('events', value=event)  # Без ключа!

# Результат: события могут перемешаться!
# Получатель видит: event_3, event_1, event_4, event_2
# ❌ Порядок НЕ гарантирован

Сценарий 2: С ключом (best practice)

# Сообщения с ключом гарантируют порядок
for event in events:
    producer.send('events', key=order_id.encode(), value=event)

# Результат: события приходят в правильном порядке
# event_1, event_2, event_3, event_4
# ✅ Порядок ГАРАНТИРОВАН

Настройка acks и очередность

# acks='0' — не ждём подтверждения (самое быстро, но может потерять сообщения)
producer = KafkaProducer(acks='0')

# acks='1' — ждём подтверждения от leader (быстро и безопасно)
producer = KafkaProducer(acks='1')

# acks='all' — ждём подтверждения от всех in-sync replicas (медленно, но самое безопасно)
producer = KafkaProducer(acks='all')

# max_in_flight_requests_per_connection — количество одновременных запросов
# Важно: max_in_flight_requests_per_connection ДОЛЖЕН быть = 1
# чтобы гарантировать порядок при retries!
producer = KafkaProducer(
    acks='all',
    retries=3,
    max_in_flight_requests_per_connection=1  # ✅ ВАЖНО для порядка
)

Потребители и очередность

# Один потребитель читает одну партицию
consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['localhost:9092'],
    group_id='my_group',
    auto_offset_reset='earliest',
    enable_auto_commit=True  # Автоматически сохранять offset
)

# Если несколько потребителей (группа):
# consumer_1 -> партиция 0
# consumer_2 -> партиция 1
# consumer_3 -> партиция 2
# Каждый читает одну партицию в правильном порядке

# Если потребитель один, он читает ВСЕ партиции
# Но порядок между партициями не гарантирован

Важное: failure и переорdered сообщения

# Если producer запросит retries и произойдёт сбой:
producer = KafkaProducer(
    max_in_flight_requests_per_connection=5,  # ❌ ПЛОХО!
    retries=3
)

# Сценарий:
# 1. Отправляем [msg_1, msg_2, msg_3]
# 2. msg_2 не подтверждается, заново отправляется
# 3. msg_3 уже отправлена
# 4. msg_2 переотправляется
# Результат: [msg_1, msg_3, msg_2] ❌ НЕПРАВИЛЬНЫЙ ПОРЯДОК!

# Решение: max_in_flight_requests_per_connection = 1
producer = KafkaProducer(
    max_in_flight_requests_per_connection=1,  # ✅ ХОРОШО!
    retries=3
)
# Теперь msg_1 полностью подтверждена перед отправкой msg_2
# Порядок гарантирован!

Шпаргалка: гарантии очередности

СценарийСообщенияГарантия
Одна партицияВ одну партицию✅ Полный порядок
Несколько партицийС одинаковым ключом✅ Порядок в партиции
Несколько партицийБез ключа❌ Нет порядка
С retriesmax_in_flight=1✅ Порядок сохранён
С retriesmax_in_flight>1❌ Может переоснаститься

Заключение

Очередность в Kafka:

  • Гарантирована в рамках одной партиции — правило №1
  • Используй ключ — гарантирует, что связанные события идут в одну партицию
  • Установи max_in_flight_requests_per_connection=1 — при использовании retries
  • Порядок между разными партициями не гарантирован — это нормально и часто нужно

Для критичных последовательностей событий (как обработка заказа) всегда используй ключ с группирующим ID (user_id, order_id и т.п.).

Какая очередность сообщений в Kafka? | PrepBro