← Назад к вопросам

Как работает consumer с топиками в Kafka?

1.8 Middle🔥 191 комментариев
#REST API и микросервисы#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ответ

Kafka Consumer — это компонент, который читает сообщения из топиков. Это критически важная часть event-driven архитектуры. Рассмотрю полностью как это работает.

1. Базовый Consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
    
    public static void main(String[] args) {
        // 1. Конфигурация
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        
        // 2. Создание consumer
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);
        
        // 3. Подписка на топик
        consumer.subscribe(Arrays.asList("orders"));
        
        // 4. Чтение сообщений
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf(
                    "Topic: %s, Partition: %d, Offset: %d, " +
                    "Key: %s, Value: %s%n",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value()
                );
            }
        }
    }
}

2. Consumer Groups — распределённое чтение

Когда несколько consumer в одной группе читают один топик:

public class ConsumerGroupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");  // Группа
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);
        
        consumer.subscribe(Collections.singletonList("orders"));
        
        // Каждый consumer в группе получит разные партиции
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                processOrder(record.value());
            }
        }
    }
    
    private static void processOrder(String order) {
        System.out.println("Processing: " + order);
    }
}

Как работает распределение:

Топик: orders (3 партиции)
  Partition 0: [msg1, msg2, msg3]
  Partition 1: [msg4, msg5, msg6]
  Partition 2: [msg7, msg8, msg9]

Когда запускаю 3 consumer с одной GROUP_ID:
  Consumer 1 → читает Partition 0
  Consumer 2 → читает Partition 1
  Consumer 3 → читает Partition 2

Если запущу 2 consumer:
  Consumer 1 → читает Partition 0 и 1
  Consumer 2 → читает Partition 2

Если запущу 4 consumer:
  Consumer 1 → читает Partition 0
  Consumer 2 → читает Partition 1
  Consumer 3 → читает Partition 2
  Consumer 4 → неактивен (нет партиций)

3. Offsets — отслеживание позиции

Consumer запоминает какие сообщения уже обработал:

public class OffsetManagementExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // earliest - начать с начала топика
        // latest - начать с последних сообщений
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Автоматически коммитить offset каждые 5 секунд
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);
        
        consumer.subscribe(Collections.singletonList("events"));
        
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Offset: " + record.offset() + 
                    ", Value: " + record.value());
                // Offset автоматически коммитится
            }
        }
    }
}

4. Manual Offset Management

Для больше контроля:

public class ManualOffsetManagement {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Отключаю автокоммит, буду коммитить вручную
        
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);
        
        consumer.subscribe(Collections.singletonList("orders"));
        
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            
            try {
                for (ConsumerRecord<String, String> record : records) {
                    // Обрабатываю сообщение
                    processOrder(record.value());
                }
                // Только если успешно обработал все
                consumer.commitSync();  // Синхронный коммит
            } catch (Exception e) {
                System.out.println("Error processing: " + e.getMessage());
                // Не коммитим, перечитаем позже
            }
        }
    }
    
    private static void processOrder(String order) {
        // Processing logic
    }
}

5. Обработка ошибок и retry logic

public class RobustConsumer {
    private static final int MAX_RETRIES = 3;
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // Получать макимум 100 сообщений за раз
        
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);
        
        consumer.subscribe(Collections.singletonList("orders"));
        
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(1000));
            
            for (ConsumerRecord<String, String> record : records) {
                int retries = 0;
                while (retries < MAX_RETRIES) {
                    try {
                        processOrderWithValidation(record.value());
                        break;  // Успех
                    } catch (Exception e) {
                        retries++;
                        if (retries >= MAX_RETRIES) {
                            // Отправить в dead letter queue
                            sendToDeadLetterQueue(record);
                            break;
                        }
                        Thread.sleep(1000 * retries);  // Exponential backoff
                    }
                }
            }
            
            try {
                consumer.commitSync();
            } catch (CommitFailedException e) {
                System.out.println("Commit failed: " + e.getMessage());
            }
        }
    }
    
    private static void processOrderWithValidation(String order) 
            throws Exception {
        // Validate and process
    }
    
    private static void sendToDeadLetterQueue(ConsumerRecord<String, String> record) {
        // Send to DLQ
    }
}

6. Consumer с Spring Kafka

Принято использовать в Spring Boot приложениях:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {
    
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void processOrder(String message) {
        System.out.println("Received message: " + message);
        // Processing logic
    }
    
    // С обработкой ошибок
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void processOrderWithError(
        ConsumerRecord<String, String> record,
        Acknowledgment acknowledgment
    ) {
        try {
            System.out.println("Processing: " + record.value());
            // Business logic
            acknowledgment.acknowledge();  // Manual commit
        } catch (Exception e) {
            System.err.println("Error: " + e.getMessage());
            // Не коммитим, переполучим сообщение
        }
    }
    
    // Несколько partitions
    @KafkaListener(topics = "orders", 
        groupId = "order-processor",
        concurrency = "3")  // 3 потока
    public void processConcurrently(String message) {
        System.out.println("Thread: " + Thread.currentThread().getId() 
            + ", Message: " + message);
    }
}

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

7. Rebalancing — перераспределение партиций

Когда consumer присоединяется или уходит из группы:

public class RebalanceListenerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(props);
        
        // Слушатель rebalance событий
        consumer.subscribe(
            Collections.singletonList("orders"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(
                    Collection<TopicPartition> partitions) {
                    System.out.println("Partitions revoked: " + partitions);
                    // Cleanup перед отдачей партиций
                }
                
                @Override
                public void onPartitionsAssigned(
                    Collection<TopicPartition> partitions) {
                    System.out.println("Partitions assigned: " + partitions);
                    // Инициализация после получения партиций
                }
            }
        );
        
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            // Processing
        }
    }
}

8. Важные конфиги

Properties props = new Properties();

// Базовые
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

// Offsets
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// earliest | latest | none

// Коммиты
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

// Timeout
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");

// Партицирование
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

// Сериализация
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

Жизненный цикл Consumer

1. Создание consumer с конфигом
2. Подписка на топики (subscribe)
3. Присоединение к группе
4. Распределение партиций (rebalancing)
5. Polling и обработка сообщений
6. Коммит offset
7. Повторение с шага 5
8. При отключении: rebalancing, коммит последнего offset

Лучшие практики

✓ Используй consumer groups для масштабирования
✓ Manual commit для критичной логики
✓ Обработка ошибок и retry logic
✓ Dead Letter Queue для failed messages
✓ Мониторинг lag (отставание offset)
✓ Настройка timeout в зависимости от обработки
✓ Spring Kafka для Spring приложений
✓ Idempotent processing для дублирующихся сообщений

Consumer Groups делают Kafka мощной и масштабируемой системой для обработки потоков данных в параллель.

Как работает consumer с топиками в Kafka? | PrepBro