← Назад к вопросам

Что такое принцип EDA?

2.2 Middle🔥 141 комментариев
#Другое

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Что такое принцип EDA?

EDA (Event-Driven Architecture) — это архитектурный паттерн, где система реагирует на события вместо вызова функций напрямую. Компоненты взаимодействуют через события, а не через прямые вызовы, что обеспечивает слабую связанность и высокую масштабируемость.

Основная идея

Вместо того чтобы функции вызывали друг друга напрямую:

# Плохо — тесная связанность
user_service.create_user(data)
email_service.send_welcome_email(user)  # Тесно связано
analytics_service.track_signup(user)    # Тесно связано

С EDA события распространяются через систему:

# Хорошо — свободная связанность
event_bus.emit('user:created', user_data)
# Кто-то слушает и реагирует
# - email_service отправляет письмо
# - analytics_service логирует
# - notification_service отправляет push

Компоненты EDA

  1. Event Producer — генерирует события
  2. Event Broker/Bus — транспортирует события
  3. Event Consumer/Handler — обрабатывает события
  4. Event — данные о том, что произошло

Простой пример: система заказов

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Callable
from datetime import datetime

class EventType(Enum):
    ORDER_CREATED = "order:created"
    PAYMENT_PROCESSED = "payment:processed"
    ORDER_SHIPPED = "order:shipped"
    ORDER_DELIVERED = "order:delivered"

@dataclass
class Event:
    type: EventType
    timestamp: datetime
    data: dict

class EventBus:
    """Простой Event Bus"""
    def __init__(self):
        self._subscribers: Dict[EventType, List[Callable]] = {}
    
    def subscribe(self, event_type: EventType, handler: Callable):
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(handler)
    
    def emit(self, event: Event):
        if event.type in self._subscribers:
            for handler in self._subscribers[event.type]:
                try:
                    handler(event)
                except Exception as e:
                    print(f"Error in handler: {e}")

# Event Producers (генераторы событий)
class OrderService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.orders = {}
    
    def create_order(self, order_id: str, items: list, customer_id: str):
        order = {
            'id': order_id,
            'items': items,
            'customer_id': customer_id,
            'status': 'pending'
        }
        self.orders[order_id] = order
        
        # Генерируем событие
        event = Event(
            type=EventType.ORDER_CREATED,
            timestamp=datetime.now(),
            data={'order_id': order_id, 'items': items, 'customer': customer_id}
        )
        self.event_bus.emit(event)
        return order

# Event Consumers (обработчики событий)
class PaymentService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        # Подписываемся на событие ORDER_CREATED
        self.event_bus.subscribe(EventType.ORDER_CREATED, self.on_order_created)
    
    def on_order_created(self, event: Event):
        order_id = event.data['order_id']
        print(f"PaymentService: Обработка платежа для заказа {order_id}")
        
        # Обработали платёж
        processed_event = Event(
            type=EventType.PAYMENT_PROCESSED,
            timestamp=datetime.now(),
            data={'order_id': order_id, 'status': 'success'}
        )
        self.event_bus.emit(processed_event)

class NotificationService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.event_bus.subscribe(EventType.ORDER_CREATED, self.on_order_created)
        self.event_bus.subscribe(EventType.PAYMENT_PROCESSED, self.on_payment_processed)
    
    def on_order_created(self, event: Event):
        customer_id = event.data['customer']
        print(f"NotificationService: Отправляем приветствие клиенту {customer_id}")
    
    def on_payment_processed(self, event: Event):
        order_id = event.data['order_id']
        print(f"NotificationService: Отправляем подтверждение платежа для заказа {order_id}")

class AnalyticsService:
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.event_bus.subscribe(EventType.ORDER_CREATED, self.on_order_created)
    
    def on_order_created(self, event: Event):
        print(f"AnalyticsService: Логируем создание заказа в Mixpanel")

# Использование
bus = EventBus()
order_service = OrderService(bus)
payment_service = PaymentService(bus)
notification_service = NotificationService(bus)
analytics_service = AnalyticsService(bus)

# Создаём заказ — автоматически срабатывают все handlers
order_service.create_order('order-1', ['item1', 'item2'], 'customer-1')

Синхронная vs Асинхронная EDA

Синхронная (простая, но медленнее):

# Все handlers вызываются по очереди
event_bus.emit(event)
# Ждём, пока все обработчики завершат работу

Асинхронная (быстрее, но сложнее):

import asyncio

class AsyncEventBus:
    def __init__(self):
        self._subscribers = {}
    
    def subscribe(self, event_type: EventType, handler):
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        self._subscribers[event_type].append(handler)
    
    async def emit(self, event: Event):
        if event.type in self._subscribers:
            # Запускаем все handler'ы параллельно
            tasks = [
                handler(event)
                for handler in self._subscribers[event.type]
            ]
            await asyncio.gather(*tasks)

class AsyncPaymentService:
    async def on_order_created(self, event: Event):
        print(f"Обработка платежа...")
        await asyncio.sleep(2)  # Имитируем долгую операцию
        print(f"Платёж обработан")

Реальный пример: e-commerce с RabbitMQ

import pika
import json
from typing import Callable

class RabbitMQEventBus:
    """Event Bus на основе RabbitMQ"""
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()
    
    def emit(self, event_type: str, data: dict):
        """Опубликовать событие"""
        self.channel.exchange_declare(
            exchange='events',
            exchange_type='topic',
            durable=True
        )
        
        message = json.dumps({
            'type': event_type,
            'data': data
        })
        
        self.channel.basic_publish(
            exchange='events',
            routing_key=event_type,
            body=message
        )
        print(f"Evento опубликовано: {event_type}")
    
    def subscribe(self, event_type: str, callback: Callable):
        """Подписаться на событие"""
        queue = self.channel.queue_declare(
            queue='',  # Auto-generated queue
            exclusive=True
        )
        
        self.channel.queue_bind(
            exchange='events',
            queue=queue.method.queue,
            routing_key=event_type
        )
        
        def on_message(ch, method, properties, body):
            event_data = json.loads(body)
            callback(event_data)
        
        self.channel.basic_consume(
            queue=queue.method.queue,
            on_message_callback=on_message
        )
        
        print(f"Подписались на: {event_type}")
        self.channel.start_consuming()

# Использование
bus = RabbitMQEventBus()

# Producer
bus.emit('order:created', {'order_id': '123', 'amount': 99.99})

# Consumer
def handle_order_created(event):
    print(f"Заказ создан: {event['data']['order_id']}")

bus.subscribe('order:created', handle_order_created)

Паттерны EDA

1. Saga Pattern — управление долгоживущими транзакциями через события:

# Вместо одной большой транзакции
order -> payment -> inventory -> shipping

# Используем события и компенсирующие транзакции
order_created -> payment_failed -> order_cancelled

2. Event Sourcing — сохранять все события вместо состояния:

# Вместо: state = {'status': 'shipped'}
# Сохраняем: [
#   {'type': 'order_created', 'data': {...}},
#   {'type': 'payment_processed', 'data': {...}},
#   {'type': 'order_shipped', 'data': {...}}
# ]

3. CQRS (Command Query Responsibility Segregation):

# Команды создают события
CreateOrderCommand -> OrderCreatedEvent

# События обновляют read-модели
OrderCreatedEvent -> обновляем OrderListView

Преимущества и недостатки EDA

Преимущества:

  • Слабая связанность между компонентами
  • Масштабируемость — легко добавлять новые handlers
  • Асинхронность — улучшает производительность
  • Реактивность — система быстро реагирует на события

Недостатки:

  • Сложность отладки — сложнее трассировать поток
  • Консистентность данных — harder to ensure
  • Порядок событий — может быть не гарантирован
  • Memory overhead — много объектов Event'ов

Когда использовать EDA

  • Микросервисная архитектура
  • Real-time системы (чаты, уведомления)
  • Системы с множеством асинхронных операций
  • Event sourcing приложения
  • IoT системы (множество датчиков генерируют события)

Инструменты

  • Message Brokers: RabbitMQ, Kafka, Redis, AWS SNS/SQS
  • Python библиотеки: Pydantic Events, aiokafka, pika
  • Frameworks: FastAPI с background tasks, Celery

Заключение

EDA — это архитектурный паттерн для:

  • Слабой связанности компонентов
  • Масштабируемых и асинхронных систем
  • Реактивного взаимодействия между сервисами
  • Управления сложными бизнес-процессами

Это один из самых важных паттернов современной архитектуры.

Что такое принцип EDA? | PrepBro