Что такое ключ в Kafka?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое ключ в Kafka?
Ключ в Kafka — это опциональная метаинформация, которая помогает определить, в какую партицию сообщения будет отправлено. Это критически важный концепт для понимания работы Kafka и её гарантий доставки.
Основные концепции
Структура Kafka сообщения
Кафка Message {
Timestamp (когда было отправлено)
Key (опциональный, строка/байты)
Value (основное содержимое сообщения)
Headers (метаданные)
}
Ключ — это специальное поле, которое используется для:
- Определения партиции — Kafka использует ключ для вычисления хеша и выбора партиции
- Гарантии упорядочения — сообщения с одинаковым ключом гарантированно попадают в одну партицию
- Деления данных — группировка логически связанных сообщений
Как работает выбор партиции?
Если ключ НЕ указан (null)
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic",
null, // Ключ не указан
"message value"
);
kafkaProducer.send(record);
Поведение: Kafka использует round-robin или random распределение сообщений по всем партициям топика.
Топик "my-topic" (4 партиции):
┌─────────────┐
│ Partition 0 │ ← сообщение 1 (random)
├─────────────┤
│ Partition 1 │ ← сообщение 2 (random)
├─────────────┤
│ Partition 2 │ ← сообщение 3 (random)
├─────────────┤
│ Partition 3 │ ← сообщение 4 (random)
└─────────────┘
Проблема: Упорядочение НЕ гарантируется. Потребитель может обработать сообщения в другом порядке.
Если ключ указан (не null)
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic",
"user-123", // Ключ: идентификатор пользователя
"user updated name to John"
);
kafkaProducer.send(record);
Поведение: Kafka вычисляет хеш ключа и определяет партицию:
Партиция = hash(key) % numberOfPartitions
Средний пример:
Топик "user-events" (4 партиции):
Ключ: "user-123"
hash("user-123") = 42
42 % 4 = 2
→ Партиция 2
Ключ: "user-456"
hash("user-456") = 87
87 % 4 = 3
→ Партиция 3
Ключ: "user-123" (снова)
hash("user-123") = 42
42 % 4 = 2
→ Партиция 2 (ТА ЖЕ!)
┌──────────────────┐
│ Partition 0 │
├──────────────────┤
│ Partition 1 │
├──────────────────┤
│ Partition 2 │ ← user-123, user-123 (упорядочено)
├──────────────────┤
│ Partition 3 │ ← user-456
└──────────────────┘
КРИТИЧЕСКИ ВАЖНОЕ: Все сообщения с одинаковым ключом попадают в одну и ту же партицию в той же последовательности.
Практические примеры
Пример 1: События пользователя (правильно с ключом)
public class UserEventProducer {
private KafkaProducer<String, String> producer;
public void sendUserEvent(String userId, String eventType, String eventData) {
String value = eventType + ": " + eventData;
ProducerRecord<String, String> record = new ProducerRecord<>(
"user-events",
userId, // Ключ: гарантирует порядок событий пользователя
value
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error: " + exception.getMessage());
} else {
System.out.println("Sent to partition: " + metadata.partition());
}
});
}
}
// Использование
producer.sendUserEvent("user-123", "LOGIN", "logged in from Chrome");
producer.sendUserEvent("user-123", "PURCHASE", "bought item #456");
producer.sendUserEvent("user-123", "LOGOUT", "logged out");
// Гарантия: ВСЕ события будут обработаны в порядке:
// 1. LOGIN
// 2. PURCHASE
// 3. LOGOUT
Пример 2: Заказы (правильно с ключом)
public class OrderEventProducer {
private KafkaProducer<String, String> producer;
public void sendOrderEvent(String customerId, OrderEvent event) {
String value = serializeToJson(event);
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
customerId, // Ключ гарантирует, что все заказы одного клиента в одной партиции
value
);
producer.send(record);
}
}
// События:
// customer-A: create_order(1000), payment_confirmed, shipped
// customer-B: create_order(2000), cancelled
// customer-A: create_order(3000)
// customer-A события → Partition 0 (упорядочено)
// customer-B события → Partition 2 (упорядочено)
Пример 3: БЕЗ ключа (опасно!)
public class BadEventProducer {
public void sendEvent(String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"events",
null, // ❌ Без ключа!
message
);
producer.send(record);
}
}
// Отправляем события одного пользователя
sendEvent("Event 1"); // Может быть в Partition 0
sendEvent("Event 2"); // Может быть в Partition 2
sendEvent("Event 3"); // Может быть в Partition 1
// Потребитель получит их в порядке:
// Event 1, Event 3, Event 2
// ❌ ПОРЯДОК НЕ ГАРАНТИРОВАН!
Гарантии доставки и ключи
Гарантия упорядочения (Ordering Guarantee)
Кака ГАРАНТИРУЕТ упорядочение на уровне ПАРТИЦИИ, не топика.
Топик = Коллекция партиций
Партиция = Упорядоченное хранилище
С одинаковым ключом:
Топик (3 партиции)
Партиция 0: msg1 → msg2 → msg3
Партиция 1: msgA → msgB
Партиция 2: msgX → msgY → msgZ
Потребитель вычитывает:
- Партицию 0 по порядку: msg1, msg2, msg3 ✓
- Партицию 1 по порядку: msgA, msgB ✓
- Партицию 2 по порядку: msgX, msgY, msgZ ✓
НО между партициями порядок НЕ гарантирован!
Гарантия At-Least-Once с ключом
Ключ НЕ влияет на гарантии доставки (at-least-once, exactly-once). Это контролируется другими параметрами:
acks(сколько реплик должны подтвердить)retries(сколько раз повторить)- Idempotent producer
Выбор ключа: Best Practices
Хорошие ключи:
// 1. Entity ID (пользователь, заказ, товар)
record = new ProducerRecord<>("users", userId, userData);
// 2. Composite key для группировки
String key = customerId + ":" + orderId;
record = new ProducerRecord<>("orders", key, orderData);
// 3. Географическое распределение
record = new ProducerRecord<>("logs", region, logEntry);
// 4. Временные окна
String key = dateFormat.format(now); // YYYY-MM-DD
record = new ProducerRecord<>("daily-stats", key, stats);
Плохие ключи:
// 1. Random UUID (каждое сообщение в разной партиции)
String key = UUID.randomUUID().toString(); // ❌ Нет упорядочения
// 2. Timestamp (низкая гарантия упорядочения)
String key = System.currentTimeMillis() + ""; // ❌ Редко совпадает
// 3. Слишком низкая кардинальность (imbalance)
record = new ProducerRecord<>("events", "admin", data); // ❌ Все в одной партиции
Java код: Отправка с ключом
Producer с ключом
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KeyedProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// Отправка с ключом
for (int i = 1; i <= 3; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic",
"user-123", // Ключ
"Message " + i
);
RecordMetadata metadata = producer.send(record).get();
System.out.printf(
"Key: %s, Value: %s, Partition: %d, Offset: %d%n",
record.key(),
record.value(),
metadata.partition(),
metadata.offset()
);
}
} finally {
producer.close();
}
}
}
// Вывод:
// Key: user-123, Value: Message 1, Partition: 2, Offset: 0
// Key: user-123, Value: Message 2, Partition: 2, Offset: 1
// Key: user-123, Value: Message 3, Partition: 2, Offset: 2
// (ВСЕ в одной партиции 2)
Consumer с обработкой ключа
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KeyedConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"Key: %s, Value: %s, Partition: %d, Offset: %d%n",
record.key(),
record.value(),
record.partition(),
record.offset()
);
// Логика в зависимости от ключа
if ("user-123".equals(record.key())) {
processUserEvent(record.value());
} else if ("user-456".equals(record.key())) {
processAnotherUserEvent(record.value());
}
}
}
}
}
Важные заметки
Изменение числа партиций → проблема!
Первоначально 4 партиции:
user-123: hash % 4 = 2 → Partition 2
После увеличения до 6 партиций:
user-123: hash % 6 = 5 → Partition 5 (ДРУГАЯ!)
ОСТАРЫЕ сообщения остаются в Partition 2
НОВЫЕ сообщения идут в Partition 5
❌ Упорядочение нарушено!
Решение: Не добавляй партиции если важно упорядочение.
Null ключ vs Empty String
// Null ключ
ProducerRecord<String, String> record1 = new ProducerRecord<>("topic", null, "value");
// → Random партиция
// Empty string
ProducerRecord<String, String> record2 = new ProducerRecord<>("topic", "", "value");
// → Фиксированная партиция (hash("") % partitions)
Итоги
Ключ в Kafka:
- Определяет партицию: hash(key) % numberOfPartitions
- Гарантирует упорядочение: все сообщения с одинаковым ключом в одной партиции, в порядке отправки
- Опциональный: можно отправлять сообщения без ключа (но упорядочение не гарантируется)
- Критичен для: consistency, stateful processing, ordering guarantees
- Выбирай ключ: как идентификатор сущности (userId, customerId, accountId)
- ПОМНИ: порядок гарантирован только внутри партиции, не топика