← Назад к вопросам
Как гарантировать, чтобы несколько консьюмеров не обрабатывали несколько событий от одной сущности одновременно
2.0 Middle🔥 131 комментариев
#Основы Java
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Предотвращение одновременной обработки одного события
Это классическая проблема в распределённых системах: несколько консьюмеров получают одно событие и начинают его обрабатывать параллельно. Есть несколько решений.
1. Kafka Partitioning (Best Practice)
Используем partition key на основе сущности:
// Producer — отправляем с ключом
@Service
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void publishOrderCreated(Order order) {
// КЛЮЧ = orderId, это гарантирует, что события одного заказа
// идут в одну partition и обрабатываются последовательно
kafkaTemplate.send(
new ProducerRecord<>(
"order-events",
order.getId(), // <- Partition key
order.getId(), // <- Message key
new OrderCreatedEvent(order)
)
);
}
}
Как это работает
Kafka Topic: order-events (5 partitions)
Partition 0: [Event(order_123), Event(order_123)]
Partition 1: [Event(order_456)]
Partition 2: [Event(order_789)]
Partition 3: []
Partition 4: [Event(order_123)]
Consumer Group:
- Consumer-1 читает Partition 0 (ТОЛЬКО события order_123)
- Consumer-2 читает Partition 1 (order_456)
- Consumer-3 читает Partition 2 (order_789)
Гарантия: события одной сущности (order_123) идут в одну partition, one consumer их обрабатывает, порядок сохраняется.
// Consumer конфиг
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// concurrency = количество потоков на partition
factory.setConcurrency(1); // <- ВАЖНО: 1 consumer на partition
return factory;
}
}
// Listener
@Service
public class OrderEventListener {
@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrderEvent(OrderEvent event) {
// Гарантия: только один consumer обрабатывает события
// одного orderId одновременно
orderService.process(event);
}
}
2. Distributed Lock (для синхронной обработки)
Если Kafka partitioning недостаточно, используем RedLock:
@Service
@RequiredArgsConstructor
public class OrderProcessingService {
private final RedisTemplate<String, String> redisTemplate;
private final OrderRepository orderRepository;
@KafkaListener(topics = "order-events")
public void handleOrderEvent(OrderEvent event) {
String lockKey = "order:" + event.getOrderId() + ":lock";
String lockValue = UUID.randomUUID().toString();
// Пытаемся получить lock
Boolean lockAcquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, Duration.ofSeconds(30));
if (!lockAcquired) {
// Lock занят, перепубликуем событие в Kafka
rePublishEvent(event);
return;
}
try {
// Обрабатываем при наличии lock
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow();
order.process(event);
orderRepository.save(order);
} finally {
// Освобождаем lock
String currentValue = redisTemplate.opsForValue()
.get(lockKey);
if (lockValue.equals(currentValue)) {
redisTemplate.delete(lockKey);
}
}
}
}
3. Распределённая блокировка через БД
Для более надёжной реализации:
@Service
public class OrderProcessingWithDbLock {
private final JdbcTemplate jdbcTemplate;
private final OrderRepository orderRepository;
@KafkaListener(topics = "order-events")
@Transactional
public void handleOrderEvent(OrderEvent event) {
// SELECT FOR UPDATE в PostgreSQL
Order order = orderRepository.findByIdForUpdate(event.getOrderId())
.orElseThrow();
// Эта строка заблокирована для других транзакций
// Другие консьюмеры будут ждать
order.processEvent(event);
orderRepository.save(order);
}
}
// Repository
@Repository
public interface OrderRepository extends JpaRepository<Order, String> {
@Query("SELECT o FROM Order o WHERE o.id = :id FOR UPDATE")
@Lock(LockModeType.PESSIMISTIC_WRITE)
Optional<Order> findByIdForUpdate(@Param("id") String id);
}
4. Pessimistic Locking (Spring Data)
@Entity
@Table(name = "orders")
public class Order {
@Id
private String id;
private String status;
@Version // <- Оптимистичная блокировка
private Integer version;
}
@Repository
public interface OrderRepository extends JpaRepository<Order, String> {
@Lock(LockModeType.PESSIMISTIC_WRITE) // <- Пессимистичная
@Query("SELECT o FROM Order o WHERE o.id = :id")
Optional<Order> findByIdLocked(@Param("id") String id);
}
5. Idempotency + Deduplication
Позволяем параллельную обработку, но гарантируем идемпотентность:
@Service
public class IdempotentOrderService {
private final OrderRepository orderRepository;
private final ProcessedEventRepository processedEventRepository;
@KafkaListener(topics = "order-events")
@Transactional
public void handleOrderEvent(OrderEvent event) {
// 1. Проверяем, был ли уже обработан этот event
if (processedEventRepository.existsById(event.getEventId())) {
log.info("Event {} already processed", event.getEventId());
return;
}
// 2. Обрабатываем
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow();
order.processEvent(event);
orderRepository.save(order);
// 3. Записываем, что обработали
ProcessedEvent processed = new ProcessedEvent(event.getEventId());
processedEventRepository.save(processed);
}
}
Сравнение подходов
| Подход | Сложность | Performance | Reliability | Использую когда |
|---|---|---|---|---|
| Partitioning | ✓ низкая | ✓✓✓ высокая | ✓✓ хорошая | ВСЕ случаи |
| RedLock | ✓✓ средняя | ✓✓ средняя | ✓✓✓ отличная | Edge cases |
| DB Lock | ✓✓ средняя | ✓ низкая | ✓✓✓ отличная | Critical sections |
| Idempotency | ✓✓ средняя | ✓✓✓ высокая | ✓✓ хорошая | Допустима переобработка |
Реальный пример (комбинированный подход)
@Service
@RequiredArgsConstructor
public class OrderEventHandler {
private final OrderRepository orderRepository;
private final ProcessedEventRepository processedEventRepository;
private final RedisTemplate<String, String> redisTemplate;
@KafkaListener(
topics = "order-events",
containerFactory = "kafkaListenerContainerFactory"
)
@Transactional
public void handleOrderEvent(OrderEvent event) {
// Уровень 1: Проверка идемпотентности
if (processedEventRepository.existsById(event.getEventId())) {
return;
}
// Уровень 2: Получаем lock на заказ
String lockKey = "order:" + event.getOrderId();
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "locked", Duration.ofSeconds(10));
if (!locked) {
throw new OrderLockedException();
}
try {
// Уровень 3: DB уровень — pessimistic lock
Order order = orderRepository.findByIdLocked(event.getOrderId())
.orElseThrow();
// Обрабатываем
order.processEvent(event);
orderRepository.save(order);
// Записываем обработку
processedEventRepository.save(
new ProcessedEvent(event.getEventId())
);
} finally {
redisTemplate.delete(lockKey);
}
}
}
Итог
Рекомендуемый подход:
- Всегда используй partition key в Kafka (по ID сущности)
- Для критичной логики добавь идемпотентность
- Для edge cases добавь distributed lock (Redis или DB)
- Мониторь lag в consumer group
Партиционирование + идемпотентность = 99% всех случаев.