← Назад к вопросам

Что часто используешь в совокупности с агрегационными запросами

1.8 Middle🔥 131 комментариев
#Базы данных и SQL

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Инструменты и техники, используемые с агрегационными запросами

Агрегационные запросы — это мощный инструмент для обработки больших объемов данных прямо в базе данных. В Java приложениях они часто используются в комбинации с другими инструментами и подходами для максимальной эффективности.

1. Pagination и Cursor-based Navigation

При работе с большими результатами агрегации используется пагинация для уменьшения объема передаваемых данных:

public class AggregationWithPagination {
    public List<SalesReport> getAggregatedSalesWithPagination(
            int pageNumber, int pageSize) {
        
        return mongoTemplate.aggregate(
            newAggregation(
                match(Criteria.where("status").is("completed")),
                group("category").sum("amount").as("totalSales"),
                sort(Direction.DESC, "totalSales"),
                skip((long) pageNumber * pageSize),
                limit(pageSize)
            ),
            "orders",
            SalesReport.class
        ).getMappedResults();
    }
}

2. Caching (Кеширование)

Результаты дорогих агрегаций часто кешируются для повышения производительности:

public class CachedAggregationService {
    @Cacheable(value = "salesByCategory", key = "#month")
    public List<CategorySales> getSalesByCategoryMonthly(String month) {
        return mongoTemplate.aggregate(
            newAggregation(
                match(Criteria.where("date").regex("^" + month)),
                group("category").sum("amount").as("total"),
                sort(Direction.DESC, "total")
            ),
            "sales",
            CategorySales.class
        ).getMappedResults();
    }
    
    @CacheEvict(value = "salesByCategory", allEntries = true)
    public void clearSalesCache() {
        // Вызывается при изменении данных
    }
}

3. Async Processing и CompletableFuture

Для неблокирующей обработки агрегаций используется асинхронная обработка:

public class AsyncAggregationService {
    @Async
    public CompletableFuture<List<MonthlyMetrics>> 
            aggregateMonthlyMetricsAsync(String year) {
        
        List<MonthlyMetrics> results = mongoTemplate.aggregate(
            newAggregation(
                match(Criteria.where("year").is(year)),
                group("month").sum("users").as("activeUsers")
                            .sum("transactions").as("totalTransactions"),
                sort(Direction.ASC, "_id")
            ),
            "activities",
            MonthlyMetrics.class
        ).getMappedResults();
        
        return CompletableFuture.completedFuture(results);
    }
    
    public void processMetricsAsync() {
        CompletableFuture.supplyAsync(() -> 
                aggregateMonthlyMetricsAsync("2024"))
            .thenApply(CompletableFuture::join)
            .thenAccept(metrics -> saveToDataWarehouse(metrics))
            .exceptionally(ex -> {
                logger.error("Aggregation failed", ex);
                return null;
            });
    }
}

4. Stream API для обработки результатов

Полученные результаты агрегации обрабатываются с помощью Stream API:

public class AggregationStreamProcessing {
    public Map<String, Double> processAggregationResults() {
        List<SalesData> aggregated = fetchAggregatedSales();
        
        return aggregated.stream()
            .filter(sale -> sale.getAmount() > 100)
            .collect(Collectors.groupingBy(
                SalesData::getCategory,
                Collectors.summingDouble(SalesData::getAmount)
            ));
    }
    
    public List<TopProduct> getTopNProducts(int n) {
        return fetchProductAggregation()
            .stream()
            .sorted(Comparator.comparingDouble(
                ProductSales::getRevenue).reversed())
            .limit(n)
            .collect(Collectors.toList());
    }
}

5. Параллельная обработка (Parallel Streams)

Для больших объемов данных используется параллельная обработка:

public class ParallelAggregationProcessing {
    public long calculateTotalRevenueParallel() {
        return mongoTemplate.aggregate(
            newAggregation(
                match(Criteria.where("status").is("completed")),
                group("productId").sum("price").as("revenue")
            ),
            "orders",
            ProductRevenue.class
        ).getMappedResults()
            .parallelStream()
            .mapToLong(ProductRevenue::getRevenue)
            .sum();
    }
}

6. Message Queues (Kafka, RabbitMQ)

Результаты агрегаций отправляются в message queues для дальнейшей обработки:

public class AggregationToMessageQueue {
    @Autowired
    private KafkaTemplate<String, AggregationResult> kafkaTemplate;
    
    public void publishAggregationResults() {
        List<AggregationResult> results = 
            performComplexAggregation();
        
        results.forEach(result -> 
            kafkaTemplate.send("aggregation-results", result)
        );
    }
}

7. Data Warehouse и ETL

Результаты агрегаций часто загружаются в data warehouse для дальнейшей аналитики:

public class AggregationToDataWarehouse {
    public void syncAggregatedDataToWarehouse() {
        List<AnalyticsData> aggregated = 
            getAggregatedMetrics();
        
        // Batch insert в data warehouse
        batchInsertToWarehouse(aggregated, 1000);
    }
    
    private void batchInsertToWarehouse(
            List<AnalyticsData> data, int batchSize) {
        for (int i = 0; i < data.size(); i += batchSize) {
            int end = Math.min(i + batchSize, data.size());
            dataWarehouseRepository.saveAll(
                data.subList(i, end)
            );
        }
    }
}

8. Monitoring и Metrics

Производительность агрегаций мониторится и отслеживается:

public class MonitoredAggregation {
    @Timed(value = "aggregation.duration", description = 
            "Time taken to perform aggregation")
    public List<Result> getMonitoredAggregation() {
        long startTime = System.currentTimeMillis();
        
        List<Result> results = mongoTemplate.aggregate(
            newAggregation(/*...*/),
            "collection",
            Result.class
        ).getMappedResults();
        
        long duration = System.currentTimeMillis() - startTime;
        metricsRegistry.timer("aggregation.time")
            .record(duration, TimeUnit.MILLISECONDS);
        
        return results;
    }
}

9. Circuit Breaker паттерн

При работе с тяжелыми агрегациями используется Circuit Breaker для предотвращения перегрузок:

public class ResilientAggregation {
    @CircuitBreaker(name = "aggregationService",
                    fallbackMethod = "fallbackAggregation")
    public List<Result> heavyAggregation() {
        return mongoTemplate.aggregate(/*...*/).getMappedResults();
    }
    
    public List<Result> fallbackAggregation(
            IOException ex) {
        logger.warn("Aggregation circuit breaker triggered");
        return getCachedResults();
    }
}

10. Query Optimization и Explain Plans

Анализ и оптимизация агрегационных запросов:

public class OptimizedAggregation {
    public void analyzeAggregationPerformance() {
        // В MongoDB можно использовать explain()
        AggregationResults<Document> results = 
            mongoTemplate.aggregate(
                newAggregation(
                    match(Criteria.where("status").is("active")),
                    group("userId").sum("amount").as("total"),
                    sort(Direction.DESC, "total")
                ),
                "transactions",
                Document.class
            );
        
        // Проверить execution stats
        results.getMappedResults();
    }
}

Лучшие практики

  • Используй projection на ранних этапах агрегации для уменьшения объема данных
  • Индексируй поля, по которым фильтруешь и группируешь
  • Кеши дорогие агрегации, особенно если они вычисляются часто
  • Мониторь производительность и анализируй execution plans
  • Используй batch processing для больших объемов результатов
  • Асинхронно обрабатывай долгие агрегации, чтобы не блокировать основной thread

Комбинация всех этих инструментов позволяет эффективно работать с большими данными и обеспечивать высокую производительность приложения.

Что часто используешь в совокупности с агрегационными запросами | PrepBro