Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое Observable
Observable — это концепция из reactive programming, которая представляет поток данных, который может быть подписан множеством наблюдателей. Observable испускает значения со временем, и подписчики реагируют на эти события.
Основная идея
Observable — это обобщение итератора на асинхронные случаи:
# Итератор — синхронный поток
for value in range(5):
print(value) # 0, 1, 2, 3, 4
# Observable — асинхронный поток
# (значения приходят в произвольные моменты времени)
from rxpy import create
def on_subscribe(observer, scheduler=None):
observer.on_next(0)
observer.on_next(1)
observer.on_next(2)
observer.on_complete()
observable = create(on_subscribe)
observable.subscribe(on_next=print) # 0, 1, 2
Реактивное программирование в Python
RxPY — самая популярная библиотека для Reactive Programming в Python:
from rxpy import create, interval
import rxpy.operators as ops
# Создание Observable
def subscribe(observer, scheduler=None):
observer.on_next("Hello")
observer.on_next("World")
observer.on_complete()
observable = create(subscribe)
# Подписка на Observable
observable.subscribe(
on_next=lambda x: print(f"Значение: {x}"),
on_error=lambda e: print(f"Ошибка: {e}"),
on_completed=lambda: print("Завершено")
)
Отличие от Promises/Futures
Обычный Future (одно значение):
from concurrent.futures import ThreadPoolExecutor
def fetch_data():
return "результат"
with ThreadPoolExecutor() as executor:
future = executor.submit(fetch_data)
result = future.result() # Ждём одно значение
Observable (поток значений):
from rxpy import interval
# Испускает значение каждую секунду, бесконечно
observable = interval(1.0)
observable.subscribe(lambda x: print(f"Значение: {x}"))
# 0, 1, 2, 3, 4, ... (продолжается неопределённо долго)
Операции над Observable
map — трансформирует значения:
from rxpy import of
import rxpy.operators as ops
of(1, 2, 3).pipe(
ops.map(lambda x: x * 2)
).subscribe(print) # 2, 4, 6
filter — отфильтровывает значения:
of(1, 2, 3, 4, 5).pipe(
ops.filter(lambda x: x > 2)
).subscribe(print) # 3, 4, 5
merge — объединяет несколько Observable:
from rxpy import of, merge
observable1 = of(1, 2, 3)
observable2 = of(4, 5, 6)
merge(observable1, observable2).subscribe(print)
# 1, 2, 3, 4, 5, 6
flatMap — для асинхронных операций:
from rxpy import interval
import rxpy.operators as ops
# Каждое значение от interval запускает HTTP запрос
interval(2.0).pipe(
ops.flat_map(lambda i: fetch_user_async(i))
).subscribe(print)
Практический пример: обработка кликов
from rxpy import create
import rxpy.operators as ops
# Создаём Observable из кликов
def subscribe(observer, scheduler=None):
def on_click():
observer.on_next({"x": 100, "y": 200})
# Предположим, мы регистрируем обработчик события
element.addEventListener("click", on_click)
click_observable = create(subscribe)
# Фильтруем двойные клики
click_observable.pipe(
ops.buffer_count(2),
ops.filter(lambda clicks: clicks[1]["time"] - clicks[0]["time"] < 0.3),
ops.map(lambda clicks: clicks[1])
).subscribe(
on_next=lambda click: print(f"Двойной клик: {click}")
)
Observable vs Async/Await
Async/Await (modern Python):
async def fetch_data():
result = await http_client.get(url)
return result
result = await fetch_data() # Ждём одно значение
Observable (для потоков данных):
def subscribe(observer, scheduler=None):
async def fetch_loop():
for i in range(10):
data = await http_client.get(f"{url}/{i}")
observer.on_next(data)
await asyncio.sleep(1)
asyncio.create_task(fetch_loop())
observable = create(subscribe)
observable.subscribe(lambda data: print(data))
Когда использовать Observable
- Real-time данные: цены акций, сообщения в чате
- User-generated события: клики, скролл, ввод текста
- Потокующие API: WebSockets, Server-Sent Events
- Сложная обработка данных: фильтрация, трансформация, объединение нескольких источников
Hot vs Cold Observable
Cold Observable — создаёт новый поток для каждого подписчика:
cold = create(lambda observer, _: observer.on_next(1))
cold.subscribe(print) # Эта подписка получает значение
cold.subscribe(print) # Эта подписка получает своё значение
Hot Observable — одна подписка для всех наблюдателей:
hot = subject.Subject()
hot.subscribe(print) # Подписка 1
hot.subscribe(print) # Подписка 2
hot.on_next(1) # Обе подписки получают значение
Observable — мощный инструмент для работы с асинхронными потоками данных и построения реактивных систем.