Понимаешь ли принцип работы с очередями сообщений в Kafka
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Работа с очередями сообщений в Apache Kafka
Что такое Kafka
Apache Kafka — это распределённая система для потоковой обработки сообщений. Она спроектирована для:
- Высокой пропускной способности (миллионы сообщений в секунду)
- Надёжности (гарантия доставки)
- Масштабируемости (горизонтальное расширение)
Основные концепции
1. Topic (Тема)
Topic — это категория или канал, в который публикуются сообщения:
Topic: "orders"
├── Partition 0: [msg1] [msg2] [msg3]
├── Partition 1: [msg4] [msg5] [msg6]
└── Partition 2: [msg7] [msg8] [msg9]
Каждая тема разбита на partitions для параллельной обработки.
2. Partition (Раздел)
Partition — это упорядоченная последовательность сообщений:
Partition 0:
Offset 0: {"id": 1, "amount": 100}
Offset 1: {"id": 2, "amount": 200}
Offset 2: {"id": 3, "amount": 300}
Каждое сообщение имеет offset (позицию в partition).
3. Producer (Издатель)
Producer пишет сообщения в topic:
KafkaProducer<String, String> producer =
new KafkaProducer<>(properties);
ProducerRecord<String, String> record =
new ProducerRecord<>("orders", "order-123", "{\"amount\": 100}");
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent to partition " +
metadata.partition() +
" at offset " +
metadata.offset());
} else {
exception.printStackTrace();
}
});
4. Consumer (Подписчик)
Consumer читает сообщения из topic:
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("orders"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
// Обработка сообщения
processOrder(record.value());
}
// Коммит offset'а
consumer.commitSync();
}
Архитектура Kafka
┌─────────────┐
│ Brokers │ (кластер серверов Kafka)
├─────────────┤
│ Broker 1 │ Хранит копии partitions
│ Broker 2 │ Распределённое хранилище
│ Broker 3 │ Репликация для надёжности
└─────────────┘
↑ ↓
┌──────────────┐ ┌──────────────┐
│ Producer │ │ Consumer │
│ (отправляет)│ │ (получает) │
└──────────────┘ └──────────────┘
Процесс отправки сообщения
// Шаг 1: Создать Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer =
new KafkaProducer<>(props);
// Шаг 2: Отправить сообщение
ProducerRecord<String, String> record =
new ProducerRecord<>(
"orders", // Topic
"order-001", // Key (для партиционирования)
"{amount: 100}" // Value (сообщение)
);
// Шаг 3: Асинхронная отправка
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// Ошибка отправки
log.error("Failed to send", exception);
} else {
// Успех
log.info("Sent: partition=" + metadata.partition() +
", offset=" + metadata.offset());
}
});
// Шаг 4: Закрыть Producer
producer.close();
Гарантии доставки
1. At Most Once (максимум один раз)
props.put("acks", "0"); // Producer не ждёт подтверждения
// Быстро, но может потеряться
2. At Least Once (минимум один раз)
props.put("acks", "1"); // Leader подтверждает
// Может быть дублирование
3. Exactly Once (ровно один раз)
props.put("acks", "all"); // Все replicas подтверждают
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", "1");
// Самое надёжное, медленнее
Consumer Groups (Группы потребителей)
Одна группа может читать из topic параллельно:
Topic: "orders" (3 partitions)
├── Partition 0 ────→ Consumer 1
├── Partition 1 ────→ Consumer 2
└── Partition 2 ────→ Consumer 3
Consumer Group: "order-processors"
Java код:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processors");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
// Каждый consumer автоматически получит часть partitions
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
consumer.commitAsync(); // Асинхронный коммит
}
Управление Offset'ами
Offset — это позиция, с которой Consumer продолжит читать:
// Автоматический коммит
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// Ручной коммит (более контролируемо)
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processOrder(record.value());
consumer.commitSync(); // Коммит только после успеха
} catch (Exception e) {
// Ошибка - коммит не выполнен
// Consumer переустановится и переобработает
log.error("Error processing", e);
}
}
}
Практический пример: обработка заказов
// Producer: когда заказ создан
public class OrderService {
private KafkaProducer<String, String> producer;
public void createOrder(Order order) {
// Сохраняем в БД
orderRepository.save(order);
// Публикуем событие в Kafka
ProducerRecord<String, String> record =
new ProducerRecord<>(
"order-events",
order.getId(),
objectMapper.writeValueAsString(
new OrderCreatedEvent(order.getId(), order.getAmount())
)
);
producer.send(record);
}
}
// Consumer: обработка платежей
public class PaymentProcessor {
public void start() {
consumer.subscribe(Arrays.asList("order-events"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
OrderCreatedEvent event =
objectMapper.readValue(record.value(), OrderCreatedEvent.class);
// Обработать платёж
processPayment(event.getOrderId(), event.getAmount());
consumer.commitSync();
}
}
}
}
Реальные сценарии использования
- Event Streaming — обработка событий в реальном времени
- Log Aggregation — сбор логов из разных сервисов
- Stream Processing — обработка потоков данных (с Flink/Spark)
- Микросервисная архитектура — асинхронная коммуникация
- Real-time Analytics — аналитика на лету
Преимущества Kafka
✅ Высокая пропускная способность ✅ Надёжность (репликация) ✅ Масштабируемость ✅ Долгое хранение сообщений ✅ Asynchronous обработка
Когда НЕ использовать Kafka
❌ Нужна гарантия порядка между разными key'ами ❌ Требуется синхронный RPC ❌ Малая нагрузка (RabbitMQ проще) ❌ Нужны сложные селекторы сообщений
Заключение
Kafka — это мощный инструмент для построения асинхронных, масштабируемых систем. Основные концепции:
- Topics → категории сообщений
- Partitions → распараллеливание
- Producers → отправляют
- Consumers → получают
- Consumer Groups → параллельная обработка
- Offsets → отслеживание позиции
Она критична для modern backend систем!