← Назад к вопросам

Как спроектировать систему для обработки 100 миллионов сообщений в сутки с обновлением записей и интеграцией с внешней системой

3.0 Senior🔥 161 комментариев
#Docker, Kubernetes и DevOps#REST API и микросервисы#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Архитектура системы для обработки 100M сообщений в сутки

Это сложная задача, требующая комплексного подхода. Разберу проектирование такой системы пошагово.

Анализ требований

Исходные данные:

  • 100 млн сообщений в сутки = ~1,157 сообщений/сек (RPS)
  • Пиковые нагрузки могут быть в 5-10 раз выше (~10K RPS)
  • Требуется обновление записей
  • Интеграция с внешней системой (вероятно API)

SLA для проектирования:

  • Availability: 99.99% (52 минуты downtime в год)
  • P99 latency: < 1000ms
  • Data durability: 99.9999%

Архитектурное решение

1. Message Broker Layer (Kafka)

// Конфигурация продюсера
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
            "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");  // Дождись всех реплик
        configProps.put(ProducerConfig.RETRIES_CONFIG, 10);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
            JsonSerializer.class);
        return new DefaultProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        KafkaTemplate<String, Message> template = 
            new KafkaTemplate<>(producerFactory());
        template.setDefaultTopic("events-incoming");
        return template;
    }
}

// Kafka Topic конфигурация
@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic incomingTopic() {
        return TopicBuilder.name("events-incoming")
            .partitions(100)  // 100 партиций для высокого throughput
            .replicas(3)      // 3 реплики для надёжности
            .config("retention.ms", "604800000")  // 7 дней
            .build();
    }
    
    @Bean
    public NewTopic processingTopic() {
        return TopicBuilder.name("events-processing")
            .partitions(100)
            .replicas(3)
            .build();
    }
    
    @Bean
    public NewTopic dlqTopic() {
        return TopicBuilder.name("events-dlq")
            .partitions(10)
            .replicas(3)
            .build();
    }
}

2. Stream Processing Layer

@Service
@Slf4j
public class EventProcessingService {
    private final KafkaTemplate<String, Message> kafkaTemplate;
    private final EventRepository eventRepository;
    private final ExternalSystemClient externalClient;
    private final MetricsService metricsService;
    
    // Консьюмер с параллельной обработкой
    @KafkaListener(
        topics = "events-incoming",
        groupId = "event-processing-group",
        concurrency = "20"  // 20 параллельных потребителей
    )
    @Transactional
    public void processMessage(
        Message message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
        
        long startTime = System.nanoTime();
        
        try {
            // 1. Валидация
            validateMessage(message);
            
            // 2. Дедупликация (важна для идемпотентности)
            if (isDuplicate(message.getId())) {
                log.info("Duplicate message: {}", message.getId());
                return;
            }
            
            // 3. Сохранение в БД (транзакция)
            Event event = saveEventToDatabase(message);
            
            // 4. Отправка во внешнюю систему (асинхронно)
            sendToExternalSystem(event);
            
            // 5. Логирование метрик
            metricsService.recordSuccess(partition, offset, 
                System.nanoTime() - startTime);
            
        } catch (ValidationException e) {
            log.error("Validation failed: {}", message.getId());
            sendToDLQ(message, "VALIDATION_FAILED");
        } catch (Exception e) {
            log.error("Processing failed: {}", message.getId(), e);
            sendToDLQ(message, e.getMessage());
        }
    }
    
    private void validateMessage(Message message) {
        if (message.getId() == null || message.getPayload() == null) {
            throw new ValidationException("Invalid message structure");
        }
    }
    
    private boolean isDuplicate(String messageId) {
        return eventRepository.existsById(messageId);
    }
    
    private Event saveEventToDatabase(Message message) {
        Event event = new Event();
        event.setId(message.getId());
        event.setPayload(message.getPayload());
        event.setStatus("PENDING");
        event.setCreatedAt(LocalDateTime.now(UTC));
        return eventRepository.save(event);
    }
    
    private void sendToExternalSystem(Event event) {
        CompletableFuture.runAsync(() -> {
            try {
                ExternalResponse response = externalClient.send(event);
                updateEventStatus(event.getId(), "SENT", response.getExternalId());
            } catch (Exception e) {
                log.error("External system error: {}", event.getId(), e);
                retryWithExponentialBackoff(event);
            }
        });
    }
    
    private void sendToDLQ(Message message, String reason) {
        kafkaTemplate.send("events-dlq", message.getId(), 
            new ErrorMessage(message, reason, LocalDateTime.now(UTC)));
    }
}

3. Database Layer - Оптимизация

@Entity
@Table(name = "events", indexes = {
    @Index(name = "idx_status", columnList = "status"),
    @Index(name = "idx_created_at", columnList = "created_at"),
    @Index(name = "idx_external_id", columnList = "external_id")
})
public class Event {
    @Id
    private String id;
    
    @Column(columnDefinition = "jsonb")
    private String payload;  // JSONB для оптимизации
    
    @Column(length = 50)
    private String status;   // PENDING, SENT, COMPLETED, FAILED
    
    private String externalId;
    
    @CreationTimestamp
    private LocalDateTime createdAt;
    
    @UpdateTimestamp
    private LocalDateTime updatedAt;
}

//批 обновления для оптимизации
@Repository
public interface EventRepository extends JpaRepository<Event, String> {
    @Modifying
    @Query("UPDATE Event e SET e.status = :status WHERE e.id IN :ids")
    int updateStatusBatch(@Param("ids") List<String> ids, 
                         @Param("status") String status);
    
    // Для изоляции на чтение
    @Query(value = 
        "SELECT * FROM events WHERE status = :status LIMIT :limit",
        nativeQuery = true)
    List<Event> findByStatusWithLimit(
        @Param("status") String status,
        @Param("limit") int limit);
}

@Service
public class BatchUpdateService {
    private final EventRepository eventRepository;
    private final static int BATCH_SIZE = 10000;
    
    @Scheduled(fixedRate = 5000)  // Каждые 5 секунд
    public void processBatchUpdates() {
        List<Event> pending = eventRepository
            .findByStatusWithLimit("PENDING_UPDATE", BATCH_SIZE);
        
        List<String> ids = pending.stream()
            .map(Event::getId)
            .collect(Collectors.toList());
        
        eventRepository.updateStatusBatch(ids, "COMPLETED");
    }
}

4. External System Integration

@Service
@Slf4j
public class ExternalSystemClient {
    private final RestTemplate restTemplate;
    private final CircuitBreaker circuitBreaker;
    private final RetryTemplate retryTemplate;
    
    @Retryable(
        value = {IOException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2.0)
    )
    @CircuitBreaker(name = "externalAPI")
    public ExternalResponse send(Event event) {
        try {
            long startTime = System.nanoTime();
            
            ResponseEntity<ExternalResponse> response = restTemplate.postForEntity(
                "https://external-api.com/events",
                new ExternalRequest(event),
                ExternalResponse.class
            );
            
            long duration = System.nanoTime() - startTime;
            log.info("External API call took: {}ms", duration / 1_000_000);
            
            return response.getBody();
        } catch (HttpClientErrorException.Conflict e) {
            log.warn("Conflict with external system: {}", e.getMessage());
            throw new RecoverableException("Conflict", e);
        } catch (Exception e) {
            log.error("External system error", e);
            throw new NonRecoverableException("Failed", e);
        }
    }
}

// Резилиентность
@Configuration
public class ResilienceConfig {
    @Bean
    public CircuitBreaker circuitBreaker() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50.0f)
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .build();
        return CircuitBreaker.of("externalAPI", config);
    }
    
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();
        
        ExponentialBackOffPolicy backOffPolicy = 
            new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMaxInterval(10000);
        backOffPolicy.setMultiplier(2.0);
        template.setBackOffPolicy(backOffPolicy);
        
        template.setRetryPolicy(new SimpleRetryPolicy(3));
        return template;
    }
}

5. Caching Layer

@Configuration
public class CacheConfig {
    @Bean
    public CacheManager cacheManager() {
        return new RedisCacheManager.create(
            LettuceConnectionFactory.create()
        );
    }
}

@Service
public class CachedEventService {
    private final EventRepository eventRepository;
    private final CacheManager cacheManager;
    
    @Cacheable(value = "events", key = "#id", unless = "#result == null")
    public Event getEventById(String id) {
        return eventRepository.findById(id).orElse(null);
    }
    
    @CacheEvict(value = "events", key = "#event.id")
    public Event updateEvent(Event event) {
        return eventRepository.save(event);
    }
}

Инфраструктура и Масштабирование

Docker Compose для локального тестирования:

version: 3.8
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_NUM_PARTITIONS: 100
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
    ports:
      - "9092:9092"
    
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: events
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
  
  redis:
    image: redis:7
    ports:
      - "6379:6379"

volumes:
  kafka-data:
  postgres-data:

Kubernetes развёртывание:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-processor
spec:
  replicas: 10  # Горизонтальное масштабирование
  template:
    spec:
      containers:
      - name: processor
        image: event-processor:latest
        resources:
          requests:
            cpu: "2"
            memory: "2Gi"
          limits:
            cpu: "4"
            memory: "4Gi"
        env:
        - name: KAFKA_CONCURRENCY
          value: "20"
        - name: BATCH_SIZE
          value: "10000"

Мониторинг и Алерты

@Service
@Slf4j
public class MetricsService {
    private final MeterRegistry meterRegistry;
    
    public void recordSuccess(int partition, long offset, long durationNanos) {
        Counter.builder("events.processed")
            .tag("partition", String.valueOf(partition))
            .register(meterRegistry)
            .increment();
        
        Timer.builder("events.processing.duration")
            .register(meterRegistry)
            .record(Duration.ofNanos(durationNanos));
    }
    
    public void recordError(String reason) {
        Counter.builder("events.failed")
            .tag("reason", reason)
            .register(meterRegistry)
            .increment();
    }
}

// Prometheus metrics
// - events_processed_total (counter)
// - events_processing_duration_seconds (histogram)
// - events_failed_total (counter)
// - kafka_consumer_lag (gauge)

Ключевые оптимизации

  1. Throughput: Batch processing, compression, partitioning
  2. Latency: In-memory cache, connection pooling
  3. Reliability: Replication, DLQ, retries, circuit breaker
  4. Durability: PostgreSQL ACID, Kafka replication
  5. Scalability: Horizontal scaling, load balancing

Итог: Такая архитектура способна обработать 100M сообщений в сутки с гарантией delivery, обновлением записей и интеграцией с внешними системами при 99.99% uptime.