← Назад к вопросам

Как обойти Netty без потери реактивности

3.0 Senior🔥 101 комментариев
#Другое

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Ответ

Вопрос касается того, как использовать реактивное программирование (например, Project Reactor, RxJava) в приложениях на Netty без потери неблокирующего поведения. Netty — это асинхронный NIO фреймворк, и правильное использование реактивных стеков критично.

1. Основа: Netty + Project Reactor

Spring WebFlux использует именно эту комбинацию:

@RestController
public class ReactiveController {
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        // Это выполняется на event loop Netty, не блокирует!
        return userService.findById(id)
            .subscribeOn(Schedulers.boundedElastic());  // IO работа
    }
    
    @GetMapping("/users")
    public Flux<User> getAllUsers() {
        return userService.findAll()
            .subscribeOn(Schedulers.boundedElastic());
    }
}

2. Правильное использование Schedulers

Ключ к сохранению реактивности — правильный scheduler для каждого типа работы:

@Service
public class UserService {
    @Autowired
    private UserRepository userRepository;
    
    // Для IO операций (БД, HTTP запросы)
    public Mono<User> findById(String id) {
        return Mono.fromCallable(() -> userRepository.findById(id))
            .subscribeOn(Schedulers.boundedElastic())  // IO scheduler
            .doOnError(e -> log.error("Error", e));
    }
    
    // Для CPU-интенсивных операций
    public Flux<User> processUsers(List<User> users) {
        return Flux.fromIterable(users)
            .publishOn(Schedulers.parallel())  // CPU scheduler
            .map(this::complexCalculation);
    }
    
    // Для быстрых операций остаёмся в Netty event loop
    public Mono<Boolean> isUserActive(User user) {
        return Mono.fromCallable(() -> user.getStatus().equals("active"))
            // Нет subscribeOn! Выполнится в event loop
            .doOnNext(active -> log.info("User active: {}", active));
    }
}

3. Обработка блокирующих операций

Если нужно вызвать блокирующий код, оборачивайте в Mono.fromCallable():

@Service
public class LegacyIntegration {
    private final SomeBlockingService blockingService;
    
    // НЕПРАВИЛЬНО — блокирует event loop!
    public Mono<String> badApproach() {
        String result = blockingService.doSomethingBlocking();  // БЛОКИРУЕТ!
        return Mono.just(result);
    }
    
    // ПРАВИЛЬНО — делегирует в потоковый пул
    public Mono<String> goodApproach() {
        return Mono.fromCallable(() -> blockingService.doSomethingBlocking())
            .subscribeOn(Schedulers.boundedElastic())
            .timeout(Duration.ofSeconds(5))
            .doOnError(e -> log.error("Timeout or error", e));
    }
    
    // Для очень дорогих операций
    public Mono<String> expensiveOperation() {
        return Mono.fromCallable(() -> blockingService.doExpensiveWork())
            .subscribeOn(Schedulers.parallel())  // Если это CPU-bound
            .timeout(Duration.ofSeconds(30));
    }
}

4. Объединение нескольких асинхронных операций

@Service
public class CompositeService {
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private NotificationService notificationService;
    
    // Параллельно выполняем несколько операций
    public Mono<UserProfile> getUserProfile(String userId) {
        Mono<User> userMono = Mono.fromCallable(() -> 
            userRepository.findById(userId))
            .subscribeOn(Schedulers.boundedElastic());
            
        Mono<List<Order>> ordersMono = Mono.fromCallable(() -> 
            orderRepository.findByUserId(userId))
            .subscribeOn(Schedulers.boundedElastic());
        
        // Комбинируем с zip
        return Mono.zip(userMono, ordersMono)
            .map(tuple -> new UserProfile(
                tuple.getT1(),
                tuple.getT2()
            ));
    }
    
    // Или используйте flatMap для последовательных зависимостей
    public Mono<Void> processUserOrder(String userId, String orderId) {
        return Mono.fromCallable(() -> 
            userRepository.findById(userId))
            .subscribeOn(Schedulers.boundedElastic())
            .flatMap(user -> 
                Mono.fromCallable(() -> 
                    orderRepository.findById(orderId))
                .subscribeOn(Schedulers.boundedElastic())
                .map(order -> new UserOrder(user, order))
            )
            .flatMap(userOrder -> 
                notificationService.sendNotification(userOrder)
            )
            .then();  // Игнорируем результат
    }
}

5. Обработка ошибок без потери реактивности

@RestController
public class ErrorHandlingController {
    @Autowired
    private UserService userService;
    
    @GetMapping("/users/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
        return userService.findById(id)
            .map(ResponseEntity::ok)
            .doOnError(UserNotFoundException.class, e -> 
                log.warn("User not found: {}", id))
            .onErrorResume(UserNotFoundException.class, e -> 
                Mono.just(ResponseEntity.notFound().build()))
            .onErrorResume(Exception.class, e -> {
                log.error("Unexpected error", e);
                return Mono.just(ResponseEntity.status(500).build());
            });
    }
    
    @GetMapping("/users")
    public Flux<User> getAllUsers() {
        return userService.findAll()
            .doOnError(e -> log.error("Stream error", e))
            .onErrorResume(e -> Flux.empty())  // Graceful fallback
            .retry(2)  // Повторить 2 раза при ошибке
            .timeout(Duration.ofSeconds(30));
    }
}

6. Избегайте обычных ошибок

@Service
public class ReactiveAntipatternsDemo {
    
    // НЕПРАВИЛЬНО! Это блокирует event loop!
    public Mono<String> blockingGet() {
        return Mono.just(
            userRepository.findById("123").block()  // ❌ БЛОКИРУЕТ!
        );
    }
    
    // ПРАВИЛЬНО
    public Mono<String> nonBlockingGet() {
        return Mono.fromCallable(() -> 
            userRepository.findById("123"))
            .subscribeOn(Schedulers.boundedElastic());
    }
    
    // НЕПРАВИЛЬНО! Синхронизация в реактивном коде
    public Mono<String> synchronizedWrong() {
        return Mono.just(System.currentTimeMillis())
            .doOnNext(time -> {
                synchronized (this) {  // ❌ Deadlock risk!
                    // Do something
                }
            });
    }
    
    // ПРАВИЛЬНО — используйте Schedulers для синхронизации
    public Mono<String> synchronizedRight() {
        return Mono.just(System.currentTimeMillis())
            .publishOn(Schedulers.single())  // Сериализуем на одном потоке
            .doOnNext(time -> {
                // Теперь безопасно
            });
    }
}

7. Backpressure — критично для Netty

@RestController
public class BackpressureController {
    @Autowired
    private DataService dataService;
    
    @GetMapping("/stream")
    public Flux<ServerSentEvent<String>> streamData() {
        return dataService.generateLargeStream()
            .buffer(100)  // Буферизуем по 100 элементов
            .flatMap(batch -> {
                // Обрабатываем батчами
                return Mono.just(batch);
            })
            .map(data -> ServerSentEvent.builder()
                .data(data)
                .build())
            .doOnCancel(() -> log.info("Stream cancelled")
            // Netty автоматически обрабатывает backpressure
    }
}

8. Тестирование реактивного кода

@SpringBootTest
public class ReactiveServiceTest {
    @Autowired
    private UserService userService;
    
    @Test
    public void testReactiveFlow() {
        // StepVerifier гарантирует, что всё реактивно
        StepVerifier.create(
            userService.findById("123")
        )
        .expectNextMatches(user -> user.getId().equals("123"))
        .expectComplete()
        .verify(Duration.ofSeconds(5));
    }
}

Итого: Как избежать потери реактивности

  1. Не блокируйте event loop — используйте subscribeOn()
  2. Выбирайте правильный schedulerboundedElastic() для IO
  3. Оборачивайте блокирующий код в Mono.fromCallable()
  4. Используйте flatMap для зависимостей вместо вложенности
  5. Обрабатывайте backpressure — Netty это делает автоматически
  6. Никогда не вызывайте .block() в реактивном коде
  7. Тестируйте с StepVerifier для гарантии реактивности

Нетти и реактивные стеки идеально работают вместе при правильном использовании schedulers.