Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Что такое Event Sourcing?
Event Sourcing — это архитектурный паттерн, в котором все изменения состояния приложения представляются как последовательность неизменяемых событий, а не только сохранением текущего состояния. Вместо перезаписи текущих данных система записывает каждое событие (действие), которое привело к изменению состояния.
Основной принцип
Вместо традиционного подхода:
Ольга: баланс 1000 руб → баланс 900 руб (прямое обновление)
Event Sourcing использует:
Ольга: ОК, баланс начальный 1000 руб
Ольга: СОБЫТИЕ — Перевод 100 руб (баланс = 1000 - 100 = 900 руб)
Ольга: СОБЫТИЕ — Пополнение на 50 руб (баланс = 900 + 50 = 950 руб)
Компоненты Event Sourcing
1. Event Store
Отдельное хранилище для всех событий. Это источник истины для всей системы.
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any
class EventType(Enum):
ACCOUNT_CREATED = 'account_created'
MONEY_TRANSFERRED = 'money_transferred'
BALANCE_UPDATED = 'balance_updated'
@dataclass
class Event:
event_id: str
event_type: EventType
aggregate_id: str # ID пользователя или счета
timestamp: datetime
data: dict[str, Any]
version: int # Версия для оптимистичной блокировки
class EventStore:
def __init__(self):
self.events: list[Event] = []
def append_event(self, event: Event) -> None:
"""Добавить событие в хранилище"""
self.events.append(event)
def get_events_by_aggregate(self, aggregate_id: str) -> list[Event]:
"""Получить все события для конкретного агрегата"""
return [e for e in self.events if e.aggregate_id == aggregate_id]
2. Aggregate (Агрегат)
Это объект, состояние которого управляется событиями. Агрегат воспроизводит свое состояние, применяя события в порядке их возникновения.
@dataclass
class Account:
account_id: str
balance: float = 0.0
owner: str = ""
version: int = 0
def apply_event(self, event: Event) -> None:
"""Применить событие к состоянию"""
if event.event_type == EventType.ACCOUNT_CREATED:
self.account_id = event.data['account_id']
self.owner = event.data['owner']
self.balance = event.data['initial_balance']
elif event.event_type == EventType.MONEY_TRANSFERRED:
self.balance += event.data['amount']
self.version = event.version
def rebuild_from_events(self, events: list[Event]) -> None:
"""Восстановить состояние из событий"""
for event in events:
self.apply_event(event)
3. Projection (Проекция)
Оптимизированное представление данных для чтения. Построено из событий, но может быть переиндексировано.
class AccountProjection:
"""Денормализованное представление счета для быстрого чтения"""
def __init__(self):
self.accounts: dict[str, dict] = {}
def handle_event(self, event: Event) -> None:
if event.event_type == EventType.ACCOUNT_CREATED:
self.accounts[event.aggregate_id] = {
'owner': event.data['owner'],
'balance': event.data['initial_balance'],
'created_at': event.timestamp
}
elif event.event_type == EventType.MONEY_TRANSFERRED:
if event.aggregate_id in self.accounts:
self.accounts[event.aggregate_id]['balance'] += event.data['amount']
def get_account(self, account_id: str) -> dict:
return self.accounts.get(account_id)
Полный пример
from uuid import uuid4
class AccountService:
def __init__(self, event_store: EventStore, projection: AccountProjection):
self.event_store = event_store
self.projection = projection
def create_account(self, owner: str, initial_balance: float) -> str:
account_id = str(uuid4())
event = Event(
event_id=str(uuid4()),
event_type=EventType.ACCOUNT_CREATED,
aggregate_id=account_id,
timestamp=datetime.now(),
data={
'account_id': account_id,
'owner': owner,
'initial_balance': initial_balance
},
version=1
)
self.event_store.append_event(event)
self.projection.handle_event(event)
return account_id
def transfer_money(self, account_id: str, amount: float) -> None:
# Получить текущее состояние
events = self.event_store.get_events_by_aggregate(account_id)
account = Account(account_id)
account.rebuild_from_events(events)
# Создать новое событие
event = Event(
event_id=str(uuid4()),
event_type=EventType.MONEY_TRANSFERRED,
aggregate_id=account_id,
timestamp=datetime.now(),
data={'amount': amount},
version=account.version + 1
)
self.event_store.append_event(event)
self.projection.handle_event(event)
def get_balance(self, account_id: str) -> float:
account = self.projection.get_account(account_id)
return account['balance'] if account else 0.0
Преимущества Event Sourcing
- Полная история — все изменения записаны и неизменяемы
- Отладка — можно воспроизвести любое состояние
- Аудит — соответствие требованиям compliance
- Масштабируемость — отделение чтения (проекции) от записи (события)
- Временная машина — возможность вернуться в прошлое
Недостатки
- Сложность — требует иного мышления о хранении данных
- Объем хранилища — события растут с течением времени
- Консистентность — проекции могут отставать (eventual consistency)
- CQRS — часто требует разделения команд и запросов
Event Sourcing + CQRS
Они часто используются вместе: Event Sourcing обрабатывает команды (write), CQRS разделяет модель чтения (read) и записи.
Event Sourcing — мощный паттерн для систем, требующих полной истории изменений и высокой масштабируемости.