← Назад к вопросам

Какие знаешь паттерны построения микросервисов?

2.4 Senior🔥 191 комментариев
#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Паттерны построения микросервисов

Микросервисная архитектура — это один из самых важных паттернов в современной разработке. За мой опыт я работал с несколькими ключевыми подходами.

1. Паттерн "Sagа"

Для управления распределёнными транзакциями между микросервисами используется паттерн Saga. Есть две реализации:

Choreography — каждый сервис слушает события и сам решает, что делать:

# Сервис заказов публикует событие
order_created_event = OrderCreatedEvent(order_id=123, user_id=456)
event_bus.publish("order.created", order_created_event)

# Сервис платежей слушает и обрабатывает
@event_bus.on("order.created")
async def process_payment(event):
    await payment_service.charge(event.user_id, event.amount)
    event_bus.publish("payment.processed", {"order_id": event.order_id})

Orchestration — центральный оркестратор управляет всеми шагами:

class OrderSaga:
    def __init__(self, services):
        self.order_service = services["order"]
        self.payment_service = services["payment"]
        self.inventory_service = services["inventory"]
    
    async def execute(self, order):
        try:
            order_id = await self.order_service.create(order)
            await self.payment_service.charge(order.user_id, order.amount)
            await self.inventory_service.reserve(order.items)
            return order_id
        except Exception:
            # компенсирующие транзакции
            await self.payment_service.refund(order.user_id, order.amount)
            await self.inventory_service.release(order.items)

2. API Gateway паттерн

Единая точка входа для всех клиентов:

# gateway.py
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

SERVICES = {
    "users": "http://users-service:8001",
    "orders": "http://orders-service:8002",
    "products": "http://products-service:8003",
}

@app.get("/api/v1/users/{user_id}")
async def get_user(user_id: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{SERVICES[users]}/users/{user_id}")
        return response.json()

@app.post("/api/v1/orders")
async def create_order(order_data: dict):
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{SERVICES[orders]}/orders", json=order_data)
        return response.json()

3. Service Mesh паттерн

Используется Istio или Linkerd для управления сетевым трафиком между сервисами. Добавляется прозрачно без изменений в коде приложения:

# istio-config.yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: orders
spec:
  hosts:
  - orders
  http:
  - match:
    - uri:
        prefix: /api/v1
    route:
    - destination:
        host: orders
        port:
          number: 8000
    timeout: 10s
    retries:
      attempts: 3
      perTryTimeout: 2s

4. Event Sourcing

Все изменения состояния хранятся как последовательность событий:

class Event:
    def __init__(self, event_type: str, data: dict, timestamp: datetime):
        self.event_type = event_type
        self.data = data
        self.timestamp = timestamp

class EventStore:
    def __init__(self, db_connection):
        self.db = db_connection
    
    async def append(self, aggregate_id: str, event: Event):
        await self.db.execute(
            "INSERT INTO events (aggregate_id, event_type, data, timestamp) VALUES (%s, %s, %s, %s)",
            (aggregate_id, event.event_type, json.dumps(event.data), event.timestamp)
        )
    
    async def get_events(self, aggregate_id: str) -> list[Event]:
        rows = await self.db.fetch("SELECT * FROM events WHERE aggregate_id = %s ORDER BY timestamp", aggregate_id)
        return [Event(row["event_type"], row["data"], row["timestamp"]) for row in rows]

# Восстановление состояния из событий
async def restore_order_state(order_id: str, event_store: EventStore):
    events = await event_store.get_events(order_id)
    order = Order()
    for event in events:
        if event.event_type == "OrderCreated":
            order.id = order_id
            order.total = event.data["total"]
        elif event.event_type == "PaymentProcessed":
            order.status = "paid"
        elif event.event_type == "OrderShipped":
            order.status = "shipped"
    return order

5. CQRS (Command Query Responsibility Segregation)

Разделение команд (записи) и запросов (чтения):

# commands.py
class CreateOrderCommand:
    def __init__(self, user_id: str, items: list, total: float):
        self.user_id = user_id
        self.items = items
        self.total = total

class CreateOrderCommandHandler:
    def __init__(self, event_bus):
        self.event_bus = event_bus
    
    async def handle(self, command: CreateOrderCommand) -> str:
        order_id = uuid.uuid4()
        event = OrderCreatedEvent(order_id, command.user_id, command.items, command.total)
        await self.event_bus.publish(event)
        return order_id

# queries.py
class GetOrderQuery:
    def __init__(self, order_id: str):
        self.order_id = order_id

class GetOrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db
    
    async def handle(self, query: GetOrderQuery) -> dict:
        # Читаем из оптимизированной БД для чтения
        return await self.read_db.fetch_one("SELECT * FROM orders_view WHERE id = %s", query.order_id)

6. Circuit Breaker паттерн

Защита от каскадных отказов:

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"      # нормально
    OPEN = "open"          # отказ
    HALF_OPEN = "half_open"  # тестируем

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise

7. Strangler Fig паттерн

Постепенная миграция монолита на микросервисы. API Gateway перенаправляет часть запросов на новый сервис, остальное на старую систему:

@app.get("/api/v1/products/{product_id}")
async def get_product(product_id: str):
    # Новый микросервис
    if should_use_new_service(product_id):
        return await new_product_service.get(product_id)
    # Старая система
    return await legacy_system.get_product(product_id)

def should_use_new_service(product_id: str) -> bool:
    # Постепенно увеличиваем процент трафика
    hash_value = int(hashlib.md5(product_id.encode()).hexdigest(), 16)
    return (hash_value % 100) < current_migration_percentage

Выводы

Выбор паттерна зависит от:

  • Масштаба системы — количества сервисов
  • Требований к надёжности — нужны ли компенсирующие транзакции
  • Сложности — насколько критична простота разработки
  • Операционных возможностей — есть ли опыт с Kubernetes, Service Mesh

В реальных проектах часто комбинируют несколько паттернов одновременно.

Какие знаешь паттерны построения микросервисов? | PrepBro