← Назад к вопросам

Какие плюсы и минусы Observer?

2.0 Middle🔥 131 комментариев
#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Observer (Наблюдатель) — плюсы и минусы

Что такое Observer?

Observer — паттерн проектирования поведенческого типа, который устанавливает зависимость один-ко-многим между объектами таким образом, что при изменении состояния одного объекта все зависящие от него объекты уведомляются об этом и автоматически обновляются.

Структура паттерна

from abc import ABC, abstractmethod
from typing import List

# Subject (издатель событий)
class Subject:
    def __init__(self):
        self._observers: List[Observer] = []
        self._state = None
    
    def attach(self, observer: 'Observer'):
        """Подписать наблюдателя"""
        if observer not in self._observers:
            self._observers.append(observer)
    
    def detach(self, observer: 'Observer'):
        """Отписать наблюдателя"""
        if observer in self._observers:
            self._observers.remove(observer)
    
    def notify(self):
        """Уведомить всех наблюдателей"""
        for observer in self._observers:
            observer.update(self)
    
    @property
    def state(self):
        return self._state
    
    @state.setter
    def state(self, value):
        self._state = value
        self.notify()

# Observer (наблюдатель)
class Observer(ABC):
    @abstractmethod
    def update(self, subject: Subject):
        pass

# Конкретные наблюдатели
class ConcreteObserverA(Observer):
    def update(self, subject: Subject):
        if subject.state < 3:
            print(f'ConcreteObserverA реагирует на новое состояние: {subject.state}')

class ConcreteObserverB(Observer):
    def update(self, subject: Subject):
        if subject.state >= 3:
            print(f'ConcreteObserverB реагирует на новое состояние: {subject.state}')

# Использование
subject = Subject()
observerA = ConcreteObserverA()
observerB = ConcreteObserverB()

subject.attach(observerA)
subject.attach(observerB)

subject.state = 2  # Уведомит только A
subject.state = 5  # Уведомит только B

Практический пример: система уведомлений

class NewsPublisher:
    """Издатель новостей"""
    def __init__(self):
        self._subscribers = []
    
    def subscribe(self, subscriber: 'Subscriber'):
        self._subscribers.append(subscriber)
    
    def unsubscribe(self, subscriber: 'Subscriber'):
        self._subscribers.remove(subscriber)
    
    def publish(self, news: str):
        for subscriber in self._subscribers:
            subscriber.notify(news)

class Subscriber:
    """Подписчик на новости"""
    def __init__(self, name: str):
        self.name = name
    
    def notify(self, news: str):
        print(f'{self.name} получил новость: {news}')

# Использование
publisher = NewsPublisher()
alice = Subscriber('Alice')
bob = Subscriber('Bob')

publisher.subscribe(alice)
publisher.subscribe(bob)

publisher.publish('Python 3.12 вышел!')  # Оба получат
publisher.unsubscribe(alice)
publisher.publish('Django 5.0 готов!')  # Только Bob

Реальный пример: реактивное программирование (RxPY)

import rx
from rx import operators as ops

# Создание потока событий
button_clicks = rx.Subject()

# Подписка с трансформацией
button_clicks.pipe(
    ops.map(lambda x: x * 2),
    ops.filter(lambda x: x > 5)
).subscribe(
    on_next=lambda x: print(f'Обработано: {x}'),
    on_error=lambda e: print(f'Ошибка: {e}'),
    on_completed=lambda: print('Завершено')
)

# Генерирование событий
button_clicks.on_next(3)  # 3 * 2 = 6 > 5, выведет
button_clicks.on_next(2)  # 2 * 2 = 4 <= 5, не выведет
button_clicks.on_completed()

Плюсы Observer

1. Слабая связанность (Loose Coupling)

# Subject не знает конкретных деталей Observer
# Они связаны только через интерфейс update()
class NewsPublisher:  # Subject не знает, кто его слушает
    def publish(self, news):
        for observer in self._observers:
            observer.update(news)  # Абстрактный вызов

# Observer может быть любым
class EmailNotifier(Observer):
    def update(self, news):
        self.send_email(news)

class SMSNotifier(Observer):
    def update(self, news):
        self.send_sms(news)

class SlackNotifier(Observer):
    def update(self, news):
        self.send_slack(news)

# Легко добавлять новые способы уведомления

Преимущество: можешь добавлять новых наблюдателей без изменения издателя.

2. Динамические отношения

# Можно подписывать/отписывать на лету
signal = Signal()

observer1 = Observer1()
observer2 = Observer2()

signal.attach(observer1)
signal.notify()  # observer1 получит

signal.attach(observer2)
signal.notify()  # оба получат

signal.detach(observer1)
signal.notify()  # только observer2 получит

Преимущество: максимальная гибкость в управлении зависимостями.

3. Поддержка broadcast коммуникации

# Один издатель -> многие подписчики
event_bus = EventBus()

for i in range(1000):
    event_bus.subscribe(ListenerI())

event_bus.publish(Event())  # Все получат одновременно

Преимущество: естественное моделирование систем типа event bus, pub-sub.

4. Разделение ответственности

# UserService (Subject) отвечает за пользователей
class UserService:
    def create_user(self, user_data):
        user = User(**user_data)
        self.notify()  # Просто уведомляет
        return user

# Логирование, отправка почты, аналитика — это Observer
class AnalyticsObserver(Observer):
    def update(self):
        track_event('user_created')

class EmailObserver(Observer):
    def update(self):
        send_welcome_email()

class LoggingObserver(Observer):
    def update(self):
        log_event('User created')

Преимущество: каждый компонент отвечает за свою работу.

Минусы Observer

1. Неопределённый порядок уведомления

subject = Subject()

# Порядок подписки == порядок уведомления
observer1 = Observer1()
observer2 = Observer2()
observer3 = Observer3()

subject.attach(observer1)
subject.attach(observer2)
subject.attach(observer3)

subject.notify()  # observer1 -> observer2 -> observer3

# Но этот порядок — деталь реализации!
# Если observer1 выбросит исключение, observer2 и observer3 не будут уведомлены

Проблема: трудно отслеживать порядок выполнения. Если observer1 зависит от observer2, будут баги.

Решение:

# Использовать безопасное уведомление
def notify(self):
    for observer in self._observers:
        try:
            observer.update(self)
        except Exception as e:
            logger.error(f'Observer {observer} failed: {e}')
            # Продолжить уведомление остальных

2. Скрытые зависимости

class UserService:
    def __init__(self):
        self._observers = []  # Скрытые зависимости!
    
    def create_user(self, user_data):
        user = User(**user_data)
        self.notify()  # Что произойдёт здесь?
        # Кто подписан? Какие действия будут выполнены?
        # Это не видно из кода!
        return user

# Где-то в другом модуле:
user_service.attach(analytics_observer)
user_service.attach(email_observer)
user_service.attach(webhook_observer)
# Создатель User не видит эти побочные эффекты

Проблема: трудно отследить, что происходит при уведомлении. Побочные эффекты скрыты.

Решение:

# Явно определить слушателей в конфигурации
USER_CREATION_LISTENERS = [
    EmailObserver(),
    AnalyticsObserver(),
    WebhookObserver(),
]

# Или использовать event-driven архитектуру
from dataclasses import dataclass

@dataclass
class UserCreated(Event):
    user_id: str
    user_data: dict

# Явный event bus
class EventBus:
    def emit(self, event: Event):
        listeners = self.get_listeners(type(event))
        for listener in listeners:
            listener.handle(event)

3. Производительность при большом количестве подписчиков

import time

subject = Subject()

# 10000 подписчиков
for i in range(10000):
    subject.attach(Observer())

start = time.time()
subject.notify()  # Уведомить всех
end = time.time()

print(f'Время уведомления: {end - start:.3f} сек')
# Может быть медленным!

Проблема: если наблюдателей много, уведомление может быть медленным и блокирующим.

Решение:

# 1. Асинхронное уведомление
import asyncio

async def async_notify(self):
    tasks = [observer.update(self) for observer in self._observers]
    await asyncio.gather(*tasks)

# 2. Использовать очереди
from queue import Queue

notification_queue = Queue()

def notify(self):
    notification_queue.put(('notify', self._state))

# Отдельный worker обрабатывает уведомления
def worker():
    while True:
        event_type, data = notification_queue.get()
        for observer in self._observers:
            observer.update(data)

4. Утечки памяти

class DataSource:
    def __init__(self):
        self._observers = []
    
    def attach(self, observer):
        self._observers.append(observer)

source = DataSource()

class Observer:
    def __init__(self, heavy_data):
        self.heavy_data = heavy_data
        self.size = sys.getsizeof(heavy_data)

# Создали наблюдателя и забыли отписать
observer = Observer(large_list)  # Займёт 100MB памяти
source.attach(observer)
del observer  # Observer всё ещё живёт в source._observers!

# Память не освободится, пока source не будет удален

Проблема: если забыть отписать наблюдателя, памяль будет утечка.

Решение:

from weakref import WeakSet

class Subject:
    def __init__(self):
        self._observers = WeakSet()  # Слабые ссылки
    
    def attach(self, observer):
        self._observers.add(observer)  # Автоматически удалится если observer удален

# Или явно отписывать
try:
    # Использование observer
finally:
    source.detach(observer)

5. Сложность отладки

# Очень сложно отследить причину изменения состояния
class User:
    def __init__(self):
        self._age = 0
        self._observers = []
    
    @property
    def age(self):
        return self._age
    
    @age.setter
    def age(self, value):
        self._age = value
        self.notify()  # Что произойдёт?

user = User()
user.attach(observer1)
user.attach(observer2)
user.attach(observer3)
user.attach(observer4)
user.attach(observer5)

user.age = 30  # Вызовется 5 обработчиков
# Где-то в одном из них произойдёт ошибка
# Трудно отследить какая из 5 операций её вызвала

Проблема: при множественных наблюдателях сложно отладить, кто и что вызвал.

Решение:

# Логирование
class Subject:
    def notify(self):
        for observer in self._observers:
            try:
                print(f'Вызов {observer}')
                observer.update(self)
                print(f'Успех {observer}')
            except Exception as e:
                print(f'Ошибка в {observer}: {e}')
                raise

Когда использовать Observer?

Хорошо подходит для:

  • Event-driven системы (UI события, горячие клавиши)
  • Pub-sub архитектуры (message brokers)
  • Real-time обновления данных
  • Системы мониторинга и аналитики
  • Model-View паттерны (MVC, MVVM)

Избегай для:

  • Простых зависимостей между объектами
  • Когда есть чёткий порядок обработки (используй цепочку)
  • Когда нужна синхронизация (используй события и очереди)

Альтернативы

  1. Event Bus — централизованная система событий
  2. Reactive Programming (RxPY, ReactiveX)
  3. Message Queues (RabbitMQ, Kafka)
  4. Signals (Django signals, PyPubSub)
  5. Callback functions для простых случаев
Какие плюсы и минусы Observer? | PrepBro