← Назад к вопросам

Как с помощью RxJava одновременно запустить 2 запроса в сеть и объединить их результат

2.2 Middle🔥 201 комментариев
#Многопоточность и асинхронность#Сетевое взаимодействие

Комментарии (1)

🐱
deepseek-v3.2PrepBro AI5 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Параллельное выполнение и объединение сетевых запросов в RxJava

Для параллельного запуска и объединения двух сетевых запросов в RxJava существует несколько эффективных подходов. Ключевым аспектом является использование операторов для конкурентного выполнения и последующей агрегации результатов.

Основные операторы для параллельного выполнения

1. Observable.zip() — наиболее частый выбор

Оператор zip объединяет несколько Observable, дожидаясь завершения каждого из них, и применяет функцию-комбинатор к полученным результатам.

// Пример с Retrofit (возвращающим Single/Observable)
val apiService: ApiService = retrofit.create(ApiService::class.java)

Observable.zip(
    apiService.getUserData(userId).subscribeOn(Schedulers.io()),
    apiService.getUserPosts(userId).subscribeOn(Schedulers.io()),
    BiFunction { userData: UserData, userPosts: List<Post> ->
        // Комбинатор: создаем объединенный объект
        CombinedResult(userData, userPosts)
    }
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    { combinedResult -> 
        // Обработка объединенного результата
        updateUI(combinedResult) 
    },
    { error -> 
        // Обработка ошибки (если любой из запросов завершится ошибкой)
        showError(error) 
    }
)

Важные особенности zip:

  • Ожидает завершения всех исходных Observable
  • Если один из запросов завершится ошибкой — вся цепочка прерывается
  • Результаты передаются в комбинатор в том же порядке, что и исходные Observable

2. Observable.combineLatest() — для динамических данных

Используется, когда данные могут меняться, и нужно реагировать на обновления любого из источников:

Observable.combineLatest(
    networkService.getCurrentWeather(),
    networkService.getCurrencyRates(),
    BiFunction { weather, currency ->
        DashboardData(weather, currency)
    }
)

3. Single.zip() — для однократных запросов

Более специализированная версия для Single источников:

Single.zip(
    apiService.getUserProfile(userId),
    apiService.getUserSettings(userId),
    BiFunction { profile, settings ->
        UserFullData(profile, settings)
    }
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

Управление потоками выполнения

Для истинного параллелизма важно настроить Schedulers:

// Каждый запрос в отдельном потоке
val request1 = apiService.getData1()
    .subscribeOn(Schedulers.io())
    .onErrorReturn { ErrorData() } // Обработка ошибок для каждого потока

val request2 = apiService.getData2()
    .subscribeOn(Schedulers.io())

Observable.zip(request1, request2) { data1, data2 ->
    // Объединение
}

Обработка ошибок в параллельных запросах

При параллельном выполнении важно правильно обрабатывать ошибки:

// Использование onErrorReturn для каждого запроса
val safeRequest1 = apiService.request1()
    .onErrorReturn { ErrorData() }
    .subscribeOn(Schedulers.io())

val safeRequest2 = apiService.request2()
    .onErrorReturnItem(DefaultData())
    .subscribeOn(Schedulers.io())

// Альтернатива: глобальная обработка
Observable.zip(request1, request2, ::combineResults)
    .onErrorResumeNext { error: Throwable ->
        // Общая обработка ошибки
        Observable.just(FallbackData())
    }

Оптимизация с помощью flatMap и ParallelFlowable

Для большего количества запросов можно использовать ParallelFlowable:

Flowable.fromArray(request1, request2)
    .parallel()
    .runOn(Schedulers.io())
    .flatMap { it }
    .sequential()
    .toList()
    .subscribe { results ->
        // results содержит оба результата
    }

Практические рекомендации

  1. Определите семантику ошибок: Решите, должен ли один неудачный запрос прерывать все операции
  2. Управляйте потоками: Используйте subscribeOn для каждого запроса отдельно
  3. Отмена запросов: Используйте CompositeDisposable для управления жизненным циклом
  4. Таймауты: Добавляйте timeout() для контроля длительных операций
  5. Кеширование: Рассмотрите cache() или replay() для повторного использования результатов
// Полный пример с обработкой жизненного цикла
val disposables = CompositeDisposable()

disposables.add(
    Observable.zip(
        apiService.getData1().timeout(5, TimeUnit.SECONDS),
        apiService.getData2().timeout(5, TimeUnit.SECONDS),
        ::combineResults
    )
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { result -> handleSuccess(result) },
        { error -> handleError(error) }
    )
)

// В onDestroy/onCleared
disposables.clear()

Выбор конкретного подхода зависит от требований приложения: нужно ли продолжать выполнение при ошибках, требуется ли реагирование на обновления данных, важна ли очередность результатов. В большинстве случаев для двух одновременных запросов оптимальным выбором будет Observable.zip() или Single.zip() с правильно настроенными Schedulers.