← Назад к вопросам

Какие решишь задачу при чтении 20 млн записей из базы данных и отправки в Message Broker

3.0 Senior🔥 151 комментариев
#Базы данных и SQL#Брокеры сообщений

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Оптимизация чтения 20млн записей из БД и отправки в Message Broker

Это классическая задача на масштабируемость. 20 млн записей требует специального подхода, иначе приложение упадёт из-за нехватки памяти или timeout-ов. Рассмотрю комплексное решение.

Главные проблемы

  1. Переполнение памяти — загрузить 20млн объектов в RAM невозможно
  2. Timeout от БД — долгое чтение может закрыть соединение
  3. Переполнение Message Broker — нельзя отправить всё сразу
  4. Отсутствие отказоустойчивости — если сбой в середине, всё теряется

Решение 1: Потоковое чтение с батчингом (Stream Processing)

public class DatabaseStreamProcessor {
    private static final int BATCH_SIZE = 1000; // читаем порциями
    private static final int QUEUE_SIZE = 5000;
    
    private final JdbcTemplate jdbcTemplate;
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public void processLargeDataset(String topic) {
        // Используем SQL с LIMIT и OFFSET для потокового чтения
        long totalRecords = getTotalRecordCount();
        
        for (long offset = 0; offset < totalRecords; offset += BATCH_SIZE) {
            List<Record> batch = readBatch(offset, BATCH_SIZE);
            
            // Отправляем батч без загрузки всего в памяти
            sendToMessageBroker(batch, topic);
            
            // Логируем прогресс
            long progress = Math.min(offset + BATCH_SIZE, totalRecords);
            System.out.printf("Processed: %d / %d (%.2f%%)\n", 
                progress, totalRecords, (100.0 * progress / totalRecords));
        }
    }
    
    private List<Record> readBatch(long offset, int limit) {
        String sql = "SELECT * FROM large_table ORDER BY id LIMIT ? OFFSET ?";
        return jdbcTemplate.query(sql, 
            new Object[]{limit, offset},
            new RecordRowMapper());
    }
    
    private void sendToMessageBroker(List<Record> batch, String topic) {
        for (Record record : batch) {
            String json = convertToJson(record);
            kafkaTemplate.send(topic, record.getId().toString(), json);
        }
    }
}

Решение 2: Курсоры БД (для PostgreSQL/MySQL)

Курсоры позволяют итерировать по данным без загрузки всего в памяти:

public class CursorBasedProcessor {
    
    public void processWithCursor(DataSource dataSource, String topic) 
            throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            // Отключаем автокоммит для работы с курсором
            conn.setAutoCommit(false);
            
            // Создаём курсор
            try (Statement stmt = conn.createStatement(
                    ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY)) {
                
                // Указываем размер батча для драйвера
                stmt.setFetchSize(Integer.MIN_VALUE); // потоковое чтение
                
                ResultSet rs = stmt.executeQuery(
                    "SELECT id, name, data FROM large_table");
                
                List<Record> batch = new ArrayList<>();
                int batchCount = 0;
                
                while (rs.next()) {
                    Record record = mapRecord(rs);
                    batch.add(record);
                    
                    if (batch.size() >= 1000) {
                        sendBatch(batch, topic);
                        batch.clear();
                        batchCount++;
                        System.out.println("Sent batch: " + batchCount);
                    }
                }
                
                // Последний неполный батч
                if (!batch.isEmpty()) {
                    sendBatch(batch, topic);
                }
            }
        }
    }
}

Решение 3: Spring Data Stream (лучше всего)

@Repository
public interface RecordRepository extends JpaRepository<Record, Long> {
    Stream<Record> findAllByStatusOrderById(String status);
}

@Service
public class LargeDatasetService {
    private final RecordRepository repository;
    private final KafkaTemplate<String, RecordEvent> kafkaTemplate;
    
    @Transactional(readOnly = true)
    public void streamAllRecords(String topic) {
        try (Stream<Record> stream = repository.findAllByStatusOrderById("ACTIVE")) {
            stream
                .peek(r -> System.out.println("Processing: " + r.getId()))
                .map(this::toEvent)
                .collect(Collectors.groupingBy(
                    e -> e.getId() % 10, // партиционируем для параллелизма
                    Collectors.toList()
                ))
                .values()
                .forEach(batch -> sendBatch(batch, topic));
        }
    }
    
    private void sendBatch(List<RecordEvent> batch, String topic) {
        batch.forEach(event -> 
            kafkaTemplate.send(topic, event.getId().toString(), event)
        );
    }
}

Решение 4: Параллельная обработка с ExecutorService

Для ещё большей скорости можно распараллелить чтение:

public class ParallelBatchProcessor {
    private static final int NUM_THREADS = 4;
    private static final int BATCH_SIZE = 1000;
    
    private final JdbcTemplate jdbcTemplate;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ExecutorService executor;
    
    public ParallelBatchProcessor() {
        this.executor = Executors.newFixedThreadPool(NUM_THREADS);
    }
    
    public void processInParallel(String topic) throws InterruptedException {
        long totalRecords = getTotalRecordCount();
        List<Future<?>> futures = new ArrayList<>();
        
        for (long offset = 0; offset < totalRecords; offset += BATCH_SIZE) {
            final long currentOffset = offset;
            
            // Отправляем задачу в пул потоков
            Future<?> future = executor.submit(() -> {
                List<Record> batch = readBatch(currentOffset, BATCH_SIZE);
                sendToMessageBroker(batch, topic);
                System.out.println("Batch at offset " + currentOffset + " processed");
            });
            
            futures.add(future);
        }
        
        // Ждём завершения всех задач
        for (Future<?> future : futures) {
            future.get(); // блокируем до завершения
        }
        
        executor.shutdown();
    }
}

Решение 5: Полная production-ready реализация

@Service
public class MassiveDataExportService {
    private static final int BATCH_SIZE = 5000;
    private static final int QUEUE_SIZE = 10000;
    private static final int MAX_RETRIES = 3;
    
    private final RecordRepository repository;
    private final KafkaTemplate<String, RecordEvent> kafkaTemplate;
    private final ExportProgressService progressService;
    private final Logger logger = LoggerFactory.getLogger(this.class);
    
    @Transactional(readOnly = true)
    public void exportLargeDataset(String exportId, String topic) {
        try {
            progressService.startExport(exportId);
            
            AtomicLong processedCount = new AtomicLong(0);
            AtomicLong errorCount = new AtomicLong(0);
            
            try (Stream<Record> stream = repository.findAllOrderById()) {
                stream
                    .collect(Collectors.groupingByConcurrent(
                        r -> r.getId() % BATCH_SIZE,
                        Collectors.toList()
                    ))
                    .values()
                    .forEach(batch -> {
                        try {
                            sendBatchWithRetry(batch, topic, MAX_RETRIES);
                            processedCount.addAndGet(batch.size());
                            
                            // Обновляем прогресс
                            progressService.updateProgress(
                                exportId, 
                                processedCount.get()
                            );
                        } catch (Exception e) {
                            errorCount.incrementAndGet();
                            logger.error("Failed to send batch", e);
                            // Логируем для retry-механизма
                            progressService.logError(exportId, e);
                        }
                    });
            }
            
            progressService.completeExport(exportId, processedCount.get(), errorCount.get());
            logger.info("Export {} completed. Records: {}, Errors: {}", 
                exportId, processedCount.get(), errorCount.get());
                
        } catch (Exception e) {
            progressService.failExport(exportId, e);
            throw new DataExportException("Export failed", e);
        }
    }
    
    private void sendBatchWithRetry(List<Record> batch, String topic, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                batch.stream()
                    .map(this::toEvent)
                    .forEach(event -> 
                        kafkaTemplate.send(topic, 
                            event.getId().toString(), 
                            event)
                    );
                return; // Успех
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    throw new RuntimeException("Failed after " + maxRetries + " retries", e);
                }
                // Экспоненциальная задержка перед retry
                try {
                    Thread.sleep((long) Math.pow(2, attempt) * 100);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
    }
}

Дополнительные оптимизации

1. Индексы в БД

CREATE INDEX idx_large_table_id_status ON large_table(id, status);

2. Конфигурация Kafka

  • Увеличиваем batch.size и linger.ms для батчинга
  • Устанавливаем compression.type = snappy

3. Мониторинг

  • JMX метрики для Kafka
  • Логирование каждого батча
  • Алерты на ошибки отправки

Что выбрать

  • Малые объёмы (< 1млн): обычный JDBC
  • Средние (1-20млн): Stream API с батчингом
  • Большие (> 100млн): параллельная обработка + курсоры
  • Production: полная реализация с retry и мониторингом

Главное правило: читаем малыми порциями, отправляем без задержек, отслеживаем прогресс.

Какие решишь задачу при чтении 20 млн записей из базы данных и отправки в Message Broker | PrepBro