← Назад к вопросам
Расскажи о интересной задаче, которую приходилось делать
1.3 Junior🔥 231 комментариев
#Soft Skills и карьера
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Интересная задача: Оптимизация высоконагруженной системы обработки событий
Контекст
Несколько лет назад я работал над микросервисом, обрабатывающим миллионы событий в день для платформы аналитики. Задача была актуальная и требовала глубокого понимания конкурентности, производительности и системного дизайна.
Проблема
Система обработки событий работала следующим образом:
- Клиенты отправляют события через REST API
- События сохраняются в PostgreSQL
- Асинхронный воркер читает события из БД с помощью polling каждую секунду
- Обрабатывает события в одном потоке
Симптомы проблемы:
- При пиковых нагрузках (100K событий/сек) очередь растёт экспоненциально
- Latency обработки события — от секунд до минут
- БД становилась узким местом (слишком много SELECT запросов)
- Потребление памяти неконтролируемо растёт
Анализ
Проблемы, которые я выявил:
- Polling неэффективен — постоянные пустые SELECT запросы
- Однопоточная обработка — неиспользование многоядерности
- Отсутствие backpressure — система пытается обработать всё сразу
- Неправильный batch размер — либо слишком мало, либо слишком много событий за раз
- Синхронная обработка в БД — долгие транзакции
Решение
1. Миграция с polling на LISTEN/NOTIFY
// До: polling каждую секунду
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
List<Event> events = eventRepository.findUnprocessed(100);
events.forEach(this::processEvent);
}, 1, 1, TimeUnit.SECONDS);
// После: event-driven с PostgreSQL LISTEN/NOTIFY
public class PostgresNotificationListener {
private Connection listenerConnection;
public void startListening() throws SQLException {
listenerConnection = dataSource.getConnection();
Statement statement = listenerConnection.createStatement();
statement.execute("LISTEN event_created");
// Асинхронный loop для слушания уведомлений
new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
org.postgresql.PGNotification[] notifications =
((org.postgresql.jdbc.PgConnection) listenerConnection)
.getNotifications(5000); // Timeout 5 сек
if (notifications != null) {
for (org.postgresql.PGNotification notification : notifications) {
eventQueue.add(notification.getParameter());
}
}
}
}).start();
}
}
// На уровне БД: тригер для отправки уведомления
// CREATE OR REPLACE FUNCTION notify_event()
// BEGIN
// PERFORM pg_notify(event_created, row_to_json(NEW)::text);
// RETURN NEW;
// END;
2. Многопоточная обработка с thread pool
public class EventProcessor {
private final ExecutorService executorService =
Executors.newFixedThreadPool(16); // На 16 ядер
private final BlockingQueue<Event> eventQueue =
new LinkedBlockingQueue<>(10000); // Bounded queue для backpressure
public void startProcessing() {
for (int i = 0; i < 16; i++) {
executorService.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Event event = eventQueue.poll(1, TimeUnit.SECONDS);
if (event != null) {
processEvent(event);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
}
private void processEvent(Event event) {
try {
// Бизнес-логика
enrichEvent(event);
validateEvent(event);
storageService.save(event);
markAsProcessed(event.getId());
} catch (Exception e) {
// Retry logic с exponential backoff
handleError(event, e);
}
}
}
3. Batch обработка и асинхронные oper
public class BatchEventProcessor {
private final int BATCH_SIZE = 500;
private final BlockingQueue<Event> eventQueue;
public void processBatch() throws InterruptedException {
List<Event> batch = new ArrayList<>(BATCH_SIZE);
while (eventQueue.drainTo(batch, BATCH_SIZE) > 0 || !batch.isEmpty()) {
// Batch INSERT вместо одиночных запросов
batchSave(batch);
// Асинхронно отправляем события в другие сервисы
sendToDownstreamServices(batch);
batch.clear();
}
}
private void batchSave(List<Event> events) {
// Один запрос вместо N запросов
eventRepository.saveAll(events);
}
}
4. Мониторинг и метрики
public class EventProcessingMetrics {
private final MeterRegistry meterRegistry;
private final AtomicInteger queueSize;
private final Timer processingTimer;
public EventProcessingMetrics(MeterRegistry registry) {
this.meterRegistry = registry;
this.queueSize = registry.gauge("event.queue.size", new AtomicInteger(0));
this.processingTimer = Timer.builder("event.processing.time")
.publishPercentiles(0.5, 0.95, 0.99)
.register(registry);
}
public void recordProcessing(Event event, long duration) {
processingTimer.record(duration, TimeUnit.MILLISECONDS);
meterRegistry.counter("event.processed",
"type", event.getType(),
"status", "success").increment();
}
}
Результаты
После внедрения решения:
- Throughput: 10K → 500K событий/сек на одном сервере
- Latency: средняя задержка с 30 сек → 100 мс
- P99 latency: 2 минуты → 500 мс
- Потребление памяти: стабилизировалось благодаря bounded queue
- DB Load: на 80% меньше запросов благодаря batch обработке и LISTEN/NOTIFY
Ключевые уроки
- Профилирование важно — используй JMH, async-profiler
- Backpressure спасает системы — bounded queues предотвращают OutOfMemoryError
- Batch обработка существенно быстрее — 500 событий за раз vs 500 отдельных запросов
- Event-driven лучше, чем polling — для высоконагруженных систем критично
- Мониторинг с первого дня — Prometheus/Grafana помогли выявить bottleneck
- Многопоточность требует осторожности — правильное использование synchronized/atomic/concurrent коллекций
Эта задача научила меня балансировать между производительностью, надёжностью и простотой кода.