← Назад к вопросам
Как сделать блокирующий вызов в WebFlux приложении
1.7 Middle🔥 131 комментариев
#Основы Java
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как сделать блокирующий вызов в WebFlux приложении
WebFlux — это реактивный фреймворк Spring, основанный на неблокирующем I/O. Однако иногда нужно вызвать блокирующий код (например, синхронный API, БД без реактивного драйвера). Это сложный случай, требующий особого подхода.
1. Проблема: Blocking в Reactive контексте
import org.springframework.web.reactive.function.server.ServerRequest;
import reactor.core.publisher.Mono;
@RestController
public class BlockingProblem {
@Autowired
private SyncService syncService; // Синхронный сервис
@GetMapping("/data")
public Mono<String> getData() {
// ❌ ОПАСНО: блокирующий вызов в реактивной цепи
String result = syncService.fetchData(); // БЛОКИРУЕТ reactor thread!
return Mono.just(result);
}
}
// Проблема:
// 1. Reactor thread был заблокирован
// 2. Другие запросы не могут обработаться
// 3. Приложение зависит (deadlock может быть)
2. Решение 1: Mono.fromCallable
Запустить блокирующий код в другом потоке:
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@RestController
public class BlockingWrapperController {
@Autowired
private SyncService syncService;
@GetMapping("/data")
public Mono<String> getData() {
// Правильно: обернуть блокирующий код
return Mono.fromCallable(() -> syncService.fetchData())
.subscribeOn(Schedulers.boundedElastic()); // В elasticated потоке
}
}
// Как работает:
// 1. Mono.fromCallable() оборачивает блокирующий вызов
// 2. subscribeOn(Schedulers.boundedElastic()) запускает в thread pool
// 3. Reactor потокне блокируется
// 4. Результат возвращается обратно
3. Решение 2: Mono.fromCallable с error handling
@GetMapping("/safe-data")
public Mono<String> getSafeData() {
return Mono.fromCallable(() -> syncService.fetchData())
.subscribeOn(Schedulers.boundedElastic())
.onErrorReturn("Default value") // Обработка ошибок
.timeout(Duration.ofSeconds(5)) // Таймаут
.doOnError(e -> logger.error("Error fetching data", e));
}
4. Решение 3: Flux.fromIterable для коллекций
Если нужно обработать результат как поток:
@GetMapping("/users")
public Flux<User> getUsers() {
// Вызываем синхронный метод, вот вернёт List<User>
return Flux.fromCallable(() -> userRepository.findAll())
.subscribeOn(Schedulers.boundedElastic())
.flatMapMany(Flux::fromIterable) // Раскрыть List как Flux
.map(this::enrichUser) // Трансформация
.doOnError(e -> logger.error("Error", e));
}
private User enrichUser(User user) {
// Дополнить информацию
return user;
}
5. Решение 4: Schedulers.boundedElastic vs других
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class SchedulerComparison {
// boundedElastic — для блокирующих операций (I/O, DB)
public void boundedElasticExample() {
Mono.fromCallable(() -> blockingCall())
.subscribeOn(Schedulers.boundedElastic()); // ✅ для Blocking операций
}
// parallel — для CPU-intensive операций
public void parallelExample() {
Flux.range(1, 1000)
.parallel()
.runOn(Schedulers.parallel())
.map(this::cpuIntensiveOperation)
.sequential(); // ✅ для CPU-heavy вычислений
}
// single — для синхронных операций в одном потоке
public void singleExample() {
Mono.just("value")
.subscribeOn(Schedulers.single()); // ✅ для последовательных операций
}
// immediate — без переключения потока (осторожно!)
public void immediateExample() {
Mono.just("value")
.subscribeOn(Schedulers.immediate()); // ❌ может заблокировать
}
}
6. Решение 5: Жадная инициализация thread pool
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@Configuration
public class ReactorConfig {
// Создать пользовательский thread pool для блокирующих операций
@Bean
public Scheduler blockingScheduler() {
return Schedulers.newBoundedElastic(
100, // maxThreads
Integer.MAX_VALUE, // maxQueuedTasks
"blocking-pool-" // threadNamePrefix
);
}
}
// Использование
@RestController
public class BlockingCallController {
@Autowired
private Scheduler blockingScheduler;
@Autowired
private SyncService syncService;
@GetMapping("/data")
public Mono<String> getData() {
return Mono.fromCallable(() -> syncService.fetchData())
.subscribeOn(blockingScheduler); // Использовать кастомный scheduler
}
}
7. Решение 6: Кэширование результатов блокирующих операций
import reactor.core.publisher.Mono;
import java.time.Duration;
@Service
public class CachedBlockingService {
@Autowired
private SyncService syncService;
private Mono<String> cachedData;
@PostConstruct
public void init() {
// Кэш результат блокирующего вызова
this.cachedData = Mono.fromCallable(() -> syncService.fetchData())
.subscribeOn(Schedulers.boundedElastic())
.cache() // Кэшировать результат
.onErrorReturn("Error loading data");
}
public Mono<String> getData() {
return cachedData; // Возвращать из кэша
}
}
8. Решение 7: CompletableFuture->Mono
Если есть асинхронный API, но не реактивный:
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
@RestController
public class AsyncToReactiveController {
@Autowired
private AsyncService asyncService; // Возвращает CompletableFuture
@GetMapping("/data")
public Mono<String> getData() {
// Конвертировать CompletableFuture в Mono
return Mono.fromFuture(() -> asyncService.fetchDataAsync());
}
}
9. Решение 8: Правильный способ для DB операций
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
// ✅ Реактивный репозиторий
public interface UserReactiveRepository extends R2dbcRepository<User, Long> {
Mono<User> findByEmail(String email);
Flux<User> findByAgeGreaterThan(int age);
}
@RestController
public class ReactiveDBController {
@Autowired
private UserReactiveRepository userRepository;
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
// Это уже реактивно, не нужно оборачивать!
return userRepository.findById(id);
}
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userRepository.findAll();
}
}
// ❌ Если используешь старый JPA:
@Repository
public class OldJpaUserRepository {
@Autowired
private UserJpaRepository jpaRepository;
public Mono<User> findById(Long id) {
// Обернуть JPA операцию
return Mono.fromCallable(() -> jpaRepository.findById(id).orElse(null))
.subscribeOn(Schedulers.boundedElastic());
}
}
10. Практический пример: Вызов HTTP API
import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
@Service
public class ExternalApiService {
@Autowired
private RestTemplate restTemplate; // Синхронный клиент
// ❌ Неправильно: заблокирует reactor thread
public Mono<User> getUserWrong(String id) {
User user = restTemplate.getForObject("http://api.example.com/users/" + id, User.class);
return Mono.just(user);
}
// ✅ Правильно: использовать обёртку
public Mono<User> getUser(String id) {
return Mono.fromCallable(() ->
restTemplate.getForObject("http://api.example.com/users/" + id, User.class)
)
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(5))
.onErrorReturn(new User()); // Fallback
}
// ✅ Ещё лучше: использовать WebClient (реактивный)
public Mono<User> getUserBest(String id) {
return webClient
.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class)
.timeout(Duration.ofSeconds(5));
}
}
11. Таблица сравнения подходов
┌─────────────────────────────────────────┬───────────┬──────────────┐
│ Подход │ Потоки │ Рекомендация │
├─────────────────────────────────────────┼───────────┼──────────────┤
│ Mono.fromCallable + boundedElastic │ Thread │ ✅ Standard │
│ Mono.fromCallable + customScheduler │ Thread │ ✅ Best │
│ Async API (CompletableFuture) │ N/A │ ✅ Good │
│ Реактивная БД (R2DBC) │ N/A │ ✅ Best │
│ Sync API без обёртки │ Reactor │ ❌ NEVER │
└─────────────────────────────────────────┴───────────┴──────────────┘
12. Diagnostic: как проверить блокировку
// Reactor предоставляет инструменты для диагностики
import reactor.core.scheduler.Schedulers;
@Configuration
public class ReactorDebugConfig {
@PostConstruct
public void enableDebugMode() {
// Включить дополнительные проверки
Hooks.onOperatorDebug(); // Stack trace в ошибках
}
}
// Или запустить с флагом
// java -Dreactor.netty.ioWorkerCount=1 ...
В итоге: Для блокирующего кода в WebFlux используй Mono.fromCallable с Schedulers.boundedElastic(). Это запустит код в отдельном потоке и не заблокирует reactor потоки. По возможности — мигрируй на реактивные библиотеки (WebClient вместо RestTemplate, R2DBC вместо JPA, и т.д.). Помни о таймаутах и обработке ошибок.