← Назад к вопросам
Как работает consumer с топиками в Kafka?
1.8 Middle🔥 191 комментариев
#REST API и микросервисы#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Ответ
Kafka Consumer — это компонент, который читает сообщения из топиков. Это критически важная часть event-driven архитектуры. Рассмотрю полностью как это работает.
1. Базовый Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) {
// 1. Конфигурация
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 2. Создание consumer
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
// 3. Подписка на топик
consumer.subscribe(Arrays.asList("orders"));
// 4. Чтение сообщений
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"Topic: %s, Partition: %d, Offset: %d, " +
"Key: %s, Value: %s%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
}
}
}
2. Consumer Groups — распределённое чтение
Когда несколько consumer в одной группе читают один топик:
public class ConsumerGroupExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors"); // Группа
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
// Каждый consumer в группе получит разные партиции
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value());
}
}
}
private static void processOrder(String order) {
System.out.println("Processing: " + order);
}
}
Как работает распределение:
Топик: orders (3 партиции)
Partition 0: [msg1, msg2, msg3]
Partition 1: [msg4, msg5, msg6]
Partition 2: [msg7, msg8, msg9]
Когда запускаю 3 consumer с одной GROUP_ID:
Consumer 1 → читает Partition 0
Consumer 2 → читает Partition 1
Consumer 3 → читает Partition 2
Если запущу 2 consumer:
Consumer 1 → читает Partition 0 и 1
Consumer 2 → читает Partition 2
Если запущу 4 consumer:
Consumer 1 → читает Partition 0
Consumer 2 → читает Partition 1
Consumer 3 → читает Partition 2
Consumer 4 → неактивен (нет партиций)
3. Offsets — отслеживание позиции
Consumer запоминает какие сообщения уже обработал:
public class OffsetManagementExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// earliest - начать с начала топика
// latest - начать с последних сообщений
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Автоматически коммитить offset каждые 5 секунд
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("events"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Offset: " + record.offset() +
", Value: " + record.value());
// Offset автоматически коммитится
}
}
}
}
4. Manual Offset Management
Для больше контроля:
public class ManualOffsetManagement {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Отключаю автокоммит, буду коммитить вручную
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
try {
for (ConsumerRecord<String, String> record : records) {
// Обрабатываю сообщение
processOrder(record.value());
}
// Только если успешно обработал все
consumer.commitSync(); // Синхронный коммит
} catch (Exception e) {
System.out.println("Error processing: " + e.getMessage());
// Не коммитим, перечитаем позже
}
}
}
private static void processOrder(String order) {
// Processing logic
}
}
5. Обработка ошибок и retry logic
public class RobustConsumer {
private static final int MAX_RETRIES = 3;
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
// Получать макимум 100 сообщений за раз
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
int retries = 0;
while (retries < MAX_RETRIES) {
try {
processOrderWithValidation(record.value());
break; // Успех
} catch (Exception e) {
retries++;
if (retries >= MAX_RETRIES) {
// Отправить в dead letter queue
sendToDeadLetterQueue(record);
break;
}
Thread.sleep(1000 * retries); // Exponential backoff
}
}
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.out.println("Commit failed: " + e.getMessage());
}
}
}
private static void processOrderWithValidation(String order)
throws Exception {
// Validate and process
}
private static void sendToDeadLetterQueue(ConsumerRecord<String, String> record) {
// Send to DLQ
}
}
6. Consumer с Spring Kafka
Принято использовать в Spring Boot приложениях:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class OrderConsumer {
@KafkaListener(topics = "orders", groupId = "order-processor")
public void processOrder(String message) {
System.out.println("Received message: " + message);
// Processing logic
}
// С обработкой ошибок
@KafkaListener(topics = "orders", groupId = "order-processor")
public void processOrderWithError(
ConsumerRecord<String, String> record,
Acknowledgment acknowledgment
) {
try {
System.out.println("Processing: " + record.value());
// Business logic
acknowledgment.acknowledge(); // Manual commit
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
// Не коммитим, переполучим сообщение
}
}
// Несколько partitions
@KafkaListener(topics = "orders",
groupId = "order-processor",
concurrency = "3") // 3 потока
public void processConcurrently(String message) {
System.out.println("Thread: " + Thread.currentThread().getId()
+ ", Message: " + message);
}
}
application.yml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
7. Rebalancing — перераспределение партиций
Когда consumer присоединяется или уходит из группы:
public class RebalanceListenerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
// Слушатель rebalance событий
consumer.subscribe(
Collections.singletonList("orders"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(
Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
// Cleanup перед отдачей партиций
}
@Override
public void onPartitionsAssigned(
Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
// Инициализация после получения партиций
}
}
);
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// Processing
}
}
}
8. Важные конфиги
Properties props = new Properties();
// Базовые
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// Offsets
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// earliest | latest | none
// Коммиты
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// Timeout
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
// Партицирование
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
// Сериализация
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
Жизненный цикл Consumer
1. Создание consumer с конфигом
2. Подписка на топики (subscribe)
3. Присоединение к группе
4. Распределение партиций (rebalancing)
5. Polling и обработка сообщений
6. Коммит offset
7. Повторение с шага 5
8. При отключении: rebalancing, коммит последнего offset
Лучшие практики
✓ Используй consumer groups для масштабирования
✓ Manual commit для критичной логики
✓ Обработка ошибок и retry logic
✓ Dead Letter Queue для failed messages
✓ Мониторинг lag (отставание offset)
✓ Настройка timeout в зависимости от обработки
✓ Spring Kafka для Spring приложений
✓ Idempotent processing для дублирующихся сообщений
Consumer Groups делают Kafka мощной и масштабируемой системой для обработки потоков данных в параллель.