← Назад к вопросам

Когда стоит использовать асинхронное API?

2.0 Middle🔥 181 комментариев
#Spring Boot и Spring Data#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Асинхронное API в Java — когда и зачем использовать

Асинхронное API — это подход, при котором операция запускается, и результат будет получен позже через callback, Future или другой механизм. Это критически важно для масштабируемых приложений.

Основные сценарии использования

1. Блокирующие I/O операции (Network, Database)

Синхронный подход — неэффективен:

// ❌ Синхронный код
for (int i = 0; i < 1000; i++) {
    // Каждый запрос занимает 100ms
    String result = makeHttpRequest("http://api.example.com/data");
    processResult(result);
}
// Итого: 1000 * 100ms = 100 секунд!
// Поток ЖДЁТ ответа от сервера — впустую расходует ресурсы

Проблема: один поток может обработать максимум одновременно 1 запрос. Нужно создавать сотни потоков → контекст-свичинг, утечка памяти, GC pauses.

Асинхронный подход:

import java.util.concurrent.CompletableFuture;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// ✓ Асинхронный код
HttpClient client = HttpClient.newHttpClient();

List<CompletableFuture<String>> futures = new ArrayList<>();

for (int i = 0; i < 1000; i++) {
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://api.example.com/data?id=" + i))
        .GET()
        .build();
    
    // Запускаем запрос асинхронно
    CompletableFuture<String> future = client.sendAsync(request, 
            HttpResponse.BodyHandlers.ofString())
        .thenApply(HttpResponse::body)
        .thenApply(this::processResult);
    
    futures.add(future);
}

// Ждём все результаты
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .join();

// Итого: ~100ms (параллельно, не последовательно!)

Выигрыш: 1000 операций по 100ms каждая обрабатываются параллельно за ~100ms вместо 100 секунд.

2. Микросервисная архитектура

// ❌ Синхронный код (blocking calls)
@RestController
public class OrderController {
    @Autowired
    private RestTemplate restTemplate;  // Блокирующий клиент
    
    @GetMapping("/orders/{id}")
    public OrderDto getOrder(@PathVariable String id) {
        // Блокируем поток 1: ждём сервиса users
        UserDto user = restTemplate.getForObject(
            "http://users-service/users/" + id, UserDto.class);
        
        // Блокируем поток 2: ждём сервиса products
        List<ProductDto> products = restTemplate.getForObject(
            "http://products-service/products?order=" + id, List.class);
        
        // Блокируем поток 3: ждём сервиса payments
        PaymentDto payment = restTemplate.getForObject(
            "http://payments-service/payments/" + id, PaymentDto.class);
        
        return new OrderDto(user, products, payment);
    }
    // Поток заблокирован всё время!
    // Если каждый сервис отвечает 100ms, итого 300ms на один запрос
    // На 100 одновременных пользователей нужно 100 потоков
}

// ✓ Асинхронный код (non-blocking calls)
@RestController
public class OrderController {
    @Autowired
    private WebClient webClient;  // Асинхронный клиент
    
    @GetMapping("/orders/{id}")
    public CompletableFuture<OrderDto> getOrder(@PathVariable String id) {
        return webClient.get()
            .uri("http://users-service/users/" + id)
            .retrieve()
            .bodyToMono(UserDto.class)
            .zipWith(
                webClient.get()
                    .uri("http://products-service/products?order=" + id)
                    .retrieve()
                    .bodyToFlux(ProductDto.class)
                    .collectList()
            )
            .zipWith(
                webClient.get()
                    .uri("http://payments-service/payments/" + id)
                    .retrieve()
                    .bodyToMono(PaymentDto.class)
            )
            .map(tuple -> new OrderDto(
                tuple.getT1().getT1(),
                tuple.getT1().getT2(),
                tuple.getT2()
            ))
            .toFuture();
    }
    // Все 3 запроса идут ПАРАЛЛЕЛЬНО, не блокируя поток
    // На 100 одновременных пользователей нужно ~10 потоков
}

3. Реактивное программирование (Reactive Programming)

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// ✓ Реактивный подход с Project Reactor
public class ReactivePipeline {
    public Flux<OrderEvent> processOrders(Flux<Order> orders) {
        return orders
            // Валидируем асинхронно
            .filterWhen(this::validateOrderAsync)
            // Обогащаем асинхронно (вызываем микросервисы)
            .flatMap(this::enrichOrderAsync)
            // Сохраняем в БД асинхронно
            .flatMap(this::saveToDBAsync)
            // Публикуем события асинхронно
            .flatMap(this::publishEventAsync)
            .doOnError(error -> logger.error("Ошибка: " + error.getMessage()))
            .retry(3);  // Переповтор при ошибке
    }
    
    private Mono<Boolean> validateOrderAsync(Order order) {
        return Mono.fromFuture(
            CompletableFuture.supplyAsync(() -> {
                // CPU-bound валидация
                return order.isValid();
            })
        );
    }
    
    private Mono<Order> enrichOrderAsync(Order order) {
        return webClient.get()
            .uri("http://users-service/users/" + order.getUserId())
            .retrieve()
            .bodyToMono(UserDto.class)
            .map(user -> {
                order.setUser(user);
                return order;
            });
    }
}

4. Потоковая обработка данных

// ❌ Синхронный стриминг (slow)
public void processPriceUpdates() {
    for (PriceUpdate update : kafkaConsumer.poll(Duration.ofSeconds(1))) {
        // Синхронное сохранение → поток блокирован
        database.save(update);
        
        // Синхронный расчёт → задержка
        double newPrice = calculatePrice(update);
        
        // Синхронная публикация события
        eventBus.publish(new PriceChangeEvent(newPrice));
    }
}

// ✓ Асинхронный стриминг (fast)
public void processPriceUpdatesAsync() {
    kafkaStream
        .asFlux()
        .parallel(Runtime.getRuntime().availableProcessors())
        .runOn(Schedulers.parallel())
        .doOnNext(database::saveAsync)  // Асинхронное сохранение
        .flatMap(this::calculatePriceAsync)  // Асинхронный расчёт
        .flatMap(this::publishEventAsync)  // Асинхронная публикация
        .sequential()
        .subscribe();
}

5. Обработка больших объёмов данных

// ❌ Синхронный batch процесс (медленный)
public void processBatchSync() {
    List<User> users = database.getAllUsers();  // 1M пользователей
    
    for (User user : users) {
        String enrichedData = externalAPI.enrich(user);  // 100ms каждый
        database.update(user, enrichedData);
    }
    // Итого: 1M * 100ms = 100 000 секунд = 27+ часов!
}

// ✓ Асинхронный batch процесс (быстрый)
public void processBatchAsync() {
    database.getAllUsersAsync()
        .buffer(1000)  // Батчи по 1000
        .parallel()
        .runOn(Schedulers.parallel())
        .flatMap(batch -> 
            Flux.fromIterable(batch)
                .flatMap(user -> externalAPI.enrichAsync(user))
                .buffer(100)  // Батч save
                .flatMap(enrichedUsers -> database.updateAsync(enrichedUsers))
        )
        .sequential()
        .blockLast();  // Ждём завершения
    // Итого: ~100 секунд (параллельно по 10+ потокам)
}

6. Real-time обработка событий

// ✓ WebSocket + асинхронные подписки
@RestController
@RequestMapping("/api/stock-prices")
public class StockPriceController {
    @GetMapping(value = "/stream", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<StockPrice> streamStockPrices() {
        return stockService.getPriceUpdates()
            .filter(price -> price.getVolume() > 1000000)  // Фильтруем
            .map(this::enrichWithTechnicals)  // Трансформируем
            .debounce(Duration.ofMillis(100))  // Rate limit
            .timeout(Duration.ofSeconds(30));
    }
    
    // WebSocket endpoint
    @PostMapping("/subscribe")
    public Mono<Void> subscribe(@RequestBody StockFilter filter) {
        return stockService.watchPrices(filter)
            .doOnNext(wsTemplate::sendAsyncMessage)
            .then();
    }
}

Когда НЕ использовать асинхронное API?

// ❌ Избегай асинхронности для простых операций

// Неправильно — усложняем без пользы
public CompletableFuture<Integer> add(int a, int b) {
    return CompletableFuture.supplyAsync(() -> a + b);
}

// Правильно — просто вычисляем
public int add(int a, int b) {
    return a + b;
}

// Неправильно — nested callbacks (callback hell)
future1.thenApply(result1 ->
    future2.thenApply(result2 ->
        future3.thenApply(result3 ->
            // Спагетти код!
            process(result1, result2, result3)
        ).toCompletableFuture()
    ).toCompletableFuture()
).join();

// Правильно — используй flatMap (compose)
future1.flatMap(result1 ->
    future2.flatMap(result2 ->
        future3.map(result3 ->
            process(result1, result2, result3)
        )
    )
);

Чеклист для выбора асинхронного API

  • I/O bound операции? (Network, Database, File System) → Используй асинхронное API
  • Высокий объём одновременных запросов? (100+) → Асинхронное обязательно
  • Микросервисная архитектура? → WebClient/Reactor
  • Потоковая обработка? → Kafka Streams, Reactor
  • Real-time требования? → WebSocket + Reactive Streams
  • Simple в реализации? → CompletableFuture / Future
  • Сложная композиция? → Project Reactor или RxJava
  • CPU-bound операции? → Синхронное API, параллелизм через ForkJoinPool

Вывод

Асинхронное API критично для:

  • Приложений с высоким I/O (web сервисы)
  • Микросервисной архитектуры
  • Обработки потоков данных
  • Масштабирования для 100+ одновременных пользователей

Выбирай инструменты:

  • Простые случаи: CompletableFuture
  • Spring Boot: WebClient + @Async
  • Сложная логика: Project Reactor (Mono/Flux)
  • Легаси: RxJava

Асинхронность — это не опция, а необходимость в современной backend разработке.