← Назад к вопросам

Как решаешь проблему, если в запросе сотни миллионов записей

1.7 Middle🔥 141 комментариев
#Базы данных и SQL

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Обработка сотен миллионов записей

Это одна из самых частых проблем в production. Обработка такого объёма данных требует специального подхода. Расскажу о стратегиях, которые применял я.

1. Никогда не загружай всё в память

Первая ошибка:

// ❌ НЕПРАВИЛЬНО: загружает всё в памяти
@GetMapping("/export")
public List<UserDTO> exportAllUsers() {
    return userRepository.findAll(); // 100M записей в памяти = OutOfMemory!
}

Правильно — streaming:

// ✅ ПРАВИЛЬНО: потоковая обработка
@GetMapping("/export")
public ResponseEntity<StreamingResponseBody> exportAllUsers() {
    return ResponseEntity.ok()
        .header("Content-Disposition", "attachment; filename=users.csv")
        .body(outputStream -> {
            try (CSVPrinter printer = new CSVPrinter(
                    new OutputStreamWriter(outputStream), 
                    CSVFormat.DEFAULT)) {
                
                // Обрабатываем батчами
                int pageSize = 10000;
                int pageNumber = 0;
                boolean hasMore = true;
                
                while (hasMore) {
                    Page<User> page = userRepository.findAll(
                        PageRequest.of(pageNumber, pageSize)
                    );
                    
                    for (User user : page.getContent()) {
                        printer.printRecord(
                            user.getId(),
                            user.getEmail(),
                            user.getName()
                        );
                    }
                    
                    hasMore = page.hasNext();
                    pageNumber++;
                }
            }
        });
}

2. Pagination + Cursor-based навигация

Для API эндпоинтов:

// Offset-based (плохо для больших чисел)
// GET /users?page=10000&size=20 — ОЧЕНЬ МЕДЛЕННО!
// SELECT * FROM users OFFSET 200000 LIMIT 20;

// Cursor-based (хорошо)
@GetMapping("/users")
public ResponseEntity<Page<UserDTO>> listUsers(
        @RequestParam(required = false) Long afterId,
        @RequestParam(defaultValue = "100") int limit) {
    
    Pageable pageable = PageRequest.of(0, limit + 1);
    List<User> users;
    
    if (afterId == null) {
        users = userRepository.findFirstNOrderByIdAsc(limit + 1);
    } else {
        users = userRepository.findByIdGreaterThanOrderByIdAsc(afterId, limit + 1);
    }
    
    boolean hasMore = users.size() > limit;
    users = users.subList(0, Math.min(users.size(), limit));
    
    return ResponseEntity.ok(new PageDTO<>(
        users.stream().map(this::toDTO).collect(Collectors.toList()),
        hasMore,
        users.isEmpty() ? null : users.get(users.size() - 1).getId()
    ));
}

public class PageDTO<T> {
    private List<T> items;
    private boolean hasMore;
    private Long nextCursor;
    
    // getters
}

3. Batch обработка с EntityManager

Для импорта/обработки:

@Service
public class BulkProcessingService {
    @Autowired
    private EntityManager entityManager;
    
    @Autowired
    private UserRepository userRepository;
    
    @Transactional
    public void importMillionsOfUsers(File csvFile) throws IOException {
        int batchSize = 5000; // Оптимальный размер батча
        int count = 0;
        
        try (BufferedReader reader = new BufferedReader(new FileReader(csvFile))) {
            String line;
            
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",");
                
                User user = new User();
                user.setEmail(parts[0]);
                user.setName(parts[1]);
                user.setPhone(parts[2]);
                
                entityManager.persist(user);
                count++;
                
                // Flush каждые batchSize
                if (count % batchSize == 0) {
                    entityManager.flush();
                    entityManager.clear(); // Очищаем First Level Cache
                    System.out.println("Processed: " + count);
                }
            }
            
            // Final flush для остатка
            entityManager.flush();
            entityManager.clear();
        }
    }
}

4. JDBC Batch для максимальной производительности

Для импорта очень больших объёмов:

@Service
public class JdbcBulkInsertService {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    public void importUsers(File csvFile) throws IOException {
        int batchSize = 10000;
        int count = 0;
        List<Object[]> batch = new ArrayList<>();
        
        try (BufferedReader reader = new BufferedReader(new FileReader(csvFile))) {
            String line;
            String sql = "INSERT INTO users (email, name, phone) VALUES (?, ?, ?)";
            
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",");
                batch.add(new Object[]{
                    parts[0], // email
                    parts[1], // name
                    parts[2]  // phone
                });
                
                if (batch.size() >= batchSize) {
                    jdbcTemplate.batchUpdate(sql, batch);
                    count += batch.size();
                    batch.clear();
                    System.out.println("Processed: " + count);
                }
            }
            
            // Final batch
            if (!batch.isEmpty()) {
                jdbcTemplate.batchUpdate(sql, batch);
                count += batch.size();
            }
        }
    }
}

5. Stream API для обработки

Для трансформации больших объёмов:

@Service
public class StreamProcessingService {
    @Autowired
    private UserRepository userRepository;
    
    @Transactional(readOnly = true)
    public void processLargeDataset() {
        int pageSize = 1000;
        int pageNumber = 0;
        
        boolean hasMore = true;
        while (hasMore) {
            Page<User> page = userRepository.findAll(
                PageRequest.of(pageNumber, pageSize)
            );
            
            // Stream обработка
            page.getContent()
                .parallelStream() // параллельная обработка
                .map(this::transformUser)
                .filter(this::isValid)
                .forEach(this::saveResult);
            
            hasMore = page.hasNext();
            pageNumber++;
        }
    }
}

6. Database-side агрегация

Вместо загрузки данных в Java:

// ❌ НЕПРАВИЛЬНО: загружаем миллионы записей
public Long countActiveUsers() {
    return userRepository.findAll().stream()
        .filter(User::isActive)
        .count();
}

// ✅ ПРАВИЛЬНО: считаем в базе
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
    @Query("SELECT COUNT(u) FROM User u WHERE u.active = true")
    Long countActiveUsers();
    
    @Query("SELECT u.status, COUNT(u) FROM User u GROUP BY u.status")
    List<Object[]> countByStatus();
}

7. Async обработка с Spring Task

Для долгих операций:

@Service
@EnableAsync
public class AsyncProcessingService {
    @Autowired
    private UserRepository userRepository;
    
    @Async
    public CompletableFuture<String> processLargeDatasetAsync(String fileName) {
        try {
            int processed = 0;
            int pageSize = 5000;
            int page = 0;
            
            boolean hasMore = true;
            while (hasMore) {
                Page<User> users = userRepository.findAll(
                    PageRequest.of(page, pageSize)
                );
                
                // обработка
                processed += users.getNumberOfElements();
                
                hasMore = users.hasNext();
                page++;
            }
            
            return CompletableFuture.completedFuture(
                "Processed: " + processed
            );
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}

// Использование:
@RestController
public class ImportController {
    @Autowired
    private AsyncProcessingService service;
    
    @PostMapping("/import")
    public ResponseEntity<String> startImport(@RequestParam String fileName) {
        service.processLargeDatasetAsync(fileName);
        return ResponseEntity.accepted()
            .body("Processing started, check status later");
    }
}

8. Индексы и Query оптимизация

// Создай индексы для часто используемых полей
@Entity
@Table(indexes = {
    @Index(name = "idx_email", columnList = "email"),
    @Index(name = "idx_status", columnList = "status"),
    @Index(name = "idx_created_date", columnList = "created_date")
})
public class User {
    // ...
}

// Используй SELECT только нужные колонки
@Query("SELECT new com.example.UserDTO(u.id, u.email) FROM User u")
List<UserDTO> findAllDTOs();

Чеклист для обработки миллионов записей

  1. Никогда не loadAll() — используй pagination
  2. Batch size 5000-10000 — оптимум для большинства case
  3. Clear First Level Cache — entityManager.clear()
  4. Streaming output — для экспорта
  5. Cursor-based pagination — для API
  6. JDBC batch — для максимальной скорости импорта
  7. Database aggregation — не в Java
  8. Async обработка — чтобы не блокировать
  9. Мониторь память — max heap, GC
  10. Тестируй с реальными объёмами — не с 1000 записей

Главное правило: потоковая обработка, а не загрузка всё в память.

Как решаешь проблему, если в запросе сотни миллионов записей | PrepBro