Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Ответ
Вопрос касается того, как использовать реактивное программирование (например, Project Reactor, RxJava) в приложениях на Netty без потери неблокирующего поведения. Netty — это асинхронный NIO фреймворк, и правильное использование реактивных стеков критично.
1. Основа: Netty + Project Reactor
Spring WebFlux использует именно эту комбинацию:
@RestController
public class ReactiveController {
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
// Это выполняется на event loop Netty, не блокирует!
return userService.findById(id)
.subscribeOn(Schedulers.boundedElastic()); // IO работа
}
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userService.findAll()
.subscribeOn(Schedulers.boundedElastic());
}
}
2. Правильное использование Schedulers
Ключ к сохранению реактивности — правильный scheduler для каждого типа работы:
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
// Для IO операций (БД, HTTP запросы)
public Mono<User> findById(String id) {
return Mono.fromCallable(() -> userRepository.findById(id))
.subscribeOn(Schedulers.boundedElastic()) // IO scheduler
.doOnError(e -> log.error("Error", e));
}
// Для CPU-интенсивных операций
public Flux<User> processUsers(List<User> users) {
return Flux.fromIterable(users)
.publishOn(Schedulers.parallel()) // CPU scheduler
.map(this::complexCalculation);
}
// Для быстрых операций остаёмся в Netty event loop
public Mono<Boolean> isUserActive(User user) {
return Mono.fromCallable(() -> user.getStatus().equals("active"))
// Нет subscribeOn! Выполнится в event loop
.doOnNext(active -> log.info("User active: {}", active));
}
}
3. Обработка блокирующих операций
Если нужно вызвать блокирующий код, оборачивайте в Mono.fromCallable():
@Service
public class LegacyIntegration {
private final SomeBlockingService blockingService;
// НЕПРАВИЛЬНО — блокирует event loop!
public Mono<String> badApproach() {
String result = blockingService.doSomethingBlocking(); // БЛОКИРУЕТ!
return Mono.just(result);
}
// ПРАВИЛЬНО — делегирует в потоковый пул
public Mono<String> goodApproach() {
return Mono.fromCallable(() -> blockingService.doSomethingBlocking())
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(5))
.doOnError(e -> log.error("Timeout or error", e));
}
// Для очень дорогих операций
public Mono<String> expensiveOperation() {
return Mono.fromCallable(() -> blockingService.doExpensiveWork())
.subscribeOn(Schedulers.parallel()) // Если это CPU-bound
.timeout(Duration.ofSeconds(30));
}
}
4. Объединение нескольких асинхронных операций
@Service
public class CompositeService {
@Autowired
private UserRepository userRepository;
@Autowired
private OrderRepository orderRepository;
@Autowired
private NotificationService notificationService;
// Параллельно выполняем несколько операций
public Mono<UserProfile> getUserProfile(String userId) {
Mono<User> userMono = Mono.fromCallable(() ->
userRepository.findById(userId))
.subscribeOn(Schedulers.boundedElastic());
Mono<List<Order>> ordersMono = Mono.fromCallable(() ->
orderRepository.findByUserId(userId))
.subscribeOn(Schedulers.boundedElastic());
// Комбинируем с zip
return Mono.zip(userMono, ordersMono)
.map(tuple -> new UserProfile(
tuple.getT1(),
tuple.getT2()
));
}
// Или используйте flatMap для последовательных зависимостей
public Mono<Void> processUserOrder(String userId, String orderId) {
return Mono.fromCallable(() ->
userRepository.findById(userId))
.subscribeOn(Schedulers.boundedElastic())
.flatMap(user ->
Mono.fromCallable(() ->
orderRepository.findById(orderId))
.subscribeOn(Schedulers.boundedElastic())
.map(order -> new UserOrder(user, order))
)
.flatMap(userOrder ->
notificationService.sendNotification(userOrder)
)
.then(); // Игнорируем результат
}
}
5. Обработка ошибок без потери реактивности
@RestController
public class ErrorHandlingController {
@Autowired
private UserService userService;
@GetMapping("/users/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
return userService.findById(id)
.map(ResponseEntity::ok)
.doOnError(UserNotFoundException.class, e ->
log.warn("User not found: {}", id))
.onErrorResume(UserNotFoundException.class, e ->
Mono.just(ResponseEntity.notFound().build()))
.onErrorResume(Exception.class, e -> {
log.error("Unexpected error", e);
return Mono.just(ResponseEntity.status(500).build());
});
}
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userService.findAll()
.doOnError(e -> log.error("Stream error", e))
.onErrorResume(e -> Flux.empty()) // Graceful fallback
.retry(2) // Повторить 2 раза при ошибке
.timeout(Duration.ofSeconds(30));
}
}
6. Избегайте обычных ошибок
@Service
public class ReactiveAntipatternsDemo {
// НЕПРАВИЛЬНО! Это блокирует event loop!
public Mono<String> blockingGet() {
return Mono.just(
userRepository.findById("123").block() // ❌ БЛОКИРУЕТ!
);
}
// ПРАВИЛЬНО
public Mono<String> nonBlockingGet() {
return Mono.fromCallable(() ->
userRepository.findById("123"))
.subscribeOn(Schedulers.boundedElastic());
}
// НЕПРАВИЛЬНО! Синхронизация в реактивном коде
public Mono<String> synchronizedWrong() {
return Mono.just(System.currentTimeMillis())
.doOnNext(time -> {
synchronized (this) { // ❌ Deadlock risk!
// Do something
}
});
}
// ПРАВИЛЬНО — используйте Schedulers для синхронизации
public Mono<String> synchronizedRight() {
return Mono.just(System.currentTimeMillis())
.publishOn(Schedulers.single()) // Сериализуем на одном потоке
.doOnNext(time -> {
// Теперь безопасно
});
}
}
7. Backpressure — критично для Netty
@RestController
public class BackpressureController {
@Autowired
private DataService dataService;
@GetMapping("/stream")
public Flux<ServerSentEvent<String>> streamData() {
return dataService.generateLargeStream()
.buffer(100) // Буферизуем по 100 элементов
.flatMap(batch -> {
// Обрабатываем батчами
return Mono.just(batch);
})
.map(data -> ServerSentEvent.builder()
.data(data)
.build())
.doOnCancel(() -> log.info("Stream cancelled")
// Netty автоматически обрабатывает backpressure
}
}
8. Тестирование реактивного кода
@SpringBootTest
public class ReactiveServiceTest {
@Autowired
private UserService userService;
@Test
public void testReactiveFlow() {
// StepVerifier гарантирует, что всё реактивно
StepVerifier.create(
userService.findById("123")
)
.expectNextMatches(user -> user.getId().equals("123"))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
}
Итого: Как избежать потери реактивности
- Не блокируйте event loop — используйте
subscribeOn() - Выбирайте правильный scheduler —
boundedElastic()для IO - Оборачивайте блокирующий код в
Mono.fromCallable() - Используйте
flatMapдля зависимостей вместо вложенности - Обрабатывайте backpressure — Netty это делает автоматически
- Никогда не вызывайте
.block()в реактивном коде - Тестируйте с StepVerifier для гарантии реактивности
Нетти и реактивные стеки идеально работают вместе при правильном использовании schedulers.