← Назад к вопросам
Какие основные объекты используются в Project Reactor
2.4 Senior🔥 91 комментариев
#Spring Framework#Stream API и функциональное программирование
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Project Reactor: Основные объекты
Project Reactor — это библиотека для создания реактивных приложений на Java с использованием функционального стиля программирования. Она реализует спецификацию Reactive Streams и является основой для Spring WebFlux. Давайте разберём ключевые компоненты и объекты.
1. Mono — Реактивный контейнер для 0 или 1 элемента
Моно используется когда операция возвращает не более одного результата:
// Создание Mono
Mono<String> mono = Mono.just("Hello World");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Error"));
Mono<Integer> deferred = Mono.defer(() -> Mono.just(42));
// Подписка и обработка
mono.subscribe(
value -> System.out.println("Value: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
// Трансформация значения
Mono<Integer> length = mono.map(s -> s.length());
// Асинхронная операция
Mono<User> user = Mono.fromCallable(() -> getUserFromDB(1))
.subscribeOn(Schedulers.boundedElastic());
// Обработка ошибок
mono.onErrorReturn("Default value")
.onErrorResume(e -> Mono.just("Fallback"))
.doOnError(e -> log.error("Error occurred", e));
// Таймауты
mono.timeout(Duration.ofSeconds(5))
.onErrorResume(e -> Mono.just("Timeout"));
Когда использовать:
- HTTP GET запрос для получения одного ресурса
- Поиск пользователя по ID
- Единственный результат из БД
- Асинхронная операция с одним результатом
2. Flux — Реактивный контейнер для множества элементов
Flux используется когда нужно обработать поток данных:
// Создание Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Flux<String> range = Flux.range(1, 5).map(i -> "Item " + i);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<String> iterable = Flux.fromIterable(Arrays.asList("a", "b", "c"));
// Подписка
flux.subscribe(
value -> System.out.println(value),
error -> System.err.println(error),
() -> System.out.println("Stream completed")
);
// Фильтрация
Flux<Integer> filtered = flux.filter(x -> x > 2);
// Преобразование (map)
Flux<String> mapped = flux.map(x -> "Number: " + x);
// FlatMap для асинхронных операций
Flux<User> users = Flux.just(1, 2, 3)
.flatMap(id -> getUserAsync(id)); // Асинхронный поиск
// Ограничение потока
Flux<Integer> limited = flux.take(3); // Первые 3 элемента
Flux<Integer> skip = flux.skip(2); // Пропустить 2
Flux<Integer> batch = flux.buffer(2); // Группировка по 2
// Обработка ошибок в потоке
flux.onErrorContinue((error, item) ->
log.warn("Error processing item {}: {}", item, error.getMessage())
);
// Retry политика
flux.retry(3)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
Когда использовать:
- Потоковое чтение больших файлов
- Получение списка записей с БД
- WebSocket сообщения
- Event streaming
- Обработка множественных HTTP запросов
3. Publisher и Subscriber (Reactive Streams)
Фундаментальные интерфейсы реактивности:
// Publisher - источник данных
Publisher<String> publisher = Flux.just("A", "B", "C");
// Subscriber - потребитель данных
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(Long.MAX_VALUE); // Запросить все
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable);
}
@Override
public void onComplete() {
System.out.println("Stream completed");
}
};
publisher.subscribe(subscriber);
4. Subscription — Управление подпиской
// Контроль потока данных (backpressure)
Flux<Integer> flux = Flux.range(1, 1000);
flux.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
private int count = 0;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(10); // Запросить первые 10
}
@Override
public void onNext(Integer item) {
count++;
if (count % 10 == 0) {
subscription.request(10); // Запросить ещё 10
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
// В реактивном коде обычно используется Flux/Mono
flux.limitRate(10) // Запросить по 10 элементов
.subscribe(System.out::println);
5. Scheduler — Планирование выполнения
Определяет на каком потоке будет выполняться код:
// Различные scheduler'ы
Schedulers.immediate(); // Текущий поток (default)
Schedulers.single(); // Один повторно используемый поток
Schedulers.parallel(); // N потоков для параллельной обработки
Schedulers.boundedElastic(); // Для I/O операций
Schedulers.fromExecutorService(); // Custom ExecutorService
// Использование
Flux<Integer> flux = Flux.range(1, 5)
.map(x -> {
System.out.println(Thread.currentThread().getName());
return x * 2;
})
.publishOn(Schedulers.parallel()) // Переключение потока
.map(x -> {
System.out.println(Thread.currentThread().getName());
return x + 1;
});
// subscribeOn - влияет на источник данных
// publishOn - влияет на downstreamные операции
Mono<User> user = getUserFromDB()
.subscribeOn(Schedulers.boundedElastic()); // Запрос к БД в отдельном потоке
6. StepVerifier — Тестирование реактивных потоков
// Тестирование Mono
@Test
public void testMonoWithValue() {
Mono<String> mono = Mono.just("Hello");
StepVerifier.create(mono)
.expectNext("Hello")
.expectComplete()
.verify();
}
// Тестирование Flux
@Test
public void testFlux() {
Flux<Integer> flux = Flux.just(1, 2, 3);
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectComplete()
.verify();
}
// Тестирование ошибок
@Test
public void testError() {
Mono<String> mono = Mono.error(new RuntimeException("Error"));
StepVerifier.create(mono)
.expectError(RuntimeException.class)
.verify();
}
// Тестирование с виртуальным временем
@Test
public void testInterval() {
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(3);
StepVerifier.withVirtualTime(() -> flux)
.expectSubscription()
.thenAwait(Duration.ofSeconds(3))
.expectNext(0L, 1L, 2L)
.expectComplete()
.verify();
}
7. Создание обычного Publisher'а
// Создание Flux из callback'а
Flux<String> create = Flux.create(sink -> {
sink.next("Item 1");
sink.next("Item 2");
sink.complete();
});
// Для асинхронных источников
Flux<String> push = Flux.push(sink -> {
executorService.submit(() -> {
sink.next("Async item");
sink.complete();
});
});
// Из Callable
Mono<String> callable = Mono.fromCallable(() -> {
Thread.sleep(1000);
return "Delayed result";
});
// Из Future
Mono<String> future = Mono.fromFuture(
CompletableFuture.supplyAsync(() -> "Result")
);
// Из Optional
Mono<String> optional = Mono.justOrEmpty(Optional.of("Value"));
8. Комбинирование потоков
// Merge - слияние потоков
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> merged = Flux.merge(flux1, flux2);
// Concat - последовательная обработка
Flux<Integer> concatenated = Flux.concat(flux1, flux2);
// ZipWith - координация двух потоков
Flux<String> flux3 = Flux.just("a", "b", "c");
Flux<String> zipped = flux1.zipWith(flux3,
(num, str) -> num + str);
// CombineLatest - последнее значение из каждого
Mono<Integer> mono1 = Mono.just(10);
Mono<String> mono2 = Mono.just("Hello");
Mono<String> combined = Mono.combineLatest(
mono1, mono2,
(num, str) -> num + ": " + str
);
9. Практический пример: REST API с Spring WebFlux
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
// GET /api/users/:id - возвращает Mono
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.getUserById(id);
}
// GET /api/users - возвращает Flux
@GetMapping
public Flux<User> getAllUsers() {
return userService.getAllUsers();
}
// POST /api/users - асинхронное создание
@PostMapping
public Mono<User> createUser(@RequestBody User user) {
return userService.createUser(user);
}
}
@Service
public class UserService {
private final UserRepository userRepository;
private final WebClient webClient;
public Mono<User> getUserById(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(
new ResponseStatusException(HttpStatus.NOT_FOUND)
));
}
public Flux<User> getAllUsers() {
return userRepository.findAll()
.delayElement(Duration.ofMillis(100)); // Сдвиг для демо
}
public Mono<User> createUser(User user) {
return userRepository.save(user)
.doOnNext(savedUser -> log.info("User created: {}", savedUser.getId()))
.doOnError(e -> log.error("Error creating user", e));
}
}
10. Основные операторы
| Оператор | Назначение | Пример |
|---|---|---|
map | Преобразование элемента | flux.map(x -> x * 2) |
flatMap | Асинхронное преобразование | flux.flatMap(id -> getUserAsync(id)) |
filter | Фильтрация по условию | flux.filter(x -> x > 5) |
take | Ограничение количества | flux.take(10) |
skip | Пропуск элементов | flux.skip(5) |
buffer | Группировка элементов | flux.buffer(10) |
reduce | Агрегация | flux.reduce((a, b) -> a + b) |
distinct | Уникальные элементы | flux.distinct() |
timeout | Таймаут | mono.timeout(Duration.ofSeconds(5)) |
retry | Повторные попытки | flux.retry(3) |
merge | Объединение потоков | Flux.merge(flux1, flux2) |
zip | Синхронизация потоков | flux1.zipWith(flux2) |
Заключение
Project Reactor обеспечивает:
- Неблокирующее выполнение — поток не занят во время ожидания
- Функциональное программирование — удобная композиция операций
- Backpressure — контроль потока данных
- Scalability — обработка большого количества соединений
- Spring интеграция — WebFlux, Data R2DBC, Cloud
Это мощный инструмент для построения высокопроизводительных реактивных систем на Java.