← Назад к вопросам

Гарантирует ли zip параллельность в RxJava?

2.7 Senior🔥 151 комментариев
#Многопоточность и асинхронность#Производительность и оптимизация

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Гарантирует ли zip параллельность в RxJava?

Нет, оператор zip в RxJava не гарантирует параллельность выполнения источников. Это распространённое заблуждение. Zip просто синхронизирует значения из нескольких потоков и объединяет их попарно — параллельность зависит от того, какие Scheduler'ы используются источниками.

Как работает zip

Определение:

Zip объединяет несколько источников, дожидаясь значения от каждого, и эмитит их вместе:

val source1 = Observable.just(1, 2, 3)
    .delay(100, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.io())

val source2 = Observable.just("A", "B", "C")
    .delay(200, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.io())

Observable.zip(source1, source2) { num, str ->
    "$num$str"
}
    .subscribe { result ->
        println(result) // 1A, 2B, 3C
    }

Параллельность зависит от Scheduler'ов

Параллельное выполнение:

Если использовать разные потоки для каждого источника, они будут выполняться параллельно:

val source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.io()) // поток 1

val source2 = Observable.interval(150, TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.computation()) // поток 2

Observable.zip(source1, source2) { a, b -> a to b }
    .subscribe { println("${Thread.currentThread().name}: $it") }

В этом случае оба источника работают одновременно в разных потоках.

Последовательное выполнение:

Если оба источника используют одинаковый Scheduler, они будут выполняться последовательно:

val scheduler = Schedulers.single() // одноточечный scheduler

val source1 = Observable.just(1, 2, 3)
    .delay(100, TimeUnit.MILLISECONDS)
    .subscribeOn(scheduler)

val source2 = Observable.just("A", "B", "C")
    .delay(100, TimeUnit.MILLISECONDS)
    .subscribeOn(scheduler)

Observable.zip(source1, source2) { num, str -> "$num$str" }
    .subscribe { println(it) }
// Выполнение будет в одном потоке, даже с zip

Буферизация в zip

Важный момент: zip буферизует значения, ожидая совпадения по индексам:

val fast = Observable.range(1, 1000)      // эмитит быстро
val slow = Observable.interval(1, TimeUnit.SECONDS) // эмитит медленно

Observable.zip(fast, slow) { a, b -> "$a-$b" }
    .subscribe { println(it) }

// fast эмитит все 1000 значений,
// они буферизуются в памяти,
// а slow эмитит по одному значению в секунду

Это может привести к утечке памяти, если разница в скорости значительна.

Правильное использование для параллельности

Решение 1: Явное указание потоков

val source1 = fetchDataFromAPI1()
    .subscribeOn(Schedulers.io())

val source2 = fetchDataFromAPI2()
    .subscribeOn(Schedulers.io())

Observable.zip(source1, source2) { data1, data2 ->
    combineData(data1, data2)
}
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { result ->
        updateUI(result)
    }

Оба запроса выполняются в разных потоках пула IO, таким образом, они выполняются параллельно.

Решение 2: combineLatest для быстрого завершения

Если вы хотите получить последнее значение каждого источника, используйте combineLatest:

Observable.combineLatest(
    source1.subscribeOn(Schedulers.io()),
    source2.subscribeOn(Schedulers.io())
) { a, b -> a to b }
    .subscribe { println(it) }

Сравнение операторов

Zip синхронизирует по индексу с потенциальной буферизацией. CombineLatest синхронизирует по времени без буферизации. Merge объединяет без синхронизации. Все зависят от scheduler'ов для параллельности.

Заключение

Zip — это оператор синхронизации, а не параллелизации. Параллельность обеспечивают Scheduler'ы. Используй разные потоки в каждом источнике, и тогда zip работает с параллельно получаемыми данными. Помни о буферизации и возможной утечке памяти при разной скорости источников.