Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Project Reactor: Реактивное программирование в Java
Project Reactor — это библиотека для реактивного программирования на Java. Она предоставляет реализацию Reactive Streams спецификации и помогает писать асинхронный, неблокирующий код для обработки потоков данных.
Основные понятия
Реактивное программирование
Реактивное программирование — это парадигма, где приложение реагирует на события и асинхронно обрабатывает потоки данных.
// Традиционный подход (синхронный, блокирующий)
Request → Process → Response
(ждём результат)
// Реактивный подход (асинхронный, неблокирующий)
Request → async operation → Response (когда будет готово)
(код продолжает выполняться)
Основные компоненты Reactor
1. Flux - поток нескольких элементов
import reactor.core.publisher.Flux;
// Flux может выдавать 0, 1, несколько или бесконечное количество элементов
Flux<String> stringFlux = Flux.just("Apple", "Banana", "Orange");
stringFlux.subscribe(
item -> System.out.println("Item: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Stream completed") // onComplete
);
// Вывод:
// Item: Apple
// Item: Banana
// Item: Orange
// Stream completed
2. Mono - поток с 0 или 1 элементом
import reactor.core.publisher.Mono;
// Mono часто используется для одиночных значений
Mono<String> monoHello = Mono.just("Hello");
monoHello.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
// Пустой Mono
Mono<String> emptyMono = Mono.empty();
Создание потоков данных
// Flux из коллекции
Flux<Integer> numbers = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5));
// Flux из массива
Flux<String> flux = Flux.fromArray(new String[]{"a", "b", "c"});
// Flux с диапазоном
Flux<Integer> range = Flux.range(1, 10);
// Flux с интервалом
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
// Flux с ошибкой
Flux<String> errorFlux = Flux.error(new RuntimeException("Error"));
// Пустой Flux
Flux<String> emptyFlux = Flux.empty();
// Mono из Optional
Mono<String> mono = Mono.justOrEmpty(Optional.of("Hello"));
Операции преобразования (Transformation)
map - преобразование каждого элемента
Flux<String> fruits = Flux.just("apple", "banana", "orange");
Flux<String> uppercase = fruits.map(String::toUpperCase);
uppercase.subscribe(System.out::println);
// APPLE
// BANANA
// ORANGE
flatMap - преобразование в поток потоков
Flux<String> users = Flux.just("alice", "bob", "charlie");
Flux<String> result = users.flatMap(user ->
getUserOrders(user) // Возвращает Flux<Order>
.map(order -> user + ": " + order)
);
// flatMap асинхронно обрабатывает результаты
filter - фильтрация элементов
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
evenNumbers.subscribe(System.out::println);
// 2, 4, 6, 8, 10
reduce - агрегация
Flux<Integer> numbers = Flux.range(1, 5);
Mono<Integer> sum = numbers.reduce(0, (a, b) -> a + b);
sum.subscribe(System.out::println);
// 15
Обработка ошибок
Flux<Integer> numbers = Flux.just(1, 2, 0, 4);
Flux<String> result = numbers
.map(n -> "Result: " + (10 / n)) // Может выбросить исключение
.onErrorReturn("Error occurred") // Обработка ошибки
.doOnError(error -> logger.error("Error: ", error));
result.subscribe(System.out::println);
// Result: 10
// Result: 5
// Error occurred
Практический пример: Fetch данных с сервера
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.client.WebClient;
@Service
public class UserService {
private WebClient webClient;
// Получить пользователя асинхронно
public Mono<User> getUserById(Long id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class)
.timeout(Duration.ofSeconds(5))
.onErrorResume(error -> {
logger.error("Error fetching user", error);
return Mono.empty();
});
}
// Получить список пользователей
public Flux<User> getAllUsers() {
return webClient.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
}
Spring WebFlux - реактивный веб фреймворк
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/api/users")
public class UserController {
private UserService userService;
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.getUserById(id);
}
@GetMapping
public Flux<User> getAllUsers() {
return userService.getAllUsers();
}
@PostMapping
public Mono<User> createUser(@RequestBody User user) {
return userService.saveUser(user);
}
}
Продвинутые операции
zip - объединение потоков
Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
Flux<Integer> ages = Flux.just(25, 30, 35);
Flux<String> combined = Flux.zip(names, ages)
.map(tuple -> tuple.getT1() + ": " + tuple.getT2());
combined.subscribe(System.out::println);
// Alice: 25
// Bob: 30
// Charlie: 35
merge - объединение данных
Flux<String> flux1 = Flux.just("a", "b");
Flux<String> flux2 = Flux.just("c", "d");
Flux<String> merged = Flux.merge(flux1, flux2);
merged.subscribe(System.out::println);
// a, b, c, d (порядок может быть другим)
delay - задержка
Flux<String> delayed = Flux.just("Hello")
.delayElements(Duration.ofSeconds(1));
delayed.subscribe(System.out::println);
// (пауза 1 сек) Hello
Backpressure - контроль скорости
Flux<Integer> numbers = Flux.range(1, 100);
numbers
.subscribe(
item -> System.out.println(item),
error -> System.err.println(error),
() -> System.out.println("Done"),
subscription -> subscription.request(10) // Запрос 10 элементов
);
Scheduler - выполнение на разных потоках
import reactor.core.scheduler.Schedulers;
Flux<Integer> numbers = Flux.range(1, 5);
numbers
.publishOn(Schedulers.parallel()) // Выполнить на параллельном потоке
.map(n -> {
System.out.println(Thread.currentThread().getName() + ": " + n);
return n * 2;
})
.subscribeOn(Schedulers.boundedElastic()) // Подписаться на elastic потоке
.subscribe(System.out::println);
Сравнение: traditionalвый vs реактивный код
// ТРАДИЦИОННЫЙ (блокирующий)
@GetMapping("/users/{id}")
public ResponseEntity<User> getUser(@PathVariable Long id) {
User user = userRepository.findById(id).orElseThrow();
user.setOrders(orderService.getOrders(id)); // Ждём результата
return ResponseEntity.ok(user);
}
// РЕАКТИВНЫЙ (асинхронный)
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new NotFoundException()))
.flatMap(user -> orderService.getOrders(id)
.collectList()
.map(orders -> {
user.setOrders(orders);
return user;
})
);
}
Тестирование с Project Reactor
import reactor.test.StepVerifier;
@Test
public void testFlux() {
Flux<String> flux = Flux.just("a", "b", "c");
StepVerifier.create(flux)
.expectNext("a")
.expectNext("b")
.expectNext("c")
.expectComplete()
.verify();
}
@Test
public void testMono() {
Mono<String> mono = Mono.just("Hello");
StepVerifier.create(mono)
.expectNext("Hello")
.expectComplete()
.verify();
}
Когда использовать Project Reactor?
✅ Использовать:
- Высоконагруженные приложения
- Асинхронная обработка данных
- Работа с внешними API
- Потокобезопасность важна
- Spring WebFlux приложения
❌ НЕ использовать:
- Простые приложения с невысокой нагрузкой
- Когда нужна простота кода
- Если команда незнакома с реактивностью
Итого
- Project Reactor — реактивная библиотека для Java
- Flux — поток 0+ элементов
- Mono — поток 0-1 элементов
- Асинхронный, неблокирующий код — лучше использует ресурсы
- Spring WebFlux — реактивный веб фреймворк на основе Reactor
- Нужна практика — реактивный код требует переосмысления