← Назад к вопросам

Что такое Project Reactor в Java?

3.0 Senior🔥 201 комментариев
#Spring Framework

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Project Reactor: Реактивное программирование в Java

Project Reactor — это библиотека для реактивного программирования на Java. Она предоставляет реализацию Reactive Streams спецификации и помогает писать асинхронный, неблокирующий код для обработки потоков данных.

Основные понятия

Реактивное программирование

Реактивное программирование — это парадигма, где приложение реагирует на события и асинхронно обрабатывает потоки данных.

// Традиционный подход (синхронный, блокирующий)
Request → Process → Response
(ждём результат)

// Реактивный подход (асинхронный, неблокирующий)
Request → async operation → Response (когда будет готово)
(код продолжает выполняться)

Основные компоненты Reactor

1. Flux - поток нескольких элементов

import reactor.core.publisher.Flux;

// Flux может выдавать 0, 1, несколько или бесконечное количество элементов
Flux<String> stringFlux = Flux.just("Apple", "Banana", "Orange");

stringFlux.subscribe(
    item -> System.out.println("Item: " + item),           // onNext
    error -> System.err.println("Error: " + error),      // onError
    () -> System.out.println("Stream completed")          // onComplete
);

// Вывод:
// Item: Apple
// Item: Banana
// Item: Orange
// Stream completed

2. Mono - поток с 0 или 1 элементом

import reactor.core.publisher.Mono;

// Mono часто используется для одиночных значений
Mono<String> monoHello = Mono.just("Hello");

monoHello.subscribe(
    item -> System.out.println("Item: " + item),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed")
);

// Пустой Mono
Mono<String> emptyMono = Mono.empty();

Создание потоков данных

// Flux из коллекции
Flux<Integer> numbers = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5));

// Flux из массива
Flux<String> flux = Flux.fromArray(new String[]{"a", "b", "c"});

// Flux с диапазоном
Flux<Integer> range = Flux.range(1, 10);

// Flux с интервалом
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));

// Flux с ошибкой
Flux<String> errorFlux = Flux.error(new RuntimeException("Error"));

// Пустой Flux
Flux<String> emptyFlux = Flux.empty();

// Mono из Optional
Mono<String> mono = Mono.justOrEmpty(Optional.of("Hello"));

Операции преобразования (Transformation)

map - преобразование каждого элемента

Flux<String> fruits = Flux.just("apple", "banana", "orange");

Flux<String> uppercase = fruits.map(String::toUpperCase);

uppercase.subscribe(System.out::println);
// APPLE
// BANANA
// ORANGE

flatMap - преобразование в поток потоков

Flux<String> users = Flux.just("alice", "bob", "charlie");

Flux<String> result = users.flatMap(user ->
    getUserOrders(user) // Возвращает Flux<Order>
        .map(order -> user + ": " + order)
);

// flatMap асинхронно обрабатывает результаты

filter - фильтрация элементов

Flux<Integer> numbers = Flux.range(1, 10);

Flux<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);

evenNumbers.subscribe(System.out::println);
// 2, 4, 6, 8, 10

reduce - агрегация

Flux<Integer> numbers = Flux.range(1, 5);

Mono<Integer> sum = numbers.reduce(0, (a, b) -> a + b);

sum.subscribe(System.out::println);
// 15

Обработка ошибок

Flux<Integer> numbers = Flux.just(1, 2, 0, 4);

Flux<String> result = numbers
    .map(n -> "Result: " + (10 / n))  // Может выбросить исключение
    .onErrorReturn("Error occurred")   // Обработка ошибки
    .doOnError(error -> logger.error("Error: ", error));

result.subscribe(System.out::println);
// Result: 10
// Result: 5
// Error occurred

Практический пример: Fetch данных с сервера

import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.client.WebClient;

@Service
public class UserService {
    private WebClient webClient;
    
    // Получить пользователя асинхронно
    public Mono<User> getUserById(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class)
            .timeout(Duration.ofSeconds(5))
            .onErrorResume(error -> {
                logger.error("Error fetching user", error);
                return Mono.empty();
            });
    }
    
    // Получить список пользователей
    public Flux<User> getAllUsers() {
        return webClient.get()
            .uri("/users")
            .retrieve()
            .bodyToFlux(User.class)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }
}

Spring WebFlux - реактивный веб фреймворк

import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/api/users")
public class UserController {
    private UserService userService;
    
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return userService.getUserById(id);
    }
    
    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.getAllUsers();
    }
    
    @PostMapping
    public Mono<User> createUser(@RequestBody User user) {
        return userService.saveUser(user);
    }
}

Продвинутые операции

zip - объединение потоков

Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
Flux<Integer> ages = Flux.just(25, 30, 35);

Flux<String> combined = Flux.zip(names, ages)
    .map(tuple -> tuple.getT1() + ": " + tuple.getT2());

combined.subscribe(System.out::println);
// Alice: 25
// Bob: 30
// Charlie: 35

merge - объединение данных

Flux<String> flux1 = Flux.just("a", "b");
Flux<String> flux2 = Flux.just("c", "d");

Flux<String> merged = Flux.merge(flux1, flux2);

merged.subscribe(System.out::println);
// a, b, c, d (порядок может быть другим)

delay - задержка

Flux<String> delayed = Flux.just("Hello")
    .delayElements(Duration.ofSeconds(1));

delayed.subscribe(System.out::println);
// (пауза 1 сек) Hello

Backpressure - контроль скорости

Flux<Integer> numbers = Flux.range(1, 100);

numbers
    .subscribe(
        item -> System.out.println(item),
        error -> System.err.println(error),
        () -> System.out.println("Done"),
        subscription -> subscription.request(10)  // Запрос 10 элементов
    );

Scheduler - выполнение на разных потоках

import reactor.core.scheduler.Schedulers;

Flux<Integer> numbers = Flux.range(1, 5);

numbers
    .publishOn(Schedulers.parallel())  // Выполнить на параллельном потоке
    .map(n -> {
        System.out.println(Thread.currentThread().getName() + ": " + n);
        return n * 2;
    })
    .subscribeOn(Schedulers.boundedElastic())  // Подписаться на elastic потоке
    .subscribe(System.out::println);

Сравнение: traditionalвый vs реактивный код

// ТРАДИЦИОННЫЙ (блокирующий)
@GetMapping("/users/{id}")
public ResponseEntity<User> getUser(@PathVariable Long id) {
    User user = userRepository.findById(id).orElseThrow();
    user.setOrders(orderService.getOrders(id));  // Ждём результата
    return ResponseEntity.ok(user);
}

// РЕАКТИВНЫЙ (асинхронный)
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    return userRepository.findById(id)
        .switchIfEmpty(Mono.error(new NotFoundException()))
        .flatMap(user -> orderService.getOrders(id)
            .collectList()
            .map(orders -> {
                user.setOrders(orders);
                return user;
            })
        );
}

Тестирование с Project Reactor

import reactor.test.StepVerifier;

@Test
public void testFlux() {
    Flux<String> flux = Flux.just("a", "b", "c");
    
    StepVerifier.create(flux)
        .expectNext("a")
        .expectNext("b")
        .expectNext("c")
        .expectComplete()
        .verify();
}

@Test
public void testMono() {
    Mono<String> mono = Mono.just("Hello");
    
    StepVerifier.create(mono)
        .expectNext("Hello")
        .expectComplete()
        .verify();
}

Когда использовать Project Reactor?

Использовать:

  • Высоконагруженные приложения
  • Асинхронная обработка данных
  • Работа с внешними API
  • Потокобезопасность важна
  • Spring WebFlux приложения

НЕ использовать:

  • Простые приложения с невысокой нагрузкой
  • Когда нужна простота кода
  • Если команда незнакома с реактивностью

Итого

  • Project Reactor — реактивная библиотека для Java
  • Flux — поток 0+ элементов
  • Mono — поток 0-1 элементов
  • Асинхронный, неблокирующий код — лучше использует ресурсы
  • Spring WebFlux — реактивный веб фреймворк на основе Reactor
  • Нужна практика — реактивный код требует переосмысления
Что такое Project Reactor в Java? | PrepBro