← Назад к вопросам
Какие решишь задачу при чтении 20 млн записей из базы данных и отправки в Message Broker
3.0 Senior🔥 151 комментариев
#Базы данных и SQL#Брокеры сообщений
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Оптимизация чтения 20млн записей из БД и отправки в Message Broker
Это классическая задача на масштабируемость. 20 млн записей требует специального подхода, иначе приложение упадёт из-за нехватки памяти или timeout-ов. Рассмотрю комплексное решение.
Главные проблемы
- Переполнение памяти — загрузить 20млн объектов в RAM невозможно
- Timeout от БД — долгое чтение может закрыть соединение
- Переполнение Message Broker — нельзя отправить всё сразу
- Отсутствие отказоустойчивости — если сбой в середине, всё теряется
Решение 1: Потоковое чтение с батчингом (Stream Processing)
public class DatabaseStreamProcessor {
private static final int BATCH_SIZE = 1000; // читаем порциями
private static final int QUEUE_SIZE = 5000;
private final JdbcTemplate jdbcTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;
public void processLargeDataset(String topic) {
// Используем SQL с LIMIT и OFFSET для потокового чтения
long totalRecords = getTotalRecordCount();
for (long offset = 0; offset < totalRecords; offset += BATCH_SIZE) {
List<Record> batch = readBatch(offset, BATCH_SIZE);
// Отправляем батч без загрузки всего в памяти
sendToMessageBroker(batch, topic);
// Логируем прогресс
long progress = Math.min(offset + BATCH_SIZE, totalRecords);
System.out.printf("Processed: %d / %d (%.2f%%)\n",
progress, totalRecords, (100.0 * progress / totalRecords));
}
}
private List<Record> readBatch(long offset, int limit) {
String sql = "SELECT * FROM large_table ORDER BY id LIMIT ? OFFSET ?";
return jdbcTemplate.query(sql,
new Object[]{limit, offset},
new RecordRowMapper());
}
private void sendToMessageBroker(List<Record> batch, String topic) {
for (Record record : batch) {
String json = convertToJson(record);
kafkaTemplate.send(topic, record.getId().toString(), json);
}
}
}
Решение 2: Курсоры БД (для PostgreSQL/MySQL)
Курсоры позволяют итерировать по данным без загрузки всего в памяти:
public class CursorBasedProcessor {
public void processWithCursor(DataSource dataSource, String topic)
throws SQLException {
try (Connection conn = dataSource.getConnection()) {
// Отключаем автокоммит для работы с курсором
conn.setAutoCommit(false);
// Создаём курсор
try (Statement stmt = conn.createStatement(
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)) {
// Указываем размер батча для драйвера
stmt.setFetchSize(Integer.MIN_VALUE); // потоковое чтение
ResultSet rs = stmt.executeQuery(
"SELECT id, name, data FROM large_table");
List<Record> batch = new ArrayList<>();
int batchCount = 0;
while (rs.next()) {
Record record = mapRecord(rs);
batch.add(record);
if (batch.size() >= 1000) {
sendBatch(batch, topic);
batch.clear();
batchCount++;
System.out.println("Sent batch: " + batchCount);
}
}
// Последний неполный батч
if (!batch.isEmpty()) {
sendBatch(batch, topic);
}
}
}
}
}
Решение 3: Spring Data Stream (лучше всего)
@Repository
public interface RecordRepository extends JpaRepository<Record, Long> {
Stream<Record> findAllByStatusOrderById(String status);
}
@Service
public class LargeDatasetService {
private final RecordRepository repository;
private final KafkaTemplate<String, RecordEvent> kafkaTemplate;
@Transactional(readOnly = true)
public void streamAllRecords(String topic) {
try (Stream<Record> stream = repository.findAllByStatusOrderById("ACTIVE")) {
stream
.peek(r -> System.out.println("Processing: " + r.getId()))
.map(this::toEvent)
.collect(Collectors.groupingBy(
e -> e.getId() % 10, // партиционируем для параллелизма
Collectors.toList()
))
.values()
.forEach(batch -> sendBatch(batch, topic));
}
}
private void sendBatch(List<RecordEvent> batch, String topic) {
batch.forEach(event ->
kafkaTemplate.send(topic, event.getId().toString(), event)
);
}
}
Решение 4: Параллельная обработка с ExecutorService
Для ещё большей скорости можно распараллелить чтение:
public class ParallelBatchProcessor {
private static final int NUM_THREADS = 4;
private static final int BATCH_SIZE = 1000;
private final JdbcTemplate jdbcTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;
private final ExecutorService executor;
public ParallelBatchProcessor() {
this.executor = Executors.newFixedThreadPool(NUM_THREADS);
}
public void processInParallel(String topic) throws InterruptedException {
long totalRecords = getTotalRecordCount();
List<Future<?>> futures = new ArrayList<>();
for (long offset = 0; offset < totalRecords; offset += BATCH_SIZE) {
final long currentOffset = offset;
// Отправляем задачу в пул потоков
Future<?> future = executor.submit(() -> {
List<Record> batch = readBatch(currentOffset, BATCH_SIZE);
sendToMessageBroker(batch, topic);
System.out.println("Batch at offset " + currentOffset + " processed");
});
futures.add(future);
}
// Ждём завершения всех задач
for (Future<?> future : futures) {
future.get(); // блокируем до завершения
}
executor.shutdown();
}
}
Решение 5: Полная production-ready реализация
@Service
public class MassiveDataExportService {
private static final int BATCH_SIZE = 5000;
private static final int QUEUE_SIZE = 10000;
private static final int MAX_RETRIES = 3;
private final RecordRepository repository;
private final KafkaTemplate<String, RecordEvent> kafkaTemplate;
private final ExportProgressService progressService;
private final Logger logger = LoggerFactory.getLogger(this.class);
@Transactional(readOnly = true)
public void exportLargeDataset(String exportId, String topic) {
try {
progressService.startExport(exportId);
AtomicLong processedCount = new AtomicLong(0);
AtomicLong errorCount = new AtomicLong(0);
try (Stream<Record> stream = repository.findAllOrderById()) {
stream
.collect(Collectors.groupingByConcurrent(
r -> r.getId() % BATCH_SIZE,
Collectors.toList()
))
.values()
.forEach(batch -> {
try {
sendBatchWithRetry(batch, topic, MAX_RETRIES);
processedCount.addAndGet(batch.size());
// Обновляем прогресс
progressService.updateProgress(
exportId,
processedCount.get()
);
} catch (Exception e) {
errorCount.incrementAndGet();
logger.error("Failed to send batch", e);
// Логируем для retry-механизма
progressService.logError(exportId, e);
}
});
}
progressService.completeExport(exportId, processedCount.get(), errorCount.get());
logger.info("Export {} completed. Records: {}, Errors: {}",
exportId, processedCount.get(), errorCount.get());
} catch (Exception e) {
progressService.failExport(exportId, e);
throw new DataExportException("Export failed", e);
}
}
private void sendBatchWithRetry(List<Record> batch, String topic, int maxRetries) {
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
batch.stream()
.map(this::toEvent)
.forEach(event ->
kafkaTemplate.send(topic,
event.getId().toString(),
event)
);
return; // Успех
} catch (Exception e) {
if (attempt == maxRetries) {
throw new RuntimeException("Failed after " + maxRetries + " retries", e);
}
// Экспоненциальная задержка перед retry
try {
Thread.sleep((long) Math.pow(2, attempt) * 100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
}
}
Дополнительные оптимизации
1. Индексы в БД
CREATE INDEX idx_large_table_id_status ON large_table(id, status);
2. Конфигурация Kafka
- Увеличиваем
batch.sizeиlinger.msдля батчинга - Устанавливаем
compression.type = snappy
3. Мониторинг
- JMX метрики для Kafka
- Логирование каждого батча
- Алерты на ошибки отправки
Что выбрать
- Малые объёмы (< 1млн): обычный JDBC
- Средние (1-20млн): Stream API с батчингом
- Большие (> 100млн): параллельная обработка + курсоры
- Production: полная реализация с retry и мониторингом
Главное правило: читаем малыми порциями, отправляем без задержек, отслеживаем прогресс.