← Назад к вопросам

Что такое flux?

2.7 Senior🔥 211 комментариев
#Stream API и функциональное программирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Flux в Project Reactor

Flux — это реактивный тип данных в Project Reactor, который представляет асинхронный поток элементов (0, 1 или много элементов), аналогично Observable в RxJava, но оптимизирован для использования в Spring WebFlux и modern async приложениях.

FLUX — это холодный publisher, который начинает генерировать элементы только при подписке.

Основные различия: Mono vs Flux

ТипЭлементыИспользование
Mono0 или 1Single value, optional
Flux0, 1 или многоStream of values
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Mono — когда ожидаем один результат
Mono<String> hello = Mono.just("Hello");

// Flux — когда ожидаем поток
Flux<String> words = Flux.just("Hello", "World", "Reactor");

Создание Flux

// 1. Из массива/коллекции
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);

// 2. Из итерируемого
Flux<String> colors = Flux.fromIterable(Arrays.asList("Red", "Green", "Blue"));

// 3. Из массива
Flux<Double> prices = Flux.fromArray(new Double[]{10.5, 20.3, 30.0});

// 4. Генерация последовательности
Flux<Integer> sequence = Flux.range(1, 5); // 1, 2, 3, 4, 5

// 5. Повторение
Flux<String> repeated = Flux.just("A").repeat(2); // A, A, A

// 6. Пустой Flux
Flux<String> empty = Flux.empty();

// 7. С ошибкой
Flux<String> error = Flux.error(new RuntimeException("Error"));

// 8. Отложенное создание (холодный поток)
Flux<Integer> cold = Flux.defer(() -> Flux.just(System.currentTimeMillis()));

// 9. Интервал
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // Каждую секунду

Подписка на Flux

// Базовая подписка
Flux<Integer> flux = Flux.just(1, 2, 3);
flux.subscribe(System.out::println);
// Output: 1, 2, 3

// С обработкой ошибок
flux.subscribe(
    item -> System.out.println("Item: " + item),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed") // onComplete
);

// С Disposable (для отписки)
Disposable disposable = flux.subscribe(System.out::println);
disposable.dispose(); // Отписка

Операции над Flux (операторы)

map — трансформация каждого элемента

Flux.just(1, 2, 3, 4, 5)
    .map(x -> x * 2) // 2, 4, 6, 8, 10
    .subscribe(System.out::println);

filter — фильтрация элементов

Flux.just(1, 2, 3, 4, 5)
    .filter(x -> x > 2) // 3, 4, 5
    .subscribe(System.out::println);

flatMap — асинхронная трансформация

Flux.just(1, 2, 3)
    .flatMap(id -> getUser(id)) // Асинхронно получаем пользователей
    .subscribe(user -> System.out.println(user));

// getUser возвращает Mono<User>
private Mono<User> getUser(int id) {
    return userService.findById(id); // Асинхронный запрос в БД
}

concatMap — последовательная асинхронная трансформация

Flux.just(1, 2, 3)
    .concatMap(id -> getUser(id)) // Обрабатывает по порядку
    .subscribe(System.out::println);
// Гарантирует порядок элементов

merge — объединение нескольких потоков

Flux<Integer> flux1 = Flux.just(1, 2);
Flux<Integer> flux2 = Flux.just(3, 4);

Flux.merge(flux1, flux2)
    .subscribe(System.out::println);
// Output: 1, 2, 3, 4 (порядок может быть другим!)

concat — объединение с сохранением порядка

Flux.concat(flux1, flux2)
    .subscribe(System.out::println);
// Output: 1, 2, 3, 4 (гарантированный порядок)

zip — объединение элементов из двух потоков

Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<String> names = Flux.just("One", "Two", "Three");

Flux.zip(numbers, names)
    .map(tuple -> tuple.getT1() + " - " + tuple.getT2())
    .subscribe(System.out::println);
// Output: 1 - One, 2 - Two, 3 - Three

take — ограничение количества элементов

Flux.range(1, 100)
    .take(5) // Первые 5 элементов
    .subscribe(System.out::println);

skip — пропуск элементов

Flux.range(1, 10)
    .skip(3) // Пропускаем первые 3
    .subscribe(System.out::println);
// Output: 4, 5, 6, 7, 8, 9, 10

doOnNext — побочные эффекты без трансформации

Flux.just(1, 2, 3)
    .doOnNext(x -> System.out.println("Processing: " + x))
    .map(x -> x * 2)
    .subscribe(System.out::println);

Обработка ошибок в Flux

// onErrorReturn — вернуть default значение
Flux.just(1, 2, 3)
    .map(x -> 10 / (x - 2)) // На x=2 будет ошибка
    .onErrorReturn(-1) // При ошибке вернуть -1
    .subscribe(System.out::println);

// onErrorResume — переключиться на другой Flux
Flux.just(1, 2, 3)
    .flatMap(id -> getUser(id))
    .onErrorResume(error -> Flux.just(new User(0, "Unknown")))
    .subscribe(System.out::println);

// retry — переподписка при ошибке
Flux.just(1, 2, 3)
    .flatMap(id -> unreliableService(id))
    .retry(2) // Повторить 2 раза при ошибке
    .subscribe(System.out::println);

// timeout — ограничение времени
Flux.just(1, 2, 3)
    .delayElement(Duration.ofSeconds(10))
    .timeout(Duration.ofSeconds(5)) // Ошибка если > 5 сек
    .subscribe(System.out::println);

Flux в Spring WebFlux

import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/api/users")
public class UserController {
    
    private UserService userService;
    
    // Endpoint возвращающий поток пользователей
    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.findAll();
        // Автоматически конвертируется в Server-Sent Events (SSE)
    }
    
    // Endpoint возвращающий одного пользователя
    @GetMapping("/{id}")
    public Mono<User> getUserById(@PathVariable Long id) {
        return userService.findById(id);
    }
    
    // Endpoint обработки потока запросов
    @PostMapping("/batch")
    public Flux<User> createBatch(@RequestBody Flux<CreateUserRequest> requests) {
        return requests
            .flatMap(req -> userService.create(req))
            .doOnError(e -> System.err.println("Creation failed: " + e));
    }
}

Практический пример: Обработка больших файлов

public class FileProcessor {
    
    // Читаем файл потоком, обрабатываем асинхронно
    public Flux<ProcessedLine> processLargeFile(String filePath) {
        return Flux.fromIterable(readLinesChunked(filePath))
            .flatMap(this::validateLine) // Асинхронная валидация
            .filter(line -> !line.isEmpty())
            .map(this::parse) // Парсим
            .onErrorResume(e -> {
                log.error("Processing failed", e);
                return Flux.empty();
            })
            .doOnComplete(() -> log.info("File processing completed"));
    }
    
    private Mono<String> validateLine(String line) {
        return Mono.just(line)
            .filter(l -> l.length() > 0)
            .switchIfEmpty(Mono.error(new IllegalArgumentException("Empty line")));
    }
}

Горячие и холодные потоки

// Холодный Flux (cold) — создаётся заново при каждой подписке
Flux<Integer> cold = Flux.just(1, 2, 3);
cold.subscribe(System.out::println);
cold.subscribe(System.out::println); // Два раза выведет 1, 2, 3

// Горячий Flux (hot) — делится между подписчиками
Flux<Integer> hot = Flux.just(1, 2, 3)
    .share(); // Или .publish().refCount()

Заключение

Flux — мощный инструмент для обработки асинхронных потоков данных в Java. Используется в Spring WebFlux, R2DBC, и других реактивных фреймворках для построения неблокирующих, высоконагруженных приложений.