← Назад к вопросам
Как спроектировать систему для обработки 100 миллионов сообщений в сутки с обновлением записей и интеграцией с внешней системой
3.0 Senior🔥 161 комментариев
#Docker, Kubernetes и DevOps#REST API и микросервисы#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Архитектура системы для обработки 100M сообщений в сутки
Это сложная задача, требующая комплексного подхода. Разберу проектирование такой системы пошагово.
Анализ требований
Исходные данные:
- 100 млн сообщений в сутки = ~1,157 сообщений/сек (RPS)
- Пиковые нагрузки могут быть в 5-10 раз выше (~10K RPS)
- Требуется обновление записей
- Интеграция с внешней системой (вероятно API)
SLA для проектирования:
- Availability: 99.99% (52 минуты downtime в год)
- P99 latency: < 1000ms
- Data durability: 99.9999%
Архитектурное решение
1. Message Broker Layer (Kafka)
// Конфигурация продюсера
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Message> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-1:9092,kafka-2:9092,kafka-3:9092");
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // Дождись всех реплик
configProps.put(ProducerConfig.RETRIES_CONFIG, 10);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
KafkaTemplate<String, Message> template =
new KafkaTemplate<>(producerFactory());
template.setDefaultTopic("events-incoming");
return template;
}
}
// Kafka Topic конфигурация
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic incomingTopic() {
return TopicBuilder.name("events-incoming")
.partitions(100) // 100 партиций для высокого throughput
.replicas(3) // 3 реплики для надёжности
.config("retention.ms", "604800000") // 7 дней
.build();
}
@Bean
public NewTopic processingTopic() {
return TopicBuilder.name("events-processing")
.partitions(100)
.replicas(3)
.build();
}
@Bean
public NewTopic dlqTopic() {
return TopicBuilder.name("events-dlq")
.partitions(10)
.replicas(3)
.build();
}
}
2. Stream Processing Layer
@Service
@Slf4j
public class EventProcessingService {
private final KafkaTemplate<String, Message> kafkaTemplate;
private final EventRepository eventRepository;
private final ExternalSystemClient externalClient;
private final MetricsService metricsService;
// Консьюмер с параллельной обработкой
@KafkaListener(
topics = "events-incoming",
groupId = "event-processing-group",
concurrency = "20" // 20 параллельных потребителей
)
@Transactional
public void processMessage(
Message message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
long startTime = System.nanoTime();
try {
// 1. Валидация
validateMessage(message);
// 2. Дедупликация (важна для идемпотентности)
if (isDuplicate(message.getId())) {
log.info("Duplicate message: {}", message.getId());
return;
}
// 3. Сохранение в БД (транзакция)
Event event = saveEventToDatabase(message);
// 4. Отправка во внешнюю систему (асинхронно)
sendToExternalSystem(event);
// 5. Логирование метрик
metricsService.recordSuccess(partition, offset,
System.nanoTime() - startTime);
} catch (ValidationException e) {
log.error("Validation failed: {}", message.getId());
sendToDLQ(message, "VALIDATION_FAILED");
} catch (Exception e) {
log.error("Processing failed: {}", message.getId(), e);
sendToDLQ(message, e.getMessage());
}
}
private void validateMessage(Message message) {
if (message.getId() == null || message.getPayload() == null) {
throw new ValidationException("Invalid message structure");
}
}
private boolean isDuplicate(String messageId) {
return eventRepository.existsById(messageId);
}
private Event saveEventToDatabase(Message message) {
Event event = new Event();
event.setId(message.getId());
event.setPayload(message.getPayload());
event.setStatus("PENDING");
event.setCreatedAt(LocalDateTime.now(UTC));
return eventRepository.save(event);
}
private void sendToExternalSystem(Event event) {
CompletableFuture.runAsync(() -> {
try {
ExternalResponse response = externalClient.send(event);
updateEventStatus(event.getId(), "SENT", response.getExternalId());
} catch (Exception e) {
log.error("External system error: {}", event.getId(), e);
retryWithExponentialBackoff(event);
}
});
}
private void sendToDLQ(Message message, String reason) {
kafkaTemplate.send("events-dlq", message.getId(),
new ErrorMessage(message, reason, LocalDateTime.now(UTC)));
}
}
3. Database Layer - Оптимизация
@Entity
@Table(name = "events", indexes = {
@Index(name = "idx_status", columnList = "status"),
@Index(name = "idx_created_at", columnList = "created_at"),
@Index(name = "idx_external_id", columnList = "external_id")
})
public class Event {
@Id
private String id;
@Column(columnDefinition = "jsonb")
private String payload; // JSONB для оптимизации
@Column(length = 50)
private String status; // PENDING, SENT, COMPLETED, FAILED
private String externalId;
@CreationTimestamp
private LocalDateTime createdAt;
@UpdateTimestamp
private LocalDateTime updatedAt;
}
//批 обновления для оптимизации
@Repository
public interface EventRepository extends JpaRepository<Event, String> {
@Modifying
@Query("UPDATE Event e SET e.status = :status WHERE e.id IN :ids")
int updateStatusBatch(@Param("ids") List<String> ids,
@Param("status") String status);
// Для изоляции на чтение
@Query(value =
"SELECT * FROM events WHERE status = :status LIMIT :limit",
nativeQuery = true)
List<Event> findByStatusWithLimit(
@Param("status") String status,
@Param("limit") int limit);
}
@Service
public class BatchUpdateService {
private final EventRepository eventRepository;
private final static int BATCH_SIZE = 10000;
@Scheduled(fixedRate = 5000) // Каждые 5 секунд
public void processBatchUpdates() {
List<Event> pending = eventRepository
.findByStatusWithLimit("PENDING_UPDATE", BATCH_SIZE);
List<String> ids = pending.stream()
.map(Event::getId)
.collect(Collectors.toList());
eventRepository.updateStatusBatch(ids, "COMPLETED");
}
}
4. External System Integration
@Service
@Slf4j
public class ExternalSystemClient {
private final RestTemplate restTemplate;
private final CircuitBreaker circuitBreaker;
private final RetryTemplate retryTemplate;
@Retryable(
value = {IOException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2.0)
)
@CircuitBreaker(name = "externalAPI")
public ExternalResponse send(Event event) {
try {
long startTime = System.nanoTime();
ResponseEntity<ExternalResponse> response = restTemplate.postForEntity(
"https://external-api.com/events",
new ExternalRequest(event),
ExternalResponse.class
);
long duration = System.nanoTime() - startTime;
log.info("External API call took: {}ms", duration / 1_000_000);
return response.getBody();
} catch (HttpClientErrorException.Conflict e) {
log.warn("Conflict with external system: {}", e.getMessage());
throw new RecoverableException("Conflict", e);
} catch (Exception e) {
log.error("External system error", e);
throw new NonRecoverableException("Failed", e);
}
}
}
// Резилиентность
@Configuration
public class ResilienceConfig {
@Bean
public CircuitBreaker circuitBreaker() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50.0f)
.waitDurationInOpenState(Duration.ofSeconds(30))
.build();
return CircuitBreaker.of("externalAPI", config);
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy =
new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMaxInterval(10000);
backOffPolicy.setMultiplier(2.0);
template.setBackOffPolicy(backOffPolicy);
template.setRetryPolicy(new SimpleRetryPolicy(3));
return template;
}
}
5. Caching Layer
@Configuration
public class CacheConfig {
@Bean
public CacheManager cacheManager() {
return new RedisCacheManager.create(
LettuceConnectionFactory.create()
);
}
}
@Service
public class CachedEventService {
private final EventRepository eventRepository;
private final CacheManager cacheManager;
@Cacheable(value = "events", key = "#id", unless = "#result == null")
public Event getEventById(String id) {
return eventRepository.findById(id).orElse(null);
}
@CacheEvict(value = "events", key = "#event.id")
public Event updateEvent(Event event) {
return eventRepository.save(event);
}
}
Инфраструктура и Масштабирование
Docker Compose для локального тестирования:
version: 3.8
services:
kafka:
image: confluentinc/cp-kafka:7.5.0
environment:
KAFKA_BROKER_ID: 1
KAFKA_NUM_PARTITIONS: 100
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
ports:
- "9092:9092"
postgres:
image: postgres:15
environment:
POSTGRES_DB: events
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
redis:
image: redis:7
ports:
- "6379:6379"
volumes:
kafka-data:
postgres-data:
Kubernetes развёртывание:
apiVersion: apps/v1
kind: Deployment
metadata:
name: event-processor
spec:
replicas: 10 # Горизонтальное масштабирование
template:
spec:
containers:
- name: processor
image: event-processor:latest
resources:
requests:
cpu: "2"
memory: "2Gi"
limits:
cpu: "4"
memory: "4Gi"
env:
- name: KAFKA_CONCURRENCY
value: "20"
- name: BATCH_SIZE
value: "10000"
Мониторинг и Алерты
@Service
@Slf4j
public class MetricsService {
private final MeterRegistry meterRegistry;
public void recordSuccess(int partition, long offset, long durationNanos) {
Counter.builder("events.processed")
.tag("partition", String.valueOf(partition))
.register(meterRegistry)
.increment();
Timer.builder("events.processing.duration")
.register(meterRegistry)
.record(Duration.ofNanos(durationNanos));
}
public void recordError(String reason) {
Counter.builder("events.failed")
.tag("reason", reason)
.register(meterRegistry)
.increment();
}
}
// Prometheus metrics
// - events_processed_total (counter)
// - events_processing_duration_seconds (histogram)
// - events_failed_total (counter)
// - kafka_consumer_lag (gauge)
Ключевые оптимизации
- Throughput: Batch processing, compression, partitioning
- Latency: In-memory cache, connection pooling
- Reliability: Replication, DLQ, retries, circuit breaker
- Durability: PostgreSQL ACID, Kafka replication
- Scalability: Horizontal scaling, load balancing
Итог: Такая архитектура способна обработать 100M сообщений в сутки с гарантией delivery, обновлением записей и интеграцией с внешними системами при 99.99% uptime.