← Назад к вопросам

Расскажи о интересной задаче, которую приходилось делать

1.3 Junior🔥 231 комментариев
#Soft Skills и карьера

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Интересная задача: Оптимизация высоконагруженной системы обработки событий

Контекст

Несколько лет назад я работал над микросервисом, обрабатывающим миллионы событий в день для платформы аналитики. Задача была актуальная и требовала глубокого понимания конкурентности, производительности и системного дизайна.

Проблема

Система обработки событий работала следующим образом:

  • Клиенты отправляют события через REST API
  • События сохраняются в PostgreSQL
  • Асинхронный воркер читает события из БД с помощью polling каждую секунду
  • Обрабатывает события в одном потоке

Симптомы проблемы:

  1. При пиковых нагрузках (100K событий/сек) очередь растёт экспоненциально
  2. Latency обработки события — от секунд до минут
  3. БД становилась узким местом (слишком много SELECT запросов)
  4. Потребление памяти неконтролируемо растёт

Анализ

Проблемы, которые я выявил:

  1. Polling неэффективен — постоянные пустые SELECT запросы
  2. Однопоточная обработка — неиспользование многоядерности
  3. Отсутствие backpressure — система пытается обработать всё сразу
  4. Неправильный batch размер — либо слишком мало, либо слишком много событий за раз
  5. Синхронная обработка в БД — долгие транзакции

Решение

1. Миграция с polling на LISTEN/NOTIFY

// До: polling каждую секунду
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
    List<Event> events = eventRepository.findUnprocessed(100);
    events.forEach(this::processEvent);
}, 1, 1, TimeUnit.SECONDS);

// После: event-driven с PostgreSQL LISTEN/NOTIFY
public class PostgresNotificationListener {
    private Connection listenerConnection;
    
    public void startListening() throws SQLException {
        listenerConnection = dataSource.getConnection();
        Statement statement = listenerConnection.createStatement();
        statement.execute("LISTEN event_created");
        
        // Асинхронный loop для слушания уведомлений
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                org.postgresql.PGNotification[] notifications = 
                    ((org.postgresql.jdbc.PgConnection) listenerConnection)
                        .getNotifications(5000); // Timeout 5 сек
                
                if (notifications != null) {
                    for (org.postgresql.PGNotification notification : notifications) {
                        eventQueue.add(notification.getParameter());
                    }
                }
            }
        }).start();
    }
}

// На уровне БД: тригер для отправки уведомления
// CREATE OR REPLACE FUNCTION notify_event()
// BEGIN
//     PERFORM pg_notify(event_created, row_to_json(NEW)::text);
//     RETURN NEW;
// END;

2. Многопоточная обработка с thread pool

public class EventProcessor {
    private final ExecutorService executorService = 
        Executors.newFixedThreadPool(16); // На 16 ядер
    private final BlockingQueue<Event> eventQueue = 
        new LinkedBlockingQueue<>(10000); // Bounded queue для backpressure
    
    public void startProcessing() {
        for (int i = 0; i < 16; i++) {
            executorService.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Event event = eventQueue.poll(1, TimeUnit.SECONDS);
                        if (event != null) {
                            processEvent(event);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
    
    private void processEvent(Event event) {
        try {
            // Бизнес-логика
            enrichEvent(event);
            validateEvent(event);
            storageService.save(event);
            
            markAsProcessed(event.getId());
        } catch (Exception e) {
            // Retry logic с exponential backoff
            handleError(event, e);
        }
    }
}

3. Batch обработка и асинхронные oper

public class BatchEventProcessor {
    private final int BATCH_SIZE = 500;
    private final BlockingQueue<Event> eventQueue;
    
    public void processBatch() throws InterruptedException {
        List<Event> batch = new ArrayList<>(BATCH_SIZE);
        
        while (eventQueue.drainTo(batch, BATCH_SIZE) > 0 || !batch.isEmpty()) {
            // Batch INSERT вместо одиночных запросов
            batchSave(batch);
            
            // Асинхронно отправляем события в другие сервисы
            sendToDownstreamServices(batch);
            
            batch.clear();
        }
    }
    
    private void batchSave(List<Event> events) {
        // Один запрос вместо N запросов
        eventRepository.saveAll(events);
    }
}

4. Мониторинг и метрики

public class EventProcessingMetrics {
    private final MeterRegistry meterRegistry;
    private final AtomicInteger queueSize;
    private final Timer processingTimer;
    
    public EventProcessingMetrics(MeterRegistry registry) {
        this.meterRegistry = registry;
        this.queueSize = registry.gauge("event.queue.size", new AtomicInteger(0));
        this.processingTimer = Timer.builder("event.processing.time")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(registry);
    }
    
    public void recordProcessing(Event event, long duration) {
        processingTimer.record(duration, TimeUnit.MILLISECONDS);
        meterRegistry.counter("event.processed", 
            "type", event.getType(),
            "status", "success").increment();
    }
}

Результаты

После внедрения решения:

  • Throughput: 10K → 500K событий/сек на одном сервере
  • Latency: средняя задержка с 30 сек → 100 мс
  • P99 latency: 2 минуты → 500 мс
  • Потребление памяти: стабилизировалось благодаря bounded queue
  • DB Load: на 80% меньше запросов благодаря batch обработке и LISTEN/NOTIFY

Ключевые уроки

  1. Профилирование важно — используй JMH, async-profiler
  2. Backpressure спасает системы — bounded queues предотвращают OutOfMemoryError
  3. Batch обработка существенно быстрее — 500 событий за раз vs 500 отдельных запросов
  4. Event-driven лучше, чем polling — для высоконагруженных систем критично
  5. Мониторинг с первого дня — Prometheus/Grafana помогли выявить bottleneck
  6. Многопоточность требует осторожности — правильное использование synchronized/atomic/concurrent коллекций

Эта задача научила меня балансировать между производительностью, надёжностью и простотой кода.

Расскажи о интересной задаче, которую приходилось делать | PrepBro