← Назад к вопросам

Что такое ключ в Kafka?

2.3 Middle🔥 201 комментариев
#REST API и микросервисы#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что такое ключ в Kafka?

Ключ в Kafka — это опциональная метаинформация, которая помогает определить, в какую партицию сообщения будет отправлено. Это критически важный концепт для понимания работы Kafka и её гарантий доставки.

Основные концепции

Структура Kafka сообщения

Кафка Message {
    Timestamp         (когда было отправлено)
    Key              (опциональный, строка/байты)
    Value            (основное содержимое сообщения)
    Headers          (метаданные)
}

Ключ — это специальное поле, которое используется для:

  1. Определения партиции — Kafka использует ключ для вычисления хеша и выбора партиции
  2. Гарантии упорядочения — сообщения с одинаковым ключом гарантированно попадают в одну партицию
  3. Деления данных — группировка логически связанных сообщений

Как работает выбор партиции?

Если ключ НЕ указан (null)

ProducerRecord<String, String> record = new ProducerRecord<>(
    "my-topic",
    null,  // Ключ не указан
    "message value"
);
kafkaProducer.send(record);

Поведение: Kafka использует round-robin или random распределение сообщений по всем партициям топика.

Топик "my-topic" (4 партиции):
┌─────────────┐
│ Partition 0 │ ← сообщение 1 (random)
├─────────────┤
│ Partition 1 │ ← сообщение 2 (random)
├─────────────┤
│ Partition 2 │ ← сообщение 3 (random)
├─────────────┤
│ Partition 3 │ ← сообщение 4 (random)
└─────────────┘

Проблема: Упорядочение НЕ гарантируется. Потребитель может обработать сообщения в другом порядке.

Если ключ указан (не null)

ProducerRecord<String, String> record = new ProducerRecord<>(
    "my-topic",
    "user-123",  // Ключ: идентификатор пользователя
    "user updated name to John"
);
kafkaProducer.send(record);

Поведение: Kafka вычисляет хеш ключа и определяет партицию:

Партиция = hash(key) % numberOfPartitions

Средний пример:

Топик "user-events" (4 партиции):

Ключ: "user-123"
  hash("user-123") = 42
  42 % 4 = 2
  → Партиция 2

Ключ: "user-456"
  hash("user-456") = 87
  87 % 4 = 3
  → Партиция 3

Ключ: "user-123" (снова)
  hash("user-123") = 42
  42 % 4 = 2
  → Партиция 2 (ТА ЖЕ!)

┌──────────────────┐
│  Partition 0     │
├──────────────────┤
│  Partition 1     │
├──────────────────┤
│  Partition 2     │ ← user-123, user-123 (упорядочено)
├──────────────────┤
│  Partition 3     │ ← user-456
└──────────────────┘

КРИТИЧЕСКИ ВАЖНОЕ: Все сообщения с одинаковым ключом попадают в одну и ту же партицию в той же последовательности.

Практические примеры

Пример 1: События пользователя (правильно с ключом)

public class UserEventProducer {
    private KafkaProducer<String, String> producer;
    
    public void sendUserEvent(String userId, String eventType, String eventData) {
        String value = eventType + ": " + eventData;
        
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "user-events",
            userId,  // Ключ: гарантирует порядок событий пользователя
            value
        );
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Error: " + exception.getMessage());
            } else {
                System.out.println("Sent to partition: " + metadata.partition());
            }
        });
    }
}

// Использование
producer.sendUserEvent("user-123", "LOGIN", "logged in from Chrome");
producer.sendUserEvent("user-123", "PURCHASE", "bought item #456");
producer.sendUserEvent("user-123", "LOGOUT", "logged out");

// Гарантия: ВСЕ события будут обработаны в порядке:
// 1. LOGIN
// 2. PURCHASE  
// 3. LOGOUT

Пример 2: Заказы (правильно с ключом)

public class OrderEventProducer {
    private KafkaProducer<String, String> producer;
    
    public void sendOrderEvent(String customerId, OrderEvent event) {
        String value = serializeToJson(event);
        
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "orders",
            customerId,  // Ключ гарантирует, что все заказы одного клиента в одной партиции
            value
        );
        
        producer.send(record);
    }
}

// События:
// customer-A: create_order(1000), payment_confirmed, shipped
// customer-B: create_order(2000), cancelled
// customer-A: create_order(3000)

// customer-A события → Partition 0 (упорядочено)
// customer-B события → Partition 2 (упорядочено)

Пример 3: БЕЗ ключа (опасно!)

public class BadEventProducer {
    public void sendEvent(String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "events",
            null,  // ❌ Без ключа!
            message
        );
        
        producer.send(record);
    }
}

// Отправляем события одного пользователя
sendEvent("Event 1");  // Может быть в Partition 0
sendEvent("Event 2");  // Может быть в Partition 2
sendEvent("Event 3");  // Может быть в Partition 1

// Потребитель получит их в порядке:
// Event 1, Event 3, Event 2
// ❌ ПОРЯДОК НЕ ГАРАНТИРОВАН!

Гарантии доставки и ключи

Гарантия упорядочения (Ordering Guarantee)

Кака ГАРАНТИРУЕТ упорядочение на уровне ПАРТИЦИИ, не топика.

Топик = Коллекция партиций
Партиция = Упорядоченное хранилище

С одинаковым ключом:

Топик (3 партиции)
Партиция 0: msg1 → msg2 → msg3
Партиция 1: msgA → msgB
Партиция 2: msgX → msgY → msgZ

Потребитель вычитывает:

  • Партицию 0 по порядку: msg1, msg2, msg3 ✓
  • Партицию 1 по порядку: msgA, msgB ✓
  • Партицию 2 по порядку: msgX, msgY, msgZ ✓

НО между партициями порядок НЕ гарантирован!

Гарантия At-Least-Once с ключом

Ключ НЕ влияет на гарантии доставки (at-least-once, exactly-once). Это контролируется другими параметрами:

  • acks (сколько реплик должны подтвердить)
  • retries (сколько раз повторить)
  • Idempotent producer

Выбор ключа: Best Practices

Хорошие ключи:

// 1. Entity ID (пользователь, заказ, товар)
record = new ProducerRecord<>("users", userId, userData);

// 2. Composite key для группировки
String key = customerId + ":" + orderId;
record = new ProducerRecord<>("orders", key, orderData);

// 3. Географическое распределение
record = new ProducerRecord<>("logs", region, logEntry);

// 4. Временные окна
String key = dateFormat.format(now);  // YYYY-MM-DD
record = new ProducerRecord<>("daily-stats", key, stats);

Плохие ключи:

// 1. Random UUID (каждое сообщение в разной партиции)
String key = UUID.randomUUID().toString();  // ❌ Нет упорядочения

// 2. Timestamp (низкая гарантия упорядочения)
String key = System.currentTimeMillis() + "";  // ❌ Редко совпадает

// 3. Слишком низкая кардинальность (imbalance)
record = new ProducerRecord<>("events", "admin", data);  // ❌ Все в одной партиции

Java код: Отправка с ключом

Producer с ключом

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KeyedProducerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        try {
            // Отправка с ключом
            for (int i = 1; i <= 3; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    "my-topic",
                    "user-123",  // Ключ
                    "Message " + i
                );
                
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf(
                    "Key: %s, Value: %s, Partition: %d, Offset: %d%n",
                    record.key(),
                    record.value(),
                    metadata.partition(),
                    metadata.offset()
                );
            }
        } finally {
            producer.close();
        }
    }
}

// Вывод:
// Key: user-123, Value: Message 1, Partition: 2, Offset: 0
// Key: user-123, Value: Message 2, Partition: 2, Offset: 1
// Key: user-123, Value: Message 3, Partition: 2, Offset: 2
// (ВСЕ в одной партиции 2)

Consumer с обработкой ключа

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KeyedConsumerExample {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf(
                    "Key: %s, Value: %s, Partition: %d, Offset: %d%n",
                    record.key(),
                    record.value(),
                    record.partition(),
                    record.offset()
                );
                
                // Логика в зависимости от ключа
                if ("user-123".equals(record.key())) {
                    processUserEvent(record.value());
                } else if ("user-456".equals(record.key())) {
                    processAnotherUserEvent(record.value());
                }
            }
        }
    }
}

Важные заметки

Изменение числа партиций → проблема!

Первоначально 4 партиции:
user-123: hash % 4 = 2 → Partition 2

После увеличения до 6 партиций:
user-123: hash % 6 = 5 → Partition 5 (ДРУГАЯ!)

ОСТАРЫЕ сообщения остаются в Partition 2
НОВЫЕ сообщения идут в Partition 5
❌ Упорядочение нарушено!

Решение: Не добавляй партиции если важно упорядочение.

Null ключ vs Empty String

// Null ключ
ProducerRecord<String, String> record1 = new ProducerRecord<>("topic", null, "value");
// → Random партиция

// Empty string
ProducerRecord<String, String> record2 = new ProducerRecord<>("topic", "", "value");
// → Фиксированная партиция (hash("") % partitions)

Итоги

Ключ в Kafka:

  • Определяет партицию: hash(key) % numberOfPartitions
  • Гарантирует упорядочение: все сообщения с одинаковым ключом в одной партиции, в порядке отправки
  • Опциональный: можно отправлять сообщения без ключа (но упорядочение не гарантируется)
  • Критичен для: consistency, stateful processing, ordering guarantees
  • Выбирай ключ: как идентификатор сущности (userId, customerId, accountId)
  • ПОМНИ: порядок гарантирован только внутри партиции, не топика
Что такое ключ в Kafka? | PrepBro