← Назад к вопросам

Понимаешь ли принцип работы с очередями сообщений в Kafka

2.0 Middle🔥 171 комментариев
#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Работа с очередями сообщений в Apache Kafka

Что такое Kafka

Apache Kafka — это распределённая система для потоковой обработки сообщений. Она спроектирована для:

  • Высокой пропускной способности (миллионы сообщений в секунду)
  • Надёжности (гарантия доставки)
  • Масштабируемости (горизонтальное расширение)

Основные концепции

1. Topic (Тема)

Topic — это категория или канал, в который публикуются сообщения:

Topic: "orders"
├── Partition 0: [msg1] [msg2] [msg3]
├── Partition 1: [msg4] [msg5] [msg6]
└── Partition 2: [msg7] [msg8] [msg9]

Каждая тема разбита на partitions для параллельной обработки.

2. Partition (Раздел)

Partition — это упорядоченная последовательность сообщений:

Partition 0:
Offset 0: {"id": 1, "amount": 100}
Offset 1: {"id": 2, "amount": 200}
Offset 2: {"id": 3, "amount": 300}

Каждое сообщение имеет offset (позицию в partition).

3. Producer (Издатель)

Producer пишет сообщения в topic:

KafkaProducer<String, String> producer = 
    new KafkaProducer<>(properties);

ProducerRecord<String, String> record = 
    new ProducerRecord<>("orders", "order-123", "{\"amount\": 100}");

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Sent to partition " + 
            metadata.partition() + 
            " at offset " + 
            metadata.offset());
    } else {
        exception.printStackTrace();
    }
});

4. Consumer (Подписчик)

Consumer читает сообщения из topic:

KafkaConsumer<String, String> consumer = 
    new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList("orders"));

while (true) {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf(
            "Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
            record.topic(),
            record.partition(),
            record.offset(),
            record.key(),
            record.value()
        );
        
        // Обработка сообщения
        processOrder(record.value());
    }
    
    // Коммит offset'а
    consumer.commitSync();
}

Архитектура Kafka

┌─────────────┐
│  Brokers    │  (кластер серверов Kafka)
├─────────────┤
│ Broker 1    │  Хранит копии partitions
│ Broker 2    │  Распределённое хранилище
│ Broker 3    │  Репликация для надёжности
└─────────────┘
      ↑  ↓
┌──────────────┐    ┌──────────────┐
│  Producer    │    │  Consumer    │
│  (отправляет)│    │  (получает)  │
└──────────────┘    └──────────────┘

Процесс отправки сообщения

// Шаг 1: Создать Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = 
    new KafkaProducer<>(props);

// Шаг 2: Отправить сообщение
ProducerRecord<String, String> record = 
    new ProducerRecord<>(
        "orders",           // Topic
        "order-001",        // Key (для партиционирования)
        "{amount: 100}"     // Value (сообщение)
    );

// Шаг 3: Асинхронная отправка
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Ошибка отправки
        log.error("Failed to send", exception);
    } else {
        // Успех
        log.info("Sent: partition=" + metadata.partition() + 
                 ", offset=" + metadata.offset());
    }
});

// Шаг 4: Закрыть Producer
producer.close();

Гарантии доставки

1. At Most Once (максимум один раз)

props.put("acks", "0");  // Producer не ждёт подтверждения
// Быстро, но может потеряться

2. At Least Once (минимум один раз)

props.put("acks", "1");  // Leader подтверждает
// Может быть дублирование

3. Exactly Once (ровно один раз)

props.put("acks", "all");          // Все replicas подтверждают
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", "1");
// Самое надёжное, медленнее

Consumer Groups (Группы потребителей)

Одна группа может читать из topic параллельно:

Topic: "orders" (3 partitions)
├── Partition 0 ────→ Consumer 1
├── Partition 1 ────→ Consumer 2
└── Partition 2 ────→ Consumer 3

Consumer Group: "order-processors"

Java код:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processors");
props.put("key.deserializer", 
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", 
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = 
    new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("orders"));

// Каждый consumer автоматически получит часть partitions
while (true) {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    
    consumer.commitAsync();  // Асинхронный коммит
}

Управление Offset'ами

Offset — это позиция, с которой Consumer продолжит читать:

// Автоматический коммит
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

// Ручной коммит (более контролируемо)
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = 
    new KafkaConsumer<>(props);

while (true) {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        try {
            processOrder(record.value());
            consumer.commitSync();  // Коммит только после успеха
        } catch (Exception e) {
            // Ошибка - коммит не выполнен
            // Consumer переустановится и переобработает
            log.error("Error processing", e);
        }
    }
}

Практический пример: обработка заказов

// Producer: когда заказ создан
public class OrderService {
    private KafkaProducer<String, String> producer;
    
    public void createOrder(Order order) {
        // Сохраняем в БД
        orderRepository.save(order);
        
        // Публикуем событие в Kafka
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(
                "order-events",
                order.getId(),
                objectMapper.writeValueAsString(
                    new OrderCreatedEvent(order.getId(), order.getAmount())
                )
            );
        
        producer.send(record);
    }
}

// Consumer: обработка платежей
public class PaymentProcessor {
    public void start() {
        consumer.subscribe(Arrays.asList("order-events"));
        
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                OrderCreatedEvent event = 
                    objectMapper.readValue(record.value(), OrderCreatedEvent.class);
                
                // Обработать платёж
                processPayment(event.getOrderId(), event.getAmount());
                
                consumer.commitSync();
            }
        }
    }
}

Реальные сценарии использования

  1. Event Streaming — обработка событий в реальном времени
  2. Log Aggregation — сбор логов из разных сервисов
  3. Stream Processing — обработка потоков данных (с Flink/Spark)
  4. Микросервисная архитектура — асинхронная коммуникация
  5. Real-time Analytics — аналитика на лету

Преимущества Kafka

✅ Высокая пропускная способность ✅ Надёжность (репликация) ✅ Масштабируемость ✅ Долгое хранение сообщений ✅ Asynchronous обработка

Когда НЕ использовать Kafka

❌ Нужна гарантия порядка между разными key'ами ❌ Требуется синхронный RPC ❌ Малая нагрузка (RabbitMQ проще) ❌ Нужны сложные селекторы сообщений

Заключение

Kafka — это мощный инструмент для построения асинхронных, масштабируемых систем. Основные концепции:

  • Topics → категории сообщений
  • Partitions → распараллеливание
  • Producers → отправляют
  • Consumers → получают
  • Consumer Groups → параллельная обработка
  • Offsets → отслеживание позиции

Она критична для modern backend систем!

Понимаешь ли принцип работы с очередями сообщений в Kafka | PrepBro