Какие знаешь паттерны построения микросервисов?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Паттерны построения микросервисов
Микросервисная архитектура — это один из самых важных паттернов в современной разработке. За мой опыт я работал с несколькими ключевыми подходами.
1. Паттерн "Sagа"
Для управления распределёнными транзакциями между микросервисами используется паттерн Saga. Есть две реализации:
Choreography — каждый сервис слушает события и сам решает, что делать:
# Сервис заказов публикует событие
order_created_event = OrderCreatedEvent(order_id=123, user_id=456)
event_bus.publish("order.created", order_created_event)
# Сервис платежей слушает и обрабатывает
@event_bus.on("order.created")
async def process_payment(event):
await payment_service.charge(event.user_id, event.amount)
event_bus.publish("payment.processed", {"order_id": event.order_id})
Orchestration — центральный оркестратор управляет всеми шагами:
class OrderSaga:
def __init__(self, services):
self.order_service = services["order"]
self.payment_service = services["payment"]
self.inventory_service = services["inventory"]
async def execute(self, order):
try:
order_id = await self.order_service.create(order)
await self.payment_service.charge(order.user_id, order.amount)
await self.inventory_service.reserve(order.items)
return order_id
except Exception:
# компенсирующие транзакции
await self.payment_service.refund(order.user_id, order.amount)
await self.inventory_service.release(order.items)
2. API Gateway паттерн
Единая точка входа для всех клиентов:
# gateway.py
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
SERVICES = {
"users": "http://users-service:8001",
"orders": "http://orders-service:8002",
"products": "http://products-service:8003",
}
@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: str):
async with httpx.AsyncClient() as client:
response = await client.get(f"{SERVICES[users]}/users/{user_id}")
return response.json()
@app.post("/api/v1/orders")
async def create_order(order_data: dict):
async with httpx.AsyncClient() as client:
response = await client.post(f"{SERVICES[orders]}/orders", json=order_data)
return response.json()
3. Service Mesh паттерн
Используется Istio или Linkerd для управления сетевым трафиком между сервисами. Добавляется прозрачно без изменений в коде приложения:
# istio-config.yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: orders
spec:
hosts:
- orders
http:
- match:
- uri:
prefix: /api/v1
route:
- destination:
host: orders
port:
number: 8000
timeout: 10s
retries:
attempts: 3
perTryTimeout: 2s
4. Event Sourcing
Все изменения состояния хранятся как последовательность событий:
class Event:
def __init__(self, event_type: str, data: dict, timestamp: datetime):
self.event_type = event_type
self.data = data
self.timestamp = timestamp
class EventStore:
def __init__(self, db_connection):
self.db = db_connection
async def append(self, aggregate_id: str, event: Event):
await self.db.execute(
"INSERT INTO events (aggregate_id, event_type, data, timestamp) VALUES (%s, %s, %s, %s)",
(aggregate_id, event.event_type, json.dumps(event.data), event.timestamp)
)
async def get_events(self, aggregate_id: str) -> list[Event]:
rows = await self.db.fetch("SELECT * FROM events WHERE aggregate_id = %s ORDER BY timestamp", aggregate_id)
return [Event(row["event_type"], row["data"], row["timestamp"]) for row in rows]
# Восстановление состояния из событий
async def restore_order_state(order_id: str, event_store: EventStore):
events = await event_store.get_events(order_id)
order = Order()
for event in events:
if event.event_type == "OrderCreated":
order.id = order_id
order.total = event.data["total"]
elif event.event_type == "PaymentProcessed":
order.status = "paid"
elif event.event_type == "OrderShipped":
order.status = "shipped"
return order
5. CQRS (Command Query Responsibility Segregation)
Разделение команд (записи) и запросов (чтения):
# commands.py
class CreateOrderCommand:
def __init__(self, user_id: str, items: list, total: float):
self.user_id = user_id
self.items = items
self.total = total
class CreateOrderCommandHandler:
def __init__(self, event_bus):
self.event_bus = event_bus
async def handle(self, command: CreateOrderCommand) -> str:
order_id = uuid.uuid4()
event = OrderCreatedEvent(order_id, command.user_id, command.items, command.total)
await self.event_bus.publish(event)
return order_id
# queries.py
class GetOrderQuery:
def __init__(self, order_id: str):
self.order_id = order_id
class GetOrderQueryHandler:
def __init__(self, read_db):
self.read_db = read_db
async def handle(self, query: GetOrderQuery) -> dict:
# Читаем из оптимизированной БД для чтения
return await self.read_db.fetch_one("SELECT * FROM orders_view WHERE id = %s", query.order_id)
6. Circuit Breaker паттерн
Защита от каскадных отказов:
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed" # нормально
OPEN = "open" # отказ
HALF_OPEN = "half_open" # тестируем
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
7. Strangler Fig паттерн
Постепенная миграция монолита на микросервисы. API Gateway перенаправляет часть запросов на новый сервис, остальное на старую систему:
@app.get("/api/v1/products/{product_id}")
async def get_product(product_id: str):
# Новый микросервис
if should_use_new_service(product_id):
return await new_product_service.get(product_id)
# Старая система
return await legacy_system.get_product(product_id)
def should_use_new_service(product_id: str) -> bool:
# Постепенно увеличиваем процент трафика
hash_value = int(hashlib.md5(product_id.encode()).hexdigest(), 16)
return (hash_value % 100) < current_migration_percentage
Выводы
Выбор паттерна зависит от:
- Масштаба системы — количества сервисов
- Требований к надёжности — нужны ли компенсирующие транзакции
- Сложности — насколько критична простота разработки
- Операционных возможностей — есть ли опыт с Kubernetes, Service Mesh
В реальных проектах часто комбинируют несколько паттернов одновременно.