← Назад к вопросам

Как сделать блокирующий вызов в WebFlux приложении

1.7 Middle🔥 131 комментариев
#Основы Java

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как сделать блокирующий вызов в WebFlux приложении

WebFlux — это реактивный фреймворк Spring, основанный на неблокирующем I/O. Однако иногда нужно вызвать блокирующий код (например, синхронный API, БД без реактивного драйвера). Это сложный случай, требующий особого подхода.

1. Проблема: Blocking в Reactive контексте

import org.springframework.web.reactive.function.server.ServerRequest;
import reactor.core.publisher.Mono;

@RestController
public class BlockingProblem {
  @Autowired
  private SyncService syncService; // Синхронный сервис
  
  @GetMapping("/data")
  public Mono<String> getData() {
    // ❌ ОПАСНО: блокирующий вызов в реактивной цепи
    String result = syncService.fetchData(); // БЛОКИРУЕТ reactor thread!
    return Mono.just(result);
  }
}

// Проблема:
// 1. Reactor thread был заблокирован
// 2. Другие запросы не могут обработаться
// 3. Приложение зависит (deadlock может быть)

2. Решение 1: Mono.fromCallable

Запустить блокирующий код в другом потоке:

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@RestController
public class BlockingWrapperController {
  @Autowired
  private SyncService syncService;
  
  @GetMapping("/data")
  public Mono<String> getData() {
    // Правильно: обернуть блокирующий код
    return Mono.fromCallable(() -> syncService.fetchData())
      .subscribeOn(Schedulers.boundedElastic()); // В elasticated потоке
  }
}

// Как работает:
// 1. Mono.fromCallable() оборачивает блокирующий вызов
// 2. subscribeOn(Schedulers.boundedElastic()) запускает в thread pool
// 3. Reactor потокне блокируется
// 4. Результат возвращается обратно

3. Решение 2: Mono.fromCallable с error handling

@GetMapping("/safe-data")
public Mono<String> getSafeData() {
  return Mono.fromCallable(() -> syncService.fetchData())
    .subscribeOn(Schedulers.boundedElastic())
    .onErrorReturn("Default value") // Обработка ошибок
    .timeout(Duration.ofSeconds(5)) // Таймаут
    .doOnError(e -> logger.error("Error fetching data", e));
}

4. Решение 3: Flux.fromIterable для коллекций

Если нужно обработать результат как поток:

@GetMapping("/users")
public Flux<User> getUsers() {
  // Вызываем синхронный метод, вот вернёт List<User>
  return Flux.fromCallable(() -> userRepository.findAll())
    .subscribeOn(Schedulers.boundedElastic())
    .flatMapMany(Flux::fromIterable) // Раскрыть List как Flux
    .map(this::enrichUser) // Трансформация
    .doOnError(e -> logger.error("Error", e));
}

private User enrichUser(User user) {
  // Дополнить информацию
  return user;
}

5. Решение 4: Schedulers.boundedElastic vs других

import reactor.core.scheduler.Schedulers;
import java.time.Duration;

public class SchedulerComparison {
  
  // boundedElastic — для блокирующих операций (I/O, DB)
  public void boundedElasticExample() {
    Mono.fromCallable(() -> blockingCall())
      .subscribeOn(Schedulers.boundedElastic()); // ✅ для Blocking операций
  }
  
  // parallel — для CPU-intensive операций
  public void parallelExample() {
    Flux.range(1, 1000)
      .parallel()
      .runOn(Schedulers.parallel())
      .map(this::cpuIntensiveOperation)
      .sequential(); // ✅ для CPU-heavy вычислений
  }
  
  // single — для синхронных операций в одном потоке
  public void singleExample() {
    Mono.just("value")
      .subscribeOn(Schedulers.single()); // ✅ для последовательных операций
  }
  
  // immediate — без переключения потока (осторожно!)
  public void immediateExample() {
    Mono.just("value")
      .subscribeOn(Schedulers.immediate()); // ❌ может заблокировать
  }
}

6. Решение 5: Жадная инициализация thread pool

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Configuration
public class ReactorConfig {
  
  // Создать пользовательский thread pool для блокирующих операций
  @Bean
  public Scheduler blockingScheduler() {
    return Schedulers.newBoundedElastic(
      100,              // maxThreads
      Integer.MAX_VALUE, // maxQueuedTasks
      "blocking-pool-" // threadNamePrefix
    );
  }
}

// Использование
@RestController
public class BlockingCallController {
  @Autowired
  private Scheduler blockingScheduler;
  
  @Autowired
  private SyncService syncService;
  
  @GetMapping("/data")
  public Mono<String> getData() {
    return Mono.fromCallable(() -> syncService.fetchData())
      .subscribeOn(blockingScheduler); // Использовать кастомный scheduler
  }
}

7. Решение 6: Кэширование результатов блокирующих операций

import reactor.core.publisher.Mono;
import java.time.Duration;

@Service
public class CachedBlockingService {
  @Autowired
  private SyncService syncService;
  
  private Mono<String> cachedData;
  
  @PostConstruct
  public void init() {
    // Кэш результат блокирующего вызова
    this.cachedData = Mono.fromCallable(() -> syncService.fetchData())
      .subscribeOn(Schedulers.boundedElastic())
      .cache() // Кэшировать результат
      .onErrorReturn("Error loading data");
  }
  
  public Mono<String> getData() {
    return cachedData; // Возвращать из кэша
  }
}

8. Решение 7: CompletableFuture->Mono

Если есть асинхронный API, но не реактивный:

import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;

@RestController
public class AsyncToReactiveController {
  @Autowired
  private AsyncService asyncService; // Возвращает CompletableFuture
  
  @GetMapping("/data")
  public Mono<String> getData() {
    // Конвертировать CompletableFuture в Mono
    return Mono.fromFuture(() -> asyncService.fetchDataAsync());
  }
}

9. Решение 8: Правильный способ для DB операций

import org.springframework.data.r2dbc.repository.R2dbcRepository;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

// ✅ Реактивный репозиторий
public interface UserReactiveRepository extends R2dbcRepository<User, Long> {
  Mono<User> findByEmail(String email);
  Flux<User> findByAgeGreaterThan(int age);
}

@RestController
public class ReactiveDBController {
  @Autowired
  private UserReactiveRepository userRepository;
  
  @GetMapping("/users/{id}")
  public Mono<User> getUser(@PathVariable Long id) {
    // Это уже реактивно, не нужно оборачивать!
    return userRepository.findById(id);
  }
  
  @GetMapping("/users")
  public Flux<User> getAllUsers() {
    return userRepository.findAll();
  }
}

// ❌ Если используешь старый JPA:
@Repository
public class OldJpaUserRepository {
  @Autowired
  private UserJpaRepository jpaRepository;
  
  public Mono<User> findById(Long id) {
    // Обернуть JPA операцию
    return Mono.fromCallable(() -> jpaRepository.findById(id).orElse(null))
      .subscribeOn(Schedulers.boundedElastic());
  }
}

10. Практический пример: Вызов HTTP API

import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;

@Service
public class ExternalApiService {
  @Autowired
  private RestTemplate restTemplate; // Синхронный клиент
  
  // ❌ Неправильно: заблокирует reactor thread
  public Mono<User> getUserWrong(String id) {
    User user = restTemplate.getForObject("http://api.example.com/users/" + id, User.class);
    return Mono.just(user);
  }
  
  // ✅ Правильно: использовать обёртку
  public Mono<User> getUser(String id) {
    return Mono.fromCallable(() -> 
      restTemplate.getForObject("http://api.example.com/users/" + id, User.class)
    )
    .subscribeOn(Schedulers.boundedElastic())
    .timeout(Duration.ofSeconds(5))
    .onErrorReturn(new User()); // Fallback
  }
  
  // ✅ Ещё лучше: использовать WebClient (реактивный)
  public Mono<User> getUserBest(String id) {
    return webClient
      .get()
      .uri("/users/{id}", id)
      .retrieve()
      .bodyToMono(User.class)
      .timeout(Duration.ofSeconds(5));
  }
}

11. Таблица сравнения подходов

┌─────────────────────────────────────────┬───────────┬──────────────┐
│ Подход                                  │ Потоки    │ Рекомендация │
├─────────────────────────────────────────┼───────────┼──────────────┤
│ Mono.fromCallable + boundedElastic      │ Thread    │ ✅ Standard  │
│ Mono.fromCallable + customScheduler     │ Thread    │ ✅ Best      │
│ Async API (CompletableFuture)           │ N/A       │ ✅ Good      │
│ Реактивная БД (R2DBC)                   │ N/A       │ ✅ Best      │
│ Sync API без обёртки                    │ Reactor   │ ❌ NEVER     │
└─────────────────────────────────────────┴───────────┴──────────────┘

12. Diagnostic: как проверить блокировку

// Reactor предоставляет инструменты для диагностики
import reactor.core.scheduler.Schedulers;

@Configuration
public class ReactorDebugConfig {
  @PostConstruct
  public void enableDebugMode() {
    // Включить дополнительные проверки
    Hooks.onOperatorDebug(); // Stack trace в ошибках
  }
}

// Или запустить с флагом
// java -Dreactor.netty.ioWorkerCount=1 ...

В итоге: Для блокирующего кода в WebFlux используй Mono.fromCallable с Schedulers.boundedElastic(). Это запустит код в отдельном потоке и не заблокирует reactor потоки. По возможности — мигрируй на реактивные библиотеки (WebClient вместо RestTemplate, R2DBC вместо JPA, и т.д.). Помни о таймаутах и обработке ошибок.

Как сделать блокирующий вызов в WebFlux приложении | PrepBro