← Назад к вопросам

В чем разница между subscribeOn и observeOn в RxJava?

1.7 Middle🔥 181 комментариев
#Многопоточность и асинхронность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Разница между subscribeOn и observeOn в RxJava

Это фундаментальная разница в управлении потоками в RxJava. Часто путают эти операторы, но они работают по-разному.

subscribeOn — где выполняется ИСТОЧНИК (upstream)

subscribeOn определяет, на каком потоке будут выполняться операторы ВЫШЕ (upstream), вплоть до источника данных:

Observable.just(1, 2, 3)  // Это выполнится на потоке, указанном в subscribeOn
    .subscribeOn(Schedulers.io())
    .map { it * 2 }        // Это выполнится на IO потоке
    .filter { it > 2 }     // Это выполнится на IO потоке
    .subscribe { println(it) }  // Это выполнится на IO потоке

observeOn — где выполняется ПОДПИСЧИК (downstream)

observeOn определяет, на каком потоке будут выполняться операторы НИЖЕ (downstream), вплоть до подписчика:

Observable.just(1, 2, 3)
    .map { it * 2 }        // Это выполнится на io потоке (из subscribeOn выше)
    .observeOn(Schedulers.mainThread())  // Переключение потока!
    .filter { it > 2 }     // Это выполнится на Main потоке
    .subscribe { println(it) }  // Это выполнится на Main потоке

Визуальное представление

observable         subscribeOn       observeOn
    |
    |--- source ------ IO thread --------->  calculation thread  -----> Main thread
         (IO)         (executed here)       (executed here)        (executed here)

Практический пример

api.fetchUsers()                    // Сетевой запрос
    .subscribeOn(Schedulers.io())   // Выполняется на IO потоке
    .map { users ->
        // Обработка данных на IO потоке
        users.filter { it.isActive }
    }
    .observeOn(AndroidSchedulers.mainThread())  // Переключаемся на Main
    .subscribe { users ->
        // Обновляем UI на Main потоке
        adapter.setData(users)
    }

Таблица различий

ПараметрsubscribeOnobserveOn
Что контролируетГде выполняется SOURCEГде выполняется SUBSCRIBER
НаправлениеUpstream (вверх)Downstream (вниз)
Влияние на операторы вышеДаНет
Влияние на операторы нижеНетДа
Сколько раз может бытьОдна (последняя вычисляется)Много раз (все работают)

subscribeOn срабатывает только один раз

Если указать subscribeOn несколько раз, сработает ТОЛЬКО ПЕРВЫЙ (ближайший к источнику):

Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.io())        // Этот сработает
    .subscribeOn(Schedulers.newThread()) // Этот проигнорируется
    .subscribe { println(it) }

observeOn срабатывает каждый раз

Когда несколько observeOn — переключаемся каждый раз:

api.fetchData()                           // IO поток
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation()) // Переключение на Computation
    .map { calculate(it) }              // На Computation потоке
    .observeOn(AndroidSchedulers.mainThread()) // Переключение на Main
    .subscribe { updateUI(it) }         // На Main потоке

Реальный сценарий: загрузка данных

class UserRepository {
    fun getUsers(): Observable<List<User>> {
        return api.fetchUsers()          // Сетевой запрос
            .subscribeOn(Schedulers.io()) // На IO потоке (фоновый)
            .map { parseJson(it) }       // Парсинг JSON на IO
            .observeOn(AndroidSchedulers.mainThread()) // На Main
            .doOnNext { cache.save(it) } // Сохранение в БД на Main
    }
}

class UserViewModel {
    fun load() {
        repository.getUsers()
            .observeOn(AndroidSchedulers.mainThread()) // Уже Main (от repo)
            .subscribe(
                { users -> 
                    state.value = users
                },
                { error ->
                    state.value = Error(error)
                }
            )
    }
}

Типичные потоки и Scheduler'ы

// Для блокирующих операций (БД, файлы, сеть)
Schedulers.io()        // Thread pool, оптимален для I/O

// Для CPU-интенсивных операций (калькуляции, фильтрация)
Schedulers.computation() // Thread pool по числу ядер CPU

// Для UI обновлений (только Android)
AndroidSchedulers.mainThread() // Main поток

// Для немедленного выполнения
Schedulers.immediate()  // Текущий поток

// Для одноразовых задач
Schedulers.newThread()  // Новый поток для каждого subscription

Важные моменты

1. observeOn НЕ влияет на источник

Observable.just(1, 2, 3)  // Это ВСЕГДА выполнится синхронно
    .observeOn(Schedulers.io())
    .subscribe { println(it) } // Это выполнится на IO

2. subscribeOn влияет на ВСЁ выше него

val observable = Observable.range(1, 3)  // Этот код выполнится сейчас!
    .subscribeOn(Schedulers.io())        // Но ПОДПИСАНИЕ произойдёт на IO
    .map { it * 2 }

observable.subscribe { println(it) }

3. Если нет observeOn — результат на потоке subscribeOn

api.fetch()
    .subscribeOn(Schedulers.io())
    // НЕ указан observeOn!
    .subscribe { updateUI(it) } // ОШИБКА! updateUI на IO потоке!

Антипаттерн и как это исправить

// НЕПРАВИЛЬНО: UI обновление на фоновом потоке
api.fetch()
    .subscribeOn(Schedulers.io())
    .subscribe { users ->
        binding.listView.adapter = adapter // Crash!
    }

// ПРАВИЛЬНО: явный observeOn для UI
api.fetch()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { users ->
        binding.listView.adapter = adapter // OK
    }

Итог

  • subscribeOn = «где ВЫПОЛНЯЕТСЯ источник» (один раз, на IO для сети)
  • observeOn = «где ВЫПОЛНЯЕТСЯ наблюдатель» (может быть много раз)
  • Правило: если меняешь поток (с IO на Main), используй observeOn
  • UI обновления: всегда заканчивай с observeOn(AndroidSchedulers.mainThread())
  • Нет observeOn = результат на потоке из subscribeOn (опасно для UI!)
В чем разница между subscribeOn и observeOn в RxJava? | PrepBro