← Назад к вопросам
Как решаешь проблему, если в запросе сотни миллионов записей
1.7 Middle🔥 141 комментариев
#Базы данных и SQL
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Обработка сотен миллионов записей
Это одна из самых частых проблем в production. Обработка такого объёма данных требует специального подхода. Расскажу о стратегиях, которые применял я.
1. Никогда не загружай всё в память
Первая ошибка:
// ❌ НЕПРАВИЛЬНО: загружает всё в памяти
@GetMapping("/export")
public List<UserDTO> exportAllUsers() {
return userRepository.findAll(); // 100M записей в памяти = OutOfMemory!
}
Правильно — streaming:
// ✅ ПРАВИЛЬНО: потоковая обработка
@GetMapping("/export")
public ResponseEntity<StreamingResponseBody> exportAllUsers() {
return ResponseEntity.ok()
.header("Content-Disposition", "attachment; filename=users.csv")
.body(outputStream -> {
try (CSVPrinter printer = new CSVPrinter(
new OutputStreamWriter(outputStream),
CSVFormat.DEFAULT)) {
// Обрабатываем батчами
int pageSize = 10000;
int pageNumber = 0;
boolean hasMore = true;
while (hasMore) {
Page<User> page = userRepository.findAll(
PageRequest.of(pageNumber, pageSize)
);
for (User user : page.getContent()) {
printer.printRecord(
user.getId(),
user.getEmail(),
user.getName()
);
}
hasMore = page.hasNext();
pageNumber++;
}
}
});
}
2. Pagination + Cursor-based навигация
Для API эндпоинтов:
// Offset-based (плохо для больших чисел)
// GET /users?page=10000&size=20 — ОЧЕНЬ МЕДЛЕННО!
// SELECT * FROM users OFFSET 200000 LIMIT 20;
// Cursor-based (хорошо)
@GetMapping("/users")
public ResponseEntity<Page<UserDTO>> listUsers(
@RequestParam(required = false) Long afterId,
@RequestParam(defaultValue = "100") int limit) {
Pageable pageable = PageRequest.of(0, limit + 1);
List<User> users;
if (afterId == null) {
users = userRepository.findFirstNOrderByIdAsc(limit + 1);
} else {
users = userRepository.findByIdGreaterThanOrderByIdAsc(afterId, limit + 1);
}
boolean hasMore = users.size() > limit;
users = users.subList(0, Math.min(users.size(), limit));
return ResponseEntity.ok(new PageDTO<>(
users.stream().map(this::toDTO).collect(Collectors.toList()),
hasMore,
users.isEmpty() ? null : users.get(users.size() - 1).getId()
));
}
public class PageDTO<T> {
private List<T> items;
private boolean hasMore;
private Long nextCursor;
// getters
}
3. Batch обработка с EntityManager
Для импорта/обработки:
@Service
public class BulkProcessingService {
@Autowired
private EntityManager entityManager;
@Autowired
private UserRepository userRepository;
@Transactional
public void importMillionsOfUsers(File csvFile) throws IOException {
int batchSize = 5000; // Оптимальный размер батча
int count = 0;
try (BufferedReader reader = new BufferedReader(new FileReader(csvFile))) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
User user = new User();
user.setEmail(parts[0]);
user.setName(parts[1]);
user.setPhone(parts[2]);
entityManager.persist(user);
count++;
// Flush каждые batchSize
if (count % batchSize == 0) {
entityManager.flush();
entityManager.clear(); // Очищаем First Level Cache
System.out.println("Processed: " + count);
}
}
// Final flush для остатка
entityManager.flush();
entityManager.clear();
}
}
}
4. JDBC Batch для максимальной производительности
Для импорта очень больших объёмов:
@Service
public class JdbcBulkInsertService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void importUsers(File csvFile) throws IOException {
int batchSize = 10000;
int count = 0;
List<Object[]> batch = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new FileReader(csvFile))) {
String line;
String sql = "INSERT INTO users (email, name, phone) VALUES (?, ?, ?)";
while ((line = reader.readLine()) != null) {
String[] parts = line.split(",");
batch.add(new Object[]{
parts[0], // email
parts[1], // name
parts[2] // phone
});
if (batch.size() >= batchSize) {
jdbcTemplate.batchUpdate(sql, batch);
count += batch.size();
batch.clear();
System.out.println("Processed: " + count);
}
}
// Final batch
if (!batch.isEmpty()) {
jdbcTemplate.batchUpdate(sql, batch);
count += batch.size();
}
}
}
}
5. Stream API для обработки
Для трансформации больших объёмов:
@Service
public class StreamProcessingService {
@Autowired
private UserRepository userRepository;
@Transactional(readOnly = true)
public void processLargeDataset() {
int pageSize = 1000;
int pageNumber = 0;
boolean hasMore = true;
while (hasMore) {
Page<User> page = userRepository.findAll(
PageRequest.of(pageNumber, pageSize)
);
// Stream обработка
page.getContent()
.parallelStream() // параллельная обработка
.map(this::transformUser)
.filter(this::isValid)
.forEach(this::saveResult);
hasMore = page.hasNext();
pageNumber++;
}
}
}
6. Database-side агрегация
Вместо загрузки данных в Java:
// ❌ НЕПРАВИЛЬНО: загружаем миллионы записей
public Long countActiveUsers() {
return userRepository.findAll().stream()
.filter(User::isActive)
.count();
}
// ✅ ПРАВИЛЬНО: считаем в базе
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
@Query("SELECT COUNT(u) FROM User u WHERE u.active = true")
Long countActiveUsers();
@Query("SELECT u.status, COUNT(u) FROM User u GROUP BY u.status")
List<Object[]> countByStatus();
}
7. Async обработка с Spring Task
Для долгих операций:
@Service
@EnableAsync
public class AsyncProcessingService {
@Autowired
private UserRepository userRepository;
@Async
public CompletableFuture<String> processLargeDatasetAsync(String fileName) {
try {
int processed = 0;
int pageSize = 5000;
int page = 0;
boolean hasMore = true;
while (hasMore) {
Page<User> users = userRepository.findAll(
PageRequest.of(page, pageSize)
);
// обработка
processed += users.getNumberOfElements();
hasMore = users.hasNext();
page++;
}
return CompletableFuture.completedFuture(
"Processed: " + processed
);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
}
// Использование:
@RestController
public class ImportController {
@Autowired
private AsyncProcessingService service;
@PostMapping("/import")
public ResponseEntity<String> startImport(@RequestParam String fileName) {
service.processLargeDatasetAsync(fileName);
return ResponseEntity.accepted()
.body("Processing started, check status later");
}
}
8. Индексы и Query оптимизация
// Создай индексы для часто используемых полей
@Entity
@Table(indexes = {
@Index(name = "idx_email", columnList = "email"),
@Index(name = "idx_status", columnList = "status"),
@Index(name = "idx_created_date", columnList = "created_date")
})
public class User {
// ...
}
// Используй SELECT только нужные колонки
@Query("SELECT new com.example.UserDTO(u.id, u.email) FROM User u")
List<UserDTO> findAllDTOs();
Чеклист для обработки миллионов записей
- Никогда не loadAll() — используй pagination
- Batch size 5000-10000 — оптимум для большинства case
- Clear First Level Cache — entityManager.clear()
- Streaming output — для экспорта
- Cursor-based pagination — для API
- JDBC batch — для максимальной скорости импорта
- Database aggregation — не в Java
- Async обработка — чтобы не блокировать
- Мониторь память — max heap, GC
- Тестируй с реальными объёмами — не с 1000 записей
Главное правило: потоковая обработка, а не загрузка всё в память.