← Назад к вопросам

Какие основные объекты используются в Project Reactor

2.4 Senior🔥 91 комментариев
#Spring Framework#Stream API и функциональное программирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Project Reactor: Основные объекты

Project Reactor — это библиотека для создания реактивных приложений на Java с использованием функционального стиля программирования. Она реализует спецификацию Reactive Streams и является основой для Spring WebFlux. Давайте разберём ключевые компоненты и объекты.

1. Mono — Реактивный контейнер для 0 или 1 элемента

Моно используется когда операция возвращает не более одного результата:

// Создание Mono
Mono<String> mono = Mono.just("Hello World");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Error"));
Mono<Integer> deferred = Mono.defer(() -> Mono.just(42));

// Подписка и обработка
mono.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed")
);

// Трансформация значения
Mono<Integer> length = mono.map(s -> s.length());

// Асинхронная операция
Mono<User> user = Mono.fromCallable(() -> getUserFromDB(1))
    .subscribeOn(Schedulers.boundedElastic());

// Обработка ошибок
mono.onErrorReturn("Default value")
    .onErrorResume(e -> Mono.just("Fallback"))
    .doOnError(e -> log.error("Error occurred", e));

// Таймауты
mono.timeout(Duration.ofSeconds(5))
    .onErrorResume(e -> Mono.just("Timeout"));

Когда использовать:

  • HTTP GET запрос для получения одного ресурса
  • Поиск пользователя по ID
  • Единственный результат из БД
  • Асинхронная операция с одним результатом

2. Flux — Реактивный контейнер для множества элементов

Flux используется когда нужно обработать поток данных:

// Создание Flux
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
Flux<String> range = Flux.range(1, 5).map(i -> "Item " + i);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
Flux<String> iterable = Flux.fromIterable(Arrays.asList("a", "b", "c"));

// Подписка
flux.subscribe(
    value -> System.out.println(value),
    error -> System.err.println(error),
    () -> System.out.println("Stream completed")
);

// Фильтрация
Flux<Integer> filtered = flux.filter(x -> x > 2);

// Преобразование (map)
Flux<String> mapped = flux.map(x -> "Number: " + x);

// FlatMap для асинхронных операций
Flux<User> users = Flux.just(1, 2, 3)
    .flatMap(id -> getUserAsync(id));  // Асинхронный поиск

// Ограничение потока
Flux<Integer> limited = flux.take(3);  // Первые 3 элемента
Flux<Integer> skip = flux.skip(2);     // Пропустить 2
Flux<Integer> batch = flux.buffer(2);  // Группировка по 2

// Обработка ошибок в потоке
flux.onErrorContinue((error, item) -> 
    log.warn("Error processing item {}: {}", item, error.getMessage())
);

// Retry политика
flux.retry(3)
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));

Когда использовать:

  • Потоковое чтение больших файлов
  • Получение списка записей с БД
  • WebSocket сообщения
  • Event streaming
  • Обработка множественных HTTP запросов

3. Publisher и Subscriber (Reactive Streams)

Фундаментальные интерфейсы реактивности:

// Publisher - источник данных
Publisher<String> publisher = Flux.just("A", "B", "C");

// Subscriber - потребитель данных
Subscriber<String> subscriber = new Subscriber<String>() {
    private Subscription subscription;
    
    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        subscription.request(Long.MAX_VALUE);  // Запросить все
    }
    
    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
    }
    
    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable);
    }
    
    @Override
    public void onComplete() {
        System.out.println("Stream completed");
    }
};

publisher.subscribe(subscriber);

4. Subscription — Управление подпиской

// Контроль потока данных (backpressure)
Flux<Integer> flux = Flux.range(1, 1000);

flux.subscribe(new Subscriber<Integer>() {
    private Subscription subscription;
    private int count = 0;
    
    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;
        subscription.request(10);  // Запросить первые 10
    }
    
    @Override
    public void onNext(Integer item) {
        count++;
        if (count % 10 == 0) {
            subscription.request(10);  // Запросить ещё 10
        }
    }
    
    @Override
    public void onError(Throwable t) {}
    
    @Override
    public void onComplete() {}
});

// В реактивном коде обычно используется Flux/Mono
flux.limitRate(10)  // Запросить по 10 элементов
    .subscribe(System.out::println);

5. Scheduler — Планирование выполнения

Определяет на каком потоке будет выполняться код:

// Различные scheduler'ы
Schedulers.immediate();           // Текущий поток (default)
Schedulers.single();              // Один повторно используемый поток
Schedulers.parallel();            // N потоков для параллельной обработки
Schedulers.boundedElastic();      // Для I/O операций
Schedulers.fromExecutorService(); // Custom ExecutorService

// Использование
Flux<Integer> flux = Flux.range(1, 5)
    .map(x -> {
        System.out.println(Thread.currentThread().getName());
        return x * 2;
    })
    .publishOn(Schedulers.parallel())  // Переключение потока
    .map(x -> {
        System.out.println(Thread.currentThread().getName());
        return x + 1;
    });

// subscribeOn - влияет на источник данных
// publishOn - влияет на downstreamные операции
Mono<User> user = getUserFromDB()
    .subscribeOn(Schedulers.boundedElastic());  // Запрос к БД в отдельном потоке

6. StepVerifier — Тестирование реактивных потоков

// Тестирование Mono
@Test
public void testMonoWithValue() {
    Mono<String> mono = Mono.just("Hello");
    
    StepVerifier.create(mono)
        .expectNext("Hello")
        .expectComplete()
        .verify();
}

// Тестирование Flux
@Test
public void testFlux() {
    Flux<Integer> flux = Flux.just(1, 2, 3);
    
    StepVerifier.create(flux)
        .expectNext(1)
        .expectNext(2)
        .expectNext(3)
        .expectComplete()
        .verify();
}

// Тестирование ошибок
@Test
public void testError() {
    Mono<String> mono = Mono.error(new RuntimeException("Error"));
    
    StepVerifier.create(mono)
        .expectError(RuntimeException.class)
        .verify();
}

// Тестирование с виртуальным временем
@Test
public void testInterval() {
    Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(3);
    
    StepVerifier.withVirtualTime(() -> flux)
        .expectSubscription()
        .thenAwait(Duration.ofSeconds(3))
        .expectNext(0L, 1L, 2L)
        .expectComplete()
        .verify();
}

7. Создание обычного Publisher'а

// Создание Flux из callback'а
Flux<String> create = Flux.create(sink -> {
    sink.next("Item 1");
    sink.next("Item 2");
    sink.complete();
});

// Для асинхронных источников
Flux<String> push = Flux.push(sink -> {
    executorService.submit(() -> {
        sink.next("Async item");
        sink.complete();
    });
});

// Из Callable
Mono<String> callable = Mono.fromCallable(() -> {
    Thread.sleep(1000);
    return "Delayed result";
});

// Из Future
Mono<String> future = Mono.fromFuture(
    CompletableFuture.supplyAsync(() -> "Result")
);

// Из Optional
Mono<String> optional = Mono.justOrEmpty(Optional.of("Value"));

8. Комбинирование потоков

// Merge - слияние потоков
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> merged = Flux.merge(flux1, flux2);

// Concat - последовательная обработка
Flux<Integer> concatenated = Flux.concat(flux1, flux2);

// ZipWith - координация двух потоков
Flux<String> flux3 = Flux.just("a", "b", "c");
Flux<String> zipped = flux1.zipWith(flux3, 
    (num, str) -> num + str);

// CombineLatest - последнее значение из каждого
Mono<Integer> mono1 = Mono.just(10);
Mono<String> mono2 = Mono.just("Hello");
Mono<String> combined = Mono.combineLatest(
    mono1, mono2,
    (num, str) -> num + ": " + str
);

9. Практический пример: REST API с Spring WebFlux

@RestController
@RequestMapping("/api/users")
public class UserController {
    
    private final UserService userService;
    
    // GET /api/users/:id - возвращает Mono
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return userService.getUserById(id);
    }
    
    // GET /api/users - возвращает Flux
    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.getAllUsers();
    }
    
    // POST /api/users - асинхронное создание
    @PostMapping
    public Mono<User> createUser(@RequestBody User user) {
        return userService.createUser(user);
    }
}

@Service
public class UserService {
    
    private final UserRepository userRepository;
    private final WebClient webClient;
    
    public Mono<User> getUserById(Long id) {
        return userRepository.findById(id)
            .switchIfEmpty(Mono.error(
                new ResponseStatusException(HttpStatus.NOT_FOUND)
            ));
    }
    
    public Flux<User> getAllUsers() {
        return userRepository.findAll()
            .delayElement(Duration.ofMillis(100));  // Сдвиг для демо
    }
    
    public Mono<User> createUser(User user) {
        return userRepository.save(user)
            .doOnNext(savedUser -> log.info("User created: {}", savedUser.getId()))
            .doOnError(e -> log.error("Error creating user", e));
    }
}

10. Основные операторы

ОператорНазначениеПример
mapПреобразование элементаflux.map(x -> x * 2)
flatMapАсинхронное преобразованиеflux.flatMap(id -> getUserAsync(id))
filterФильтрация по условиюflux.filter(x -> x > 5)
takeОграничение количестваflux.take(10)
skipПропуск элементовflux.skip(5)
bufferГруппировка элементовflux.buffer(10)
reduceАгрегацияflux.reduce((a, b) -> a + b)
distinctУникальные элементыflux.distinct()
timeoutТаймаутmono.timeout(Duration.ofSeconds(5))
retryПовторные попыткиflux.retry(3)
mergeОбъединение потоковFlux.merge(flux1, flux2)
zipСинхронизация потоковflux1.zipWith(flux2)

Заключение

Project Reactor обеспечивает:

  • Неблокирующее выполнение — поток не занят во время ожидания
  • Функциональное программирование — удобная композиция операций
  • Backpressure — контроль потока данных
  • Scalability — обработка большого количества соединений
  • Spring интеграция — WebFlux, Data R2DBC, Cloud

Это мощный инструмент для построения высокопроизводительных реактивных систем на Java.

Какие основные объекты используются в Project Reactor | PrepBro