Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Mono в Project Reactor (Reactive Stream)
Определение
Mono — это одна из основных абстракций в Project Reactor (реактивной библиотеке для Java). Mono представляет асинхронный поток данных, который может содержать 0 или 1 элемент.
Mono используется в реактивном программировании для обработки:
- Одного значения (как Future)
- Отсутствия значения (пустой результат)
- Ошибки вместо значения
Mono похож на:
- Future<T> — но с более мощным API
- Optional<T> — но асинхронный
- CompletableFuture<T> — но реактивный
Flux vs Mono
В Project Reactor есть две основные абстракции:
Flux<T> — поток 0..∞ элементов
Flux:
timeline ─────A────B────C────D────|
(0 or more items)
Mono<T> — поток 0..1 элемента
Mono:
timeline ─────A────|
(0 or 1 item)
Создание Mono
1. Mono с значением
// Mono с одним значением
Mono<String> mono = Mono.just("Hello");
// Mono пустой (no value)
Mono<String> empty = Mono.empty();
// Mono с ошибкой
Mono<String> error = Mono.error(new Exception("Something went wrong"));
// Mono из Callable
Mono<Integer> computed = Mono.fromCallable(() -> {
return 42;
});
// Mono из Future
CompletableFuture<String> future = new CompletableFuture<>();
Mono<String> fromFuture = Mono.fromFuture(future);
// Mono отложенного вычисления
Mono<String> delayed = Mono.defer(() -> Mono.just("Computed later"));
2. Mono с задержкой
// Значение через 2 секунды
Mono<String> delayed = Mono.just("Hello")
.delayElement(Duration.ofSeconds(2));
// Пустой Mono через 1 секунду
Mono<Void> timer = Mono.delay(Duration.ofSeconds(1));
Обработка результатов Mono
Подписка на Mono:
Mono<String> mono = Mono.just("Hello, World!");
// Базовая подписка
mono.subscribe(
value -> System.out.println("Value: " + value), // onNext
error -> System.err.println("Error: " + error.getMessage()), // onError
() -> System.out.println("Completed") // onComplete
);
// Только значение
mono.subscribe(value -> System.out.println("Value: " + value));
// С обработкой ошибок
mono.subscribe(
value -> System.out.println("Value: " + value),
error -> System.err.println("Error: " + error)
);
Трансформация Mono
map — преобразование значения
Mono<Integer> mono = Mono.just("123");
Mono<Integer> mapped = mono
.map(str -> Integer.parseInt(str))
.map(num -> num * 2);
mapped.subscribe(System.out::println); // 246
flatMap — асинхронная трансформация
Mono<String> userMono = Mono.just("alice");
// Получаем ID пользователя асинхронно
Mono<Integer> userIdMono = userMono
.flatMap(username -> {
// Этот запрос выполняется асинхронно
return getUserId(username);
});
// Т.е. flatMap преобразует Mono<String> в Mono<Integer>
// а не в Mono<Mono<Integer>>
private Mono<Integer> getUserId(String username) {
return Mono.just(username.hashCode());
}
filter — условная фильтрация
Mono<Integer> mono = Mono.just(42);
Mono<Integer> filtered = mono
.filter(num -> num > 40)
.switchIfEmpty(Mono.just(-1));
filtered.subscribe(System.out::println); // 42
Обработка ошибок в Mono
1. onErrorReturn — возврат значения при ошибке
Mono<String> risky = Mono.error(new RuntimeException("DB error"));
Mono<String> withFallback = risky
.onErrorReturn("Fallback value");
withFallback.subscribe(System.out::println); // "Fallback value"
2. onErrorResume — асинхронное восстановление
Mono<User> fetchUser(String id) {
return userRepository.findById(id)
.onErrorResume(ex -> {
// При ошибке запрашиваем из кэша
return cacheRepository.findById(id);
});
}
3. retry — повторная попытка
Mono<String> unreliable = Mono.defer(() -> {
if (Math.random() < 0.7) {
return Mono.error(new RuntimeException("Random failure"));
}
return Mono.just("Success");
});
Mono<String> withRetry = unreliable
.retry(3) // Повторяем максимум 3 раза
.onErrorReturn("All retries failed");
Практический пример: HTTP запрос
// Spring WebClient использует Mono
@Component
public class UserService {
private final WebClient webClient;
public UserService(WebClient.Builder builder) {
this.webClient = builder.baseUrl("https://api.example.com").build();
}
// Возвращает Mono с пользователем или пустой результат
public Mono<User> getUserById(String id) {
return webClient
.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class)
.doOnSuccess(user -> System.out.println("User loaded: " + user))
.doOnError(ex -> System.err.println("Error: " + ex))
.onErrorResume(ex -> Mono.empty());
}
}
// Использование
@RestController
public class UserController {
@Autowired
private UserService userService;
@GetMapping("/users/{id}")
public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
return userService.getUserById(id)
.map(user -> ResponseEntity.ok(user))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
}
Блокирование Mono (когда необходимо)
// Получить значение синхронно (блокирующий вызов)
Mono<String> mono = Mono.just("Hello");
String value = mono.block(); // Ждём результат
// Получить с timeout
String valueWithTimeout = mono.block(Duration.ofSeconds(5));
// Получить Optional
Optional<String> optional = mono.blockOptional();
// ВАЖНО: Избегайте block() в реактивном коде!
// Используйте только в точках входа (main, test)
Комбинирование нескольких Mono
zip — комбинирование результатов
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("World");
Mono<String> combined = Mono.zip(mono1, mono2)
.map(tuple -> tuple.getT1() + " " + tuple.getT2());
combined.subscribe(System.out::println); // "Hello World"
then — последовательное выполнение
Mono<Void> operation1 = saveUserMono;
Mono<Void> operation2 = sendEmailMono;
operation1
.then(operation2)
.then(Mono.just("All done"))
.subscribe(System.out::println);
Практический паттерн: Request-Reply
// Симуляция HTTP запроса
Mono<String> fetchDataFromAPI(String url) {
return Mono
.defer(() -> {
System.out.println("Fetching: " + url);
return Mono.just("Response data");
})
.delayElement(Duration.ofMillis(100))
.doOnSuccess(data -> System.out.println("Success: " + data))
.doOnError(ex -> System.err.println("Failed: " + ex))
.onErrorReturn("Default data")
.timeout(Duration.ofSeconds(5));
}
// Использование
fetchDataFromAPI("https://api.example.com/data")
.flatMap(data -> processData(data))
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Error: " + error),
() -> System.out.println("Done")
);
Ключевые выводы
- Mono — для одного значения — используйте вместо Future/Optional
- Асинхронность — операции не блокируют поток
- Лаванная обработка — map, flatMap, filter для трансформации
- Error handling — onErrorReturn, onErrorResume, retry встроены
- Не блокируйте — избегайте block() в реактивном коде
- Комбинируйте — zip, then для комбинирования операций
Mono — это основа реактивного программирования в Java, позволяет писать non-blocking, асинхронный код, который эффективнее использует ресурсы.