← Назад к вопросам

Как дернуть несколько запросов одновременно с помощью RxJava

1.8 Middle🔥 132 комментариев
#Многопоточность и асинхронность#Сетевое взаимодействие

Комментарии (2)

🐱
deepseek-v3.2PrepBro AI5 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Параллельное выполнение запросов в RxJava

В RxJava существует несколько подходов для параллельного выполнения нескольких запросов. Основной принцип заключается в использовании операторов, которые позволяют запускать и комбинировать результаты нескольких Observable потоков.

Основные операторы для параллельных запросов

1. zip() - комбинация результатов

Оператор zip объединяет результаты нескольких Observable и преобразует их в единый результат, когда ВСЕ источники испустят значения.

fun fetchUserData(userId: String): Single<User> {
    return userApi.getUser(userId)
}

fun fetchUserPosts(userId: String): Single<List<Post>> {
    return postApi.getUserPosts(userId)
}

fun fetchUserAndPosts(userId: String): Single<Pair<User, List<Post>>> {
    return Single.zip(
        fetchUserData(userId),
        fetchUserPosts(userId),
        BiFunction { user, posts -> Pair(user, posts) }
    )
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
}

2. merge() / concat() - объединение потоков

Эти операторы объединяют несколько Observable в один, но с разной стратегией:

  • merge() - параллельное выполнение, порядок не гарантирован
  • concat() - последовательное выполнение в порядке добавления
fun fetchMultipleUsers(userIds: List<String>): Observable<User> {
    val observables = userIds.map { userId ->
        userApi.getUser(userId)
            .subscribeOn(Schedulers.io())
            .toObservable()
    }
    
    return Observable.merge(observables)
}

3. flatMap() с Schedulers - преобразование с параллелизацией

Комбинация flatMap с указанием шедулера позволяет запускать параллельные запросы:

fun fetchUsersWithDetails(userIds: List<String>): Observable<UserDetails> {
    return Observable.fromIterable(userIds)
        .flatMap { userId ->
            userApi.getUserDetails(userId)
                .subscribeOn(Schedulers.io())
                .toObservable()
        }
        .observeOn(AndroidSchedulers.mainThread())
}

Продвинутые техники

4. Parallel Flowable (RxJava 2+)

Для обработки больших объемов данных можно использовать ParallelFlowable:

fun processMultipleItems(items: List<String>): Flowable<Result> {
    return Flowable.fromIterable(items)
        .parallel()
        .runOn(Schedulers.io())
        .flatMap { item -> processItem(item).toFlowable() }
        .sequential()
}

5. combineLatest() - комбинация последних значений

Полезен, когда нужно реагировать на изменения в нескольких источниках:

fun monitorUserAndLocation(): Observable<DashboardData> {
    return Observable.combineLatest(
        userRepository.getUserUpdates(),
        locationRepository.getLocationUpdates(),
        BiFunction { user, location -> 
            DashboardData(user, location) 
        }
    )
}

Практический пример с обработкой ошибок

fun fetchMultipleDataSources(userId: String): Single<CombinedData> {
    val userRequest = userApi.getUser(userId)
        .subscribeOn(Schedulers.io())
        .onErrorReturn { User.EMPTY }
    
    val postsRequest = postApi.getUserPosts(userId)
        .subscribeOn(Schedulers.io())
        .onErrorReturn { emptyList() }
    
    val settingsRequest = settingsApi.getUserSettings(userId)
        .subscribeOn(Schedulers.io())
        .onErrorReturn { defaultSettings() }
    
    return Single.zip(
        userRequest,
        postsRequest,
        settingsRequest,
        Function3 { user, posts, settings ->
            CombinedData(user, posts, settings)
        }
    )
    .timeout(5, TimeUnit.SECONDS) // Таймаут для всех запросов
    .observeOn(AndroidSchedulers.mainThread())
}

Ключевые рекомендации

  1. Управление потоками:

    • Всегда указывайте subscribeOn() для каждого запроса отдельно
    • Используйте observeOn() для обработки результатов в UI-потоке
  2. Обработка ошибок:

    • Используйте onErrorReturn(), onErrorResumeNext() для graceful error handling
    • Рассмотрите timeout() для предотвращения вечного ожидания
  3. Оптимизация:

    • Ограничивайте количество параллельных запросов с помощью flatMap с параметром concurrency
    • Используйте cache() или replay() для повторного использования результатов
  4. Отмена запросов:

    • Не забывайте управлять жизненным циклом через CompositeDisposable
    • Используйте takeUntil() для автоматической отмены

Выбор конкретного подхода зависит от требований: нужно ли ждать все результаты (zip), обрабатывать их по мере поступления (merge), или преобразовывать последовательно (concat). Для большинства сценариев параллельных API-запросов оптимальным является использование zip() с отдельными subscribeOn() для каждого запроса.

Как дернуть несколько запросов одновременно с помощью RxJava | PrepBro