← Назад к вопросам

Что такое RxJava?

3.0 Senior🔥 181 комментариев
#Stream API и функциональное программирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

RxJava

RxJava — это библиотека для реактивного программирования на Java, которая позволяет легко работать с асинхронными потоками данных и событиями. RxJava реализует паттерн Observer и Reactive Streams, делая код более компактным и управляемым при работе с потоками данных.

Основные концепции RxJava

Observable и Observer

Observable — это источник данных, Observer — подписчик на эти данные:

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

public class RxJavaBasics {
    
    public static void main(String[] args) {
        // Создание Observable источника
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        });
        
        // Подписка на Observable
        observable.subscribe(
            item -> System.out.println("Item: " + item),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Completed")
        );
    }
}

Hot и Cold Observables

Cold Observable создают значения для каждого подписчика:

Observable<Integer> cold = Observable.create(emitter -> {
    System.out.println("Cold source started");
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
});

cold.subscribe(item -> System.out.println("Sub1: " + item)); // запустится источник
cold.subscribe(item -> System.out.println("Sub2: " + item)); // запустится снова

Hot Observable делят один источник между подписчиками:

import io.reactivex.rxjava3.subjects.PublishSubject;

PublishSubject<Integer> hot = PublishSubject.create();

hot.subscribe(item -> System.out.println("Sub1: " + item));
hot.onNext(1);
hot.onNext(2);

hot.subscribe(item -> System.out.println("Sub2: " + item));
hot.onNext(3); // Sub2 получит только 3

Операторы RxJava

Операторы трансформируют и фильтруют данные:

import io.reactivex.rxjava3.core.Observable;

public class RxJavaOperators {
    
    public static void main(String[] args) {
        // map — трансформация данных
        Observable.just(1, 2, 3, 4, 5)
            .map(x -> x * 2)
            .subscribe(System.out::println); // 2, 4, 6, 8, 10
        
        // filter — фильтрация
        Observable.just(1, 2, 3, 4, 5)
            .filter(x -> x > 2)
            .subscribe(System.out::println); // 3, 4, 5
        
        // flatMap — превращение каждого элемента в Observable
        Observable.just("Hello", "World")
            .flatMap(str -> Observable.fromArray(str.split("")))
            .subscribe(System.out::println);
        
        // reduce — агрегация значений
        Observable.just(1, 2, 3, 4, 5)
            .reduce((a, b) -> a + b)
            .subscribe(System.out::println); // 15
    }
}

Использование RxJava в приложениях

Практический пример с HTTP запросами:

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;

public class RxJavaHttpExample {
    
    public static void main(String[] args) throws InterruptedException {
        // Имитация HTTP запроса
        Observable<String> httpRequest = Observable.create(emitter -> {
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                    emitter.onNext("Response from server");
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }).start();
        });
        
        httpRequest
            .retry(2) // повторить при ошибке
            .timeout(5, TimeUnit.SECONDS) // timeout
            .subscribe(
                response -> System.out.println("Success: " + response),
                error -> System.out.println("Error: " + error.getMessage()),
                () -> System.out.println("Done")
            );
        
        Thread.sleep(3000);
    }
}

Schedulers — работа с потоками

Schedulers управляют на каких потоках выполняются операции:

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class RxJavaSchedulers {
    
    public static void main(String[] args) throws InterruptedException {
        Observable.just(1, 2, 3, 4, 5)
            // subscribeOn определяет поток для источника
            .subscribeOn(Schedulers.io())
            
            // Трансформация в background потоке
            .map(x -> {
                System.out.println("Processing on: " + Thread.currentThread().getName());
                return x * 2;
            })
            
            // observeOn определяет поток для Observer
            .observeOn(Schedulers.newThread())
            
            // Вывод в отдельном потоке
            .subscribe(x -> {
                System.out.println("Receiving on: " + Thread.currentThread().getName());
                System.out.println("Value: " + x);
            });
        
        Thread.sleep(2000);
    }
}

Комбинирование Observables

import io.reactivex.rxjava3.core.Observable;

public class RxJavaCombine {
    
    public static void main(String[] args) {
        Observable<Integer> obs1 = Observable.just(1, 2);
        Observable<Integer> obs2 = Observable.just(3, 4);
        Observable<Integer> obs3 = Observable.just(5, 6);
        
        // merge — объединение без синхронизации
        Observable.merge(obs1, obs2, obs3)
            .subscribe(System.out::println);
        
        // concat — последовательное объединение
        Observable.concat(obs1, obs2, obs3)
            .subscribe(System.out::println);
        
        // zip — синхронное объединение
        Observable.zip(obs1, obs2, (a, b) -> a + b)
            .subscribe(System.out::println); // 4, 6
        
        // combineLatest — объединение последних значений
        Observable.combineLatest(obs1, obs2, (a, b) -> a * b)
            .subscribe(System.out::println);
    }
}

Обработка ошибок

import io.reactivex.rxjava3.core.Observable;

public class RxJavaErrorHandling {
    
    public static void main(String[] args) {
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new Exception("Something went wrong"));
        });
        
        observable
            // onErrorReturn — вернуть значение при ошибке
            .onErrorReturn(error -> -1)
            .subscribe(System.out::println);
        
        // onErrorResumeNext — переключиться на другой Observable
        observable
            .onErrorResumeNext(Observable.just(10, 20))
            .subscribe(System.out::println);
        
        // retry — повторить при ошибке
        observable
            .retry(3)
            .subscribe(System.out::println);
    }
}

Disposables и управление ресурсами

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import java.util.concurrent.TimeUnit;

public class RxJavaDisposables {
    
    public static void main(String[] args) throws InterruptedException {
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        
        // Интервальный Observable
        compositeDisposable.add(
            Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(
                    count -> System.out.println("Count: " + count),
                    error -> System.err.println("Error: " + error)
                )
        );
        
        Thread.sleep(5000);
        
        // Отписка прекращает Observable
        compositeDisposable.dispose();
        System.out.println("Disposed");
        
        Thread.sleep(1000);
    }
}

RxJava с потоками данных

Создание потока чисел с фильтрацией и преобразованием:

import io.reactivex.rxjava3.core.Observable;

public class RxJavaStream {
    
    public static void main(String[] args) {
        Observable.range(1, 100)
            .filter(x -> x % 2 == 0) // только чётные
            .map(x -> x * x) // возведение в квадрат
            .filter(x -> x > 50) // только больше 50
            .take(10) // первые 10
            .subscribe(
                value -> System.out.println("Value: " + value),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Stream completed")
            );
    }
}

Преимущества RxJava

  • Асинхронность — легко работать с асинхронными операциями
  • Управление потоками — Schedulers упрощают многопоточность
  • Функциональное программирование — операторы вместо loops
  • Обработка ошибок — встроенные механизмы error handling
  • Переиспользуемость — компонуемые операторы
  • Производительность — оптимизирована для потоков данных

Когда использовать RxJava

  • HTTP запросы и сетевые операции
  • Работа с базой данных
  • UI обновления на основе событий
  • Обработка потоков событий
  • Многопоточные приложения
  • Мониторинг систем в реальном времени

RxJava — это мощный инструмент для работы с асинхронными потоками данных, который значительно упрощает написание реактивного и не-blocking кода в Java приложениях.