← Назад к вопросам
Что такое flux?
2.7 Senior🔥 211 комментариев
#Stream API и функциональное программирование
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Flux в Project Reactor
Flux — это реактивный тип данных в Project Reactor, который представляет асинхронный поток элементов (0, 1 или много элементов), аналогично Observable в RxJava, но оптимизирован для использования в Spring WebFlux и modern async приложениях.
FLUX — это холодный publisher, который начинает генерировать элементы только при подписке.
Основные различия: Mono vs Flux
| Тип | Элементы | Использование |
|---|---|---|
| Mono | 0 или 1 | Single value, optional |
| Flux | 0, 1 или много | Stream of values |
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// Mono — когда ожидаем один результат
Mono<String> hello = Mono.just("Hello");
// Flux — когда ожидаем поток
Flux<String> words = Flux.just("Hello", "World", "Reactor");
Создание Flux
// 1. Из массива/коллекции
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
// 2. Из итерируемого
Flux<String> colors = Flux.fromIterable(Arrays.asList("Red", "Green", "Blue"));
// 3. Из массива
Flux<Double> prices = Flux.fromArray(new Double[]{10.5, 20.3, 30.0});
// 4. Генерация последовательности
Flux<Integer> sequence = Flux.range(1, 5); // 1, 2, 3, 4, 5
// 5. Повторение
Flux<String> repeated = Flux.just("A").repeat(2); // A, A, A
// 6. Пустой Flux
Flux<String> empty = Flux.empty();
// 7. С ошибкой
Flux<String> error = Flux.error(new RuntimeException("Error"));
// 8. Отложенное создание (холодный поток)
Flux<Integer> cold = Flux.defer(() -> Flux.just(System.currentTimeMillis()));
// 9. Интервал
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // Каждую секунду
Подписка на Flux
// Базовая подписка
Flux<Integer> flux = Flux.just(1, 2, 3);
flux.subscribe(System.out::println);
// Output: 1, 2, 3
// С обработкой ошибок
flux.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed") // onComplete
);
// С Disposable (для отписки)
Disposable disposable = flux.subscribe(System.out::println);
disposable.dispose(); // Отписка
Операции над Flux (операторы)
map — трансформация каждого элемента
Flux.just(1, 2, 3, 4, 5)
.map(x -> x * 2) // 2, 4, 6, 8, 10
.subscribe(System.out::println);
filter — фильтрация элементов
Flux.just(1, 2, 3, 4, 5)
.filter(x -> x > 2) // 3, 4, 5
.subscribe(System.out::println);
flatMap — асинхронная трансформация
Flux.just(1, 2, 3)
.flatMap(id -> getUser(id)) // Асинхронно получаем пользователей
.subscribe(user -> System.out.println(user));
// getUser возвращает Mono<User>
private Mono<User> getUser(int id) {
return userService.findById(id); // Асинхронный запрос в БД
}
concatMap — последовательная асинхронная трансформация
Flux.just(1, 2, 3)
.concatMap(id -> getUser(id)) // Обрабатывает по порядку
.subscribe(System.out::println);
// Гарантирует порядок элементов
merge — объединение нескольких потоков
Flux<Integer> flux1 = Flux.just(1, 2);
Flux<Integer> flux2 = Flux.just(3, 4);
Flux.merge(flux1, flux2)
.subscribe(System.out::println);
// Output: 1, 2, 3, 4 (порядок может быть другим!)
concat — объединение с сохранением порядка
Flux.concat(flux1, flux2)
.subscribe(System.out::println);
// Output: 1, 2, 3, 4 (гарантированный порядок)
zip — объединение элементов из двух потоков
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<String> names = Flux.just("One", "Two", "Three");
Flux.zip(numbers, names)
.map(tuple -> tuple.getT1() + " - " + tuple.getT2())
.subscribe(System.out::println);
// Output: 1 - One, 2 - Two, 3 - Three
take — ограничение количества элементов
Flux.range(1, 100)
.take(5) // Первые 5 элементов
.subscribe(System.out::println);
skip — пропуск элементов
Flux.range(1, 10)
.skip(3) // Пропускаем первые 3
.subscribe(System.out::println);
// Output: 4, 5, 6, 7, 8, 9, 10
doOnNext — побочные эффекты без трансформации
Flux.just(1, 2, 3)
.doOnNext(x -> System.out.println("Processing: " + x))
.map(x -> x * 2)
.subscribe(System.out::println);
Обработка ошибок в Flux
// onErrorReturn — вернуть default значение
Flux.just(1, 2, 3)
.map(x -> 10 / (x - 2)) // На x=2 будет ошибка
.onErrorReturn(-1) // При ошибке вернуть -1
.subscribe(System.out::println);
// onErrorResume — переключиться на другой Flux
Flux.just(1, 2, 3)
.flatMap(id -> getUser(id))
.onErrorResume(error -> Flux.just(new User(0, "Unknown")))
.subscribe(System.out::println);
// retry — переподписка при ошибке
Flux.just(1, 2, 3)
.flatMap(id -> unreliableService(id))
.retry(2) // Повторить 2 раза при ошибке
.subscribe(System.out::println);
// timeout — ограничение времени
Flux.just(1, 2, 3)
.delayElement(Duration.ofSeconds(10))
.timeout(Duration.ofSeconds(5)) // Ошибка если > 5 сек
.subscribe(System.out::println);
Flux в Spring WebFlux
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/api/users")
public class UserController {
private UserService userService;
// Endpoint возвращающий поток пользователей
@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll();
// Автоматически конвертируется в Server-Sent Events (SSE)
}
// Endpoint возвращающий одного пользователя
@GetMapping("/{id}")
public Mono<User> getUserById(@PathVariable Long id) {
return userService.findById(id);
}
// Endpoint обработки потока запросов
@PostMapping("/batch")
public Flux<User> createBatch(@RequestBody Flux<CreateUserRequest> requests) {
return requests
.flatMap(req -> userService.create(req))
.doOnError(e -> System.err.println("Creation failed: " + e));
}
}
Практический пример: Обработка больших файлов
public class FileProcessor {
// Читаем файл потоком, обрабатываем асинхронно
public Flux<ProcessedLine> processLargeFile(String filePath) {
return Flux.fromIterable(readLinesChunked(filePath))
.flatMap(this::validateLine) // Асинхронная валидация
.filter(line -> !line.isEmpty())
.map(this::parse) // Парсим
.onErrorResume(e -> {
log.error("Processing failed", e);
return Flux.empty();
})
.doOnComplete(() -> log.info("File processing completed"));
}
private Mono<String> validateLine(String line) {
return Mono.just(line)
.filter(l -> l.length() > 0)
.switchIfEmpty(Mono.error(new IllegalArgumentException("Empty line")));
}
}
Горячие и холодные потоки
// Холодный Flux (cold) — создаётся заново при каждой подписке
Flux<Integer> cold = Flux.just(1, 2, 3);
cold.subscribe(System.out::println);
cold.subscribe(System.out::println); // Два раза выведет 1, 2, 3
// Горячий Flux (hot) — делится между подписчиками
Flux<Integer> hot = Flux.just(1, 2, 3)
.share(); // Или .publish().refCount()
Заключение
Flux — мощный инструмент для обработки асинхронных потоков данных в Java. Используется в Spring WebFlux, R2DBC, и других реактивных фреймворках для построения неблокирующих, высоконагруженных приложений.