← Назад к вопросам

Что такое Observable?

2.3 Middle🔥 71 комментариев
#Python Core#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что такое Observable

Observable — это концепция из reactive programming, которая представляет поток данных, который может быть подписан множеством наблюдателей. Observable испускает значения со временем, и подписчики реагируют на эти события.

Основная идея

Observable — это обобщение итератора на асинхронные случаи:

# Итератор — синхронный поток
for value in range(5):
    print(value)  # 0, 1, 2, 3, 4

# Observable — асинхронный поток
# (значения приходят в произвольные моменты времени)
from rxpy import create

def on_subscribe(observer, scheduler=None):
    observer.on_next(0)
    observer.on_next(1)
    observer.on_next(2)
    observer.on_complete()

observable = create(on_subscribe)
observable.subscribe(on_next=print)  # 0, 1, 2

Реактивное программирование в Python

RxPY — самая популярная библиотека для Reactive Programming в Python:

from rxpy import create, interval
import rxpy.operators as ops

# Создание Observable
def subscribe(observer, scheduler=None):
    observer.on_next("Hello")
    observer.on_next("World")
    observer.on_complete()

observable = create(subscribe)

# Подписка на Observable
observable.subscribe(
    on_next=lambda x: print(f"Значение: {x}"),
    on_error=lambda e: print(f"Ошибка: {e}"),
    on_completed=lambda: print("Завершено")
)

Отличие от Promises/Futures

Обычный Future (одно значение):

from concurrent.futures import ThreadPoolExecutor

def fetch_data():
    return "результат"

with ThreadPoolExecutor() as executor:
    future = executor.submit(fetch_data)
    result = future.result()  # Ждём одно значение

Observable (поток значений):

from rxpy import interval

# Испускает значение каждую секунду, бесконечно
observable = interval(1.0)
observable.subscribe(lambda x: print(f"Значение: {x}"))
# 0, 1, 2, 3, 4, ... (продолжается неопределённо долго)

Операции над Observable

map — трансформирует значения:

from rxpy import of
import rxpy.operators as ops

of(1, 2, 3).pipe(
    ops.map(lambda x: x * 2)
).subscribe(print)  # 2, 4, 6

filter — отфильтровывает значения:

of(1, 2, 3, 4, 5).pipe(
    ops.filter(lambda x: x > 2)
).subscribe(print)  # 3, 4, 5

merge — объединяет несколько Observable:

from rxpy import of, merge

observable1 = of(1, 2, 3)
observable2 = of(4, 5, 6)

merge(observable1, observable2).subscribe(print)
# 1, 2, 3, 4, 5, 6

flatMap — для асинхронных операций:

from rxpy import interval
import rxpy.operators as ops

# Каждое значение от interval запускает HTTP запрос
interval(2.0).pipe(
    ops.flat_map(lambda i: fetch_user_async(i))
).subscribe(print)

Практический пример: обработка кликов

from rxpy import create
import rxpy.operators as ops

# Создаём Observable из кликов
def subscribe(observer, scheduler=None):
    def on_click():
        observer.on_next({"x": 100, "y": 200})
    
    # Предположим, мы регистрируем обработчик события
    element.addEventListener("click", on_click)

click_observable = create(subscribe)

# Фильтруем двойные клики
click_observable.pipe(
    ops.buffer_count(2),
    ops.filter(lambda clicks: clicks[1]["time"] - clicks[0]["time"] < 0.3),
    ops.map(lambda clicks: clicks[1])
).subscribe(
    on_next=lambda click: print(f"Двойной клик: {click}")
)

Observable vs Async/Await

Async/Await (modern Python):

async def fetch_data():
    result = await http_client.get(url)
    return result

result = await fetch_data()  # Ждём одно значение

Observable (для потоков данных):

def subscribe(observer, scheduler=None):
    async def fetch_loop():
        for i in range(10):
            data = await http_client.get(f"{url}/{i}")
            observer.on_next(data)
            await asyncio.sleep(1)
    
    asyncio.create_task(fetch_loop())

observable = create(subscribe)
observable.subscribe(lambda data: print(data))

Когда использовать Observable

  • Real-time данные: цены акций, сообщения в чате
  • User-generated события: клики, скролл, ввод текста
  • Потокующие API: WebSockets, Server-Sent Events
  • Сложная обработка данных: фильтрация, трансформация, объединение нескольких источников

Hot vs Cold Observable

Cold Observable — создаёт новый поток для каждого подписчика:

cold = create(lambda observer, _: observer.on_next(1))
cold.subscribe(print)  # Эта подписка получает значение
cold.subscribe(print)  # Эта подписка получает своё значение

Hot Observable — одна подписка для всех наблюдателей:

hot = subject.Subject()
hot.subscribe(print)  # Подписка 1
hot.subscribe(print)  # Подписка 2
hot.on_next(1)  # Обе подписки получают значение

Observable — мощный инструмент для работы с асинхронными потоками данных и построения реактивных систем.

Что такое Observable? | PrepBro