Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое принцип EDA?
EDA (Event-Driven Architecture) — это архитектурный паттерн, где система реагирует на события вместо вызова функций напрямую. Компоненты взаимодействуют через события, а не через прямые вызовы, что обеспечивает слабую связанность и высокую масштабируемость.
Основная идея
Вместо того чтобы функции вызывали друг друга напрямую:
# Плохо — тесная связанность
user_service.create_user(data)
email_service.send_welcome_email(user) # Тесно связано
analytics_service.track_signup(user) # Тесно связано
С EDA события распространяются через систему:
# Хорошо — свободная связанность
event_bus.emit('user:created', user_data)
# Кто-то слушает и реагирует
# - email_service отправляет письмо
# - analytics_service логирует
# - notification_service отправляет push
Компоненты EDA
- Event Producer — генерирует события
- Event Broker/Bus — транспортирует события
- Event Consumer/Handler — обрабатывает события
- Event — данные о том, что произошло
Простой пример: система заказов
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Callable
from datetime import datetime
class EventType(Enum):
ORDER_CREATED = "order:created"
PAYMENT_PROCESSED = "payment:processed"
ORDER_SHIPPED = "order:shipped"
ORDER_DELIVERED = "order:delivered"
@dataclass
class Event:
type: EventType
timestamp: datetime
data: dict
class EventBus:
"""Простой Event Bus"""
def __init__(self):
self._subscribers: Dict[EventType, List[Callable]] = {}
def subscribe(self, event_type: EventType, handler: Callable):
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(handler)
def emit(self, event: Event):
if event.type in self._subscribers:
for handler in self._subscribers[event.type]:
try:
handler(event)
except Exception as e:
print(f"Error in handler: {e}")
# Event Producers (генераторы событий)
class OrderService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.orders = {}
def create_order(self, order_id: str, items: list, customer_id: str):
order = {
'id': order_id,
'items': items,
'customer_id': customer_id,
'status': 'pending'
}
self.orders[order_id] = order
# Генерируем событие
event = Event(
type=EventType.ORDER_CREATED,
timestamp=datetime.now(),
data={'order_id': order_id, 'items': items, 'customer': customer_id}
)
self.event_bus.emit(event)
return order
# Event Consumers (обработчики событий)
class PaymentService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
# Подписываемся на событие ORDER_CREATED
self.event_bus.subscribe(EventType.ORDER_CREATED, self.on_order_created)
def on_order_created(self, event: Event):
order_id = event.data['order_id']
print(f"PaymentService: Обработка платежа для заказа {order_id}")
# Обработали платёж
processed_event = Event(
type=EventType.PAYMENT_PROCESSED,
timestamp=datetime.now(),
data={'order_id': order_id, 'status': 'success'}
)
self.event_bus.emit(processed_event)
class NotificationService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.event_bus.subscribe(EventType.ORDER_CREATED, self.on_order_created)
self.event_bus.subscribe(EventType.PAYMENT_PROCESSED, self.on_payment_processed)
def on_order_created(self, event: Event):
customer_id = event.data['customer']
print(f"NotificationService: Отправляем приветствие клиенту {customer_id}")
def on_payment_processed(self, event: Event):
order_id = event.data['order_id']
print(f"NotificationService: Отправляем подтверждение платежа для заказа {order_id}")
class AnalyticsService:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.event_bus.subscribe(EventType.ORDER_CREATED, self.on_order_created)
def on_order_created(self, event: Event):
print(f"AnalyticsService: Логируем создание заказа в Mixpanel")
# Использование
bus = EventBus()
order_service = OrderService(bus)
payment_service = PaymentService(bus)
notification_service = NotificationService(bus)
analytics_service = AnalyticsService(bus)
# Создаём заказ — автоматически срабатывают все handlers
order_service.create_order('order-1', ['item1', 'item2'], 'customer-1')
Синхронная vs Асинхронная EDA
Синхронная (простая, но медленнее):
# Все handlers вызываются по очереди
event_bus.emit(event)
# Ждём, пока все обработчики завершат работу
Асинхронная (быстрее, но сложнее):
import asyncio
class AsyncEventBus:
def __init__(self):
self._subscribers = {}
def subscribe(self, event_type: EventType, handler):
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(handler)
async def emit(self, event: Event):
if event.type in self._subscribers:
# Запускаем все handler'ы параллельно
tasks = [
handler(event)
for handler in self._subscribers[event.type]
]
await asyncio.gather(*tasks)
class AsyncPaymentService:
async def on_order_created(self, event: Event):
print(f"Обработка платежа...")
await asyncio.sleep(2) # Имитируем долгую операцию
print(f"Платёж обработан")
Реальный пример: e-commerce с RabbitMQ
import pika
import json
from typing import Callable
class RabbitMQEventBus:
"""Event Bus на основе RabbitMQ"""
def __init__(self, host='localhost'):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host)
)
self.channel = self.connection.channel()
def emit(self, event_type: str, data: dict):
"""Опубликовать событие"""
self.channel.exchange_declare(
exchange='events',
exchange_type='topic',
durable=True
)
message = json.dumps({
'type': event_type,
'data': data
})
self.channel.basic_publish(
exchange='events',
routing_key=event_type,
body=message
)
print(f"Evento опубликовано: {event_type}")
def subscribe(self, event_type: str, callback: Callable):
"""Подписаться на событие"""
queue = self.channel.queue_declare(
queue='', # Auto-generated queue
exclusive=True
)
self.channel.queue_bind(
exchange='events',
queue=queue.method.queue,
routing_key=event_type
)
def on_message(ch, method, properties, body):
event_data = json.loads(body)
callback(event_data)
self.channel.basic_consume(
queue=queue.method.queue,
on_message_callback=on_message
)
print(f"Подписались на: {event_type}")
self.channel.start_consuming()
# Использование
bus = RabbitMQEventBus()
# Producer
bus.emit('order:created', {'order_id': '123', 'amount': 99.99})
# Consumer
def handle_order_created(event):
print(f"Заказ создан: {event['data']['order_id']}")
bus.subscribe('order:created', handle_order_created)
Паттерны EDA
1. Saga Pattern — управление долгоживущими транзакциями через события:
# Вместо одной большой транзакции
order -> payment -> inventory -> shipping
# Используем события и компенсирующие транзакции
order_created -> payment_failed -> order_cancelled
2. Event Sourcing — сохранять все события вместо состояния:
# Вместо: state = {'status': 'shipped'}
# Сохраняем: [
# {'type': 'order_created', 'data': {...}},
# {'type': 'payment_processed', 'data': {...}},
# {'type': 'order_shipped', 'data': {...}}
# ]
3. CQRS (Command Query Responsibility Segregation):
# Команды создают события
CreateOrderCommand -> OrderCreatedEvent
# События обновляют read-модели
OrderCreatedEvent -> обновляем OrderListView
Преимущества и недостатки EDA
✅ Преимущества:
- Слабая связанность между компонентами
- Масштабируемость — легко добавлять новые handlers
- Асинхронность — улучшает производительность
- Реактивность — система быстро реагирует на события
❌ Недостатки:
- Сложность отладки — сложнее трассировать поток
- Консистентность данных — harder to ensure
- Порядок событий — может быть не гарантирован
- Memory overhead — много объектов Event'ов
Когда использовать EDA
- Микросервисная архитектура
- Real-time системы (чаты, уведомления)
- Системы с множеством асинхронных операций
- Event sourcing приложения
- IoT системы (множество датчиков генерируют события)
Инструменты
- Message Brokers: RabbitMQ, Kafka, Redis, AWS SNS/SQS
- Python библиотеки: Pydantic Events, aiokafka, pika
- Frameworks: FastAPI с background tasks, Celery
Заключение
EDA — это архитектурный паттерн для:
- Слабой связанности компонентов
- Масштабируемых и асинхронных систем
- Реактивного взаимодействия между сервисами
- Управления сложными бизнес-процессами
Это один из самых важных паттернов современной архитектуры.