← Назад к вопросам

Как гарантировать, чтобы несколько консьюмеров не обрабатывали несколько событий от одной сущности одновременно

2.0 Middle🔥 131 комментариев
#Основы Java

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Предотвращение одновременной обработки одного события

Это классическая проблема в распределённых системах: несколько консьюмеров получают одно событие и начинают его обрабатывать параллельно. Есть несколько решений.

1. Kafka Partitioning (Best Practice)

Используем partition key на основе сущности:

// Producer — отправляем с ключом
@Service
public class OrderEventPublisher {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    public void publishOrderCreated(Order order) {
        // КЛЮЧ = orderId, это гарантирует, что события одного заказа
        // идут в одну partition и обрабатываются последовательно
        kafkaTemplate.send(
            new ProducerRecord<>(
                "order-events",
                order.getId(),  // <- Partition key
                order.getId(),  // <- Message key
                new OrderCreatedEvent(order)
            )
        );
    }
}

Как это работает

Kafka Topic: order-events (5 partitions)

Partition 0: [Event(order_123), Event(order_123)]
Partition 1: [Event(order_456)]
Partition 2: [Event(order_789)]
Partition 3: []
Partition 4: [Event(order_123)]

Consumer Group:
- Consumer-1 читает Partition 0 (ТОЛЬКО события order_123)
- Consumer-2 читает Partition 1 (order_456)
- Consumer-3 читает Partition 2 (order_789)

Гарантия: события одной сущности (order_123) идут в одну partition, one consumer их обрабатывает, порядок сохраняется.

// Consumer конфиг
@Configuration
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        
        // concurrency = количество потоков на partition
        factory.setConcurrency(1); // <- ВАЖНО: 1 consumer на partition
        
        return factory;
    }
}

// Listener
@Service
public class OrderEventListener {
    @KafkaListener(topics = "order-events", groupId = "order-service")
    public void handleOrderEvent(OrderEvent event) {
        // Гарантия: только один consumer обрабатывает события
        // одного orderId одновременно
        orderService.process(event);
    }
}

2. Distributed Lock (для синхронной обработки)

Если Kafka partitioning недостаточно, используем RedLock:

@Service
@RequiredArgsConstructor
public class OrderProcessingService {
    private final RedisTemplate<String, String> redisTemplate;
    private final OrderRepository orderRepository;
    
    @KafkaListener(topics = "order-events")
    public void handleOrderEvent(OrderEvent event) {
        String lockKey = "order:" + event.getOrderId() + ":lock";
        String lockValue = UUID.randomUUID().toString();
        
        // Пытаемся получить lock
        Boolean lockAcquired = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, Duration.ofSeconds(30));
        
        if (!lockAcquired) {
            // Lock занят, перепубликуем событие в Kafka
            rePublishEvent(event);
            return;
        }
        
        try {
            // Обрабатываем при наличии lock
            Order order = orderRepository.findById(event.getOrderId())
                .orElseThrow();
            
            order.process(event);
            orderRepository.save(order);
        } finally {
            // Освобождаем lock
            String currentValue = redisTemplate.opsForValue()
                .get(lockKey);
            
            if (lockValue.equals(currentValue)) {
                redisTemplate.delete(lockKey);
            }
        }
    }
}

3. Распределённая блокировка через БД

Для более надёжной реализации:

@Service
public class OrderProcessingWithDbLock {
    private final JdbcTemplate jdbcTemplate;
    private final OrderRepository orderRepository;
    
    @KafkaListener(topics = "order-events")
    @Transactional
    public void handleOrderEvent(OrderEvent event) {
        // SELECT FOR UPDATE в PostgreSQL
        Order order = orderRepository.findByIdForUpdate(event.getOrderId())
            .orElseThrow();
        
        // Эта строка заблокирована для других транзакций
        // Другие консьюмеры будут ждать
        
        order.processEvent(event);
        orderRepository.save(order);
    }
}

// Repository
@Repository
public interface OrderRepository extends JpaRepository<Order, String> {
    @Query("SELECT o FROM Order o WHERE o.id = :id FOR UPDATE")
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<Order> findByIdForUpdate(@Param("id") String id);
}

4. Pessimistic Locking (Spring Data)

@Entity
@Table(name = "orders")
public class Order {
    @Id
    private String id;
    
    private String status;
    
    @Version  // <- Оптимистичная блокировка
    private Integer version;
}

@Repository
public interface OrderRepository extends JpaRepository<Order, String> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)  // <- Пессимистичная
    @Query("SELECT o FROM Order o WHERE o.id = :id")
    Optional<Order> findByIdLocked(@Param("id") String id);
}

5. Idempotency + Deduplication

Позволяем параллельную обработку, но гарантируем идемпотентность:

@Service
public class IdempotentOrderService {
    private final OrderRepository orderRepository;
    private final ProcessedEventRepository processedEventRepository;
    
    @KafkaListener(topics = "order-events")
    @Transactional
    public void handleOrderEvent(OrderEvent event) {
        // 1. Проверяем, был ли уже обработан этот event
        if (processedEventRepository.existsById(event.getEventId())) {
            log.info("Event {} already processed", event.getEventId());
            return;
        }
        
        // 2. Обрабатываем
        Order order = orderRepository.findById(event.getOrderId())
            .orElseThrow();
        order.processEvent(event);
        orderRepository.save(order);
        
        // 3. Записываем, что обработали
        ProcessedEvent processed = new ProcessedEvent(event.getEventId());
        processedEventRepository.save(processed);
    }
}

Сравнение подходов

ПодходСложностьPerformanceReliabilityИспользую когда
Partitioning✓ низкая✓✓✓ высокая✓✓ хорошаяВСЕ случаи
RedLock✓✓ средняя✓✓ средняя✓✓✓ отличнаяEdge cases
DB Lock✓✓ средняя✓ низкая✓✓✓ отличнаяCritical sections
Idempotency✓✓ средняя✓✓✓ высокая✓✓ хорошаяДопустима переобработка

Реальный пример (комбинированный подход)

@Service
@RequiredArgsConstructor
public class OrderEventHandler {
    private final OrderRepository orderRepository;
    private final ProcessedEventRepository processedEventRepository;
    private final RedisTemplate<String, String> redisTemplate;
    
    @KafkaListener(
        topics = "order-events",
        containerFactory = "kafkaListenerContainerFactory"
    )
    @Transactional
    public void handleOrderEvent(OrderEvent event) {
        // Уровень 1: Проверка идемпотентности
        if (processedEventRepository.existsById(event.getEventId())) {
            return;
        }
        
        // Уровень 2: Получаем lock на заказ
        String lockKey = "order:" + event.getOrderId();
        Boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, "locked", Duration.ofSeconds(10));
        
        if (!locked) {
            throw new OrderLockedException();
        }
        
        try {
            // Уровень 3: DB уровень — pessimistic lock
            Order order = orderRepository.findByIdLocked(event.getOrderId())
                .orElseThrow();
            
            // Обрабатываем
            order.processEvent(event);
            orderRepository.save(order);
            
            // Записываем обработку
            processedEventRepository.save(
                new ProcessedEvent(event.getEventId())
            );
        } finally {
            redisTemplate.delete(lockKey);
        }
    }
}

Итог

Рекомендуемый подход:

  1. Всегда используй partition key в Kafka (по ID сущности)
  2. Для критичной логики добавь идемпотентность
  3. Для edge cases добавь distributed lock (Redis или DB)
  4. Мониторь lag в consumer group

Партиционирование + идемпотентность = 99% всех случаев.