← Назад к вопросам
В чем разница между subscribeOn и observeOn в RxJava?
1.7 Middle🔥 181 комментариев
#Многопоточность и асинхронность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Разница между subscribeOn и observeOn в RxJava
Это фундаментальная разница в управлении потоками в RxJava. Часто путают эти операторы, но они работают по-разному.
subscribeOn — где выполняется ИСТОЧНИК (upstream)
subscribeOn определяет, на каком потоке будут выполняться операторы ВЫШЕ (upstream), вплоть до источника данных:
Observable.just(1, 2, 3) // Это выполнится на потоке, указанном в subscribeOn
.subscribeOn(Schedulers.io())
.map { it * 2 } // Это выполнится на IO потоке
.filter { it > 2 } // Это выполнится на IO потоке
.subscribe { println(it) } // Это выполнится на IO потоке
observeOn — где выполняется ПОДПИСЧИК (downstream)
observeOn определяет, на каком потоке будут выполняться операторы НИЖЕ (downstream), вплоть до подписчика:
Observable.just(1, 2, 3)
.map { it * 2 } // Это выполнится на io потоке (из subscribeOn выше)
.observeOn(Schedulers.mainThread()) // Переключение потока!
.filter { it > 2 } // Это выполнится на Main потоке
.subscribe { println(it) } // Это выполнится на Main потоке
Визуальное представление
observable subscribeOn observeOn
|
|--- source ------ IO thread ---------> calculation thread -----> Main thread
(IO) (executed here) (executed here) (executed here)
Практический пример
api.fetchUsers() // Сетевой запрос
.subscribeOn(Schedulers.io()) // Выполняется на IO потоке
.map { users ->
// Обработка данных на IO потоке
users.filter { it.isActive }
}
.observeOn(AndroidSchedulers.mainThread()) // Переключаемся на Main
.subscribe { users ->
// Обновляем UI на Main потоке
adapter.setData(users)
}
Таблица различий
| Параметр | subscribeOn | observeOn |
|---|---|---|
| Что контролирует | Где выполняется SOURCE | Где выполняется SUBSCRIBER |
| Направление | Upstream (вверх) | Downstream (вниз) |
| Влияние на операторы выше | Да | Нет |
| Влияние на операторы ниже | Нет | Да |
| Сколько раз может быть | Одна (последняя вычисляется) | Много раз (все работают) |
subscribeOn срабатывает только один раз
Если указать subscribeOn несколько раз, сработает ТОЛЬКО ПЕРВЫЙ (ближайший к источнику):
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io()) // Этот сработает
.subscribeOn(Schedulers.newThread()) // Этот проигнорируется
.subscribe { println(it) }
observeOn срабатывает каждый раз
Когда несколько observeOn — переключаемся каждый раз:
api.fetchData() // IO поток
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation()) // Переключение на Computation
.map { calculate(it) } // На Computation потоке
.observeOn(AndroidSchedulers.mainThread()) // Переключение на Main
.subscribe { updateUI(it) } // На Main потоке
Реальный сценарий: загрузка данных
class UserRepository {
fun getUsers(): Observable<List<User>> {
return api.fetchUsers() // Сетевой запрос
.subscribeOn(Schedulers.io()) // На IO потоке (фоновый)
.map { parseJson(it) } // Парсинг JSON на IO
.observeOn(AndroidSchedulers.mainThread()) // На Main
.doOnNext { cache.save(it) } // Сохранение в БД на Main
}
}
class UserViewModel {
fun load() {
repository.getUsers()
.observeOn(AndroidSchedulers.mainThread()) // Уже Main (от repo)
.subscribe(
{ users ->
state.value = users
},
{ error ->
state.value = Error(error)
}
)
}
}
Типичные потоки и Scheduler'ы
// Для блокирующих операций (БД, файлы, сеть)
Schedulers.io() // Thread pool, оптимален для I/O
// Для CPU-интенсивных операций (калькуляции, фильтрация)
Schedulers.computation() // Thread pool по числу ядер CPU
// Для UI обновлений (только Android)
AndroidSchedulers.mainThread() // Main поток
// Для немедленного выполнения
Schedulers.immediate() // Текущий поток
// Для одноразовых задач
Schedulers.newThread() // Новый поток для каждого subscription
Важные моменты
1. observeOn НЕ влияет на источник
Observable.just(1, 2, 3) // Это ВСЕГДА выполнится синхронно
.observeOn(Schedulers.io())
.subscribe { println(it) } // Это выполнится на IO
2. subscribeOn влияет на ВСЁ выше него
val observable = Observable.range(1, 3) // Этот код выполнится сейчас!
.subscribeOn(Schedulers.io()) // Но ПОДПИСАНИЕ произойдёт на IO
.map { it * 2 }
observable.subscribe { println(it) }
3. Если нет observeOn — результат на потоке subscribeOn
api.fetch()
.subscribeOn(Schedulers.io())
// НЕ указан observeOn!
.subscribe { updateUI(it) } // ОШИБКА! updateUI на IO потоке!
Антипаттерн и как это исправить
// НЕПРАВИЛЬНО: UI обновление на фоновом потоке
api.fetch()
.subscribeOn(Schedulers.io())
.subscribe { users ->
binding.listView.adapter = adapter // Crash!
}
// ПРАВИЛЬНО: явный observeOn для UI
api.fetch()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { users ->
binding.listView.adapter = adapter // OK
}
Итог
- subscribeOn = «где ВЫПОЛНЯЕТСЯ источник» (один раз, на IO для сети)
- observeOn = «где ВЫПОЛНЯЕТСЯ наблюдатель» (может быть много раз)
- Правило: если меняешь поток (с IO на Main), используй observeOn
- UI обновления: всегда заканчивай с observeOn(AndroidSchedulers.mainThread())
- Нет observeOn = результат на потоке из subscribeOn (опасно для UI!)