← Назад к вопросам

Какая специфика работы с БД у микросервисной архитектуры?

2.8 Senior🔥 141 комментариев
#Архитектура и паттерны#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Специфика работы с БД в микросервисной архитектуре

Микросервисная архитектура кардинально меняет подход к работе с БД. Вместо одной большой БД (monolith), у каждого сервиса своя БД. Это создаёт новые вызовы и требует переосмысления стратегии хранения данных.

1. Database per Service — основной принцип

Каждый микросервис имеет свою собственную БД, которую может менять независимо.

# Архитектура:
# User Service -> User DB (PostgreSQL)
# Order Service -> Order DB (PostgreSQL)
# Payment Service -> Payment DB (MongoDB)
# Inventory Service -> Inventory DB (Redis)

# Пример сервиса пользователей
# services/user_service/db.py
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session

engine = create_engine('postgresql://user:pass@user-db:5432/users')
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String, unique=True)

Base.metadata.create_all(engine)

# Сервис заказов имеет совершенно другую БД
# services/order_service/db.py
class Order(Base):  # Разная Base!
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)  # НЕ внешний ключ!
    total = Column(Float)

Преимущества:

  • Независимое развитие и деплой сервисов
  • Можно использовать разные БД (PostgreSQL, MongoDB, Redis)
  • Нет блокировок между сервисами
  • Масштабируется легче

Проблемы:

  • Нет транзакций между сервисами
  • Сложнее обеспечить консистентность
  • Нет внешних ключей между БД

2. Distributed Transactions — основная боль

Раньше была одна БД и одна транзакция. Теперь нужно синхронизировать несколько БД.

Проблема: Two-Phase Commit (2PC) медленный

# Плохой подход: Two-Phase Commit
# Фаза 1: Prepare — все сервисы говорят готовы ли они
# Фаза 2: Commit — все коммитят или откатываются

# User Service пытается вычесть деньги
# Order Service пытается создать заказ
# Payment Service пытается обработать платёж

# Если Payment Service отвечает 5 секунд, ВСЕ ЖДУТ 5 секунд!
# Это убивает масштабируемость

from sqlalchemy.orm import Session

def create_order_2pc(user_id: int, amount: float) -> bool:
    # Попробуй зарезервировать средства (prepare)
    payment_reserved = payment_service.reserve_funds(user_id, amount)
    if not payment_reserved:
        return False
    
    try:
        # Создай заказ
        order = Order(user_id=user_id, amount=amount)
        db_session.add(order)
        db_session.commit()  # Commit в Order DB
        
        # Commit в Payment Service
        payment_service.confirm_payment(user_id, amount)
        return True
    except Exception as e:
        # Откат везде
        db_session.rollback()
        payment_service.cancel_reserve(user_id, amount)
        raise

Проблемы 2PC:

  • Очень медленно (network latency)
  • Если сервис упадёт в фазе 2, остаются зависшие транзакции
  • Не масштабируется при high-load

Решение: Saga Pattern

# Лучший подход: Saga (долгоживущая транзакция)
# Вместо одной транзакции, серия локальных транзакций с компенсациями

from dataclasses import dataclass
from enum import Enum
from typing import Callable

class SagaStatus(Enum):
    STARTED = "started"
    PAYMENT_RESERVED = "payment_reserved"
    ORDER_CREATED = "order_created"
    PAYMENT_CONFIRMED = "payment_confirmed"
    FAILED = "failed"

@dataclass
class OrderSaga:
    user_id: int
    amount: float
    status: SagaStatus = SagaStatus.STARTED

class OrderSagaOrchestrator:
    def __init__(self, payment_service, order_db, event_bus):
        self.payment_service = payment_service
        self.order_db = order_db
        self.event_bus = event_bus
    
    async def execute_saga(self, user_id: int, amount: float) -> bool:
        saga = OrderSaga(user_id=user_id, amount=amount)
        
        try:
            # Шаг 1: Зарезервировать платёж (локальная транзакция)
            payment_reserved = await self.payment_service.reserve_funds(user_id, amount)
            saga.status = SagaStatus.PAYMENT_RESERVED
            
            if not payment_reserved:
                raise Exception("Payment reserve failed")
            
            # Шаг 2: Создать заказ (локальная транзакция)
            order = Order(user_id=user_id, amount=amount)
            self.order_db.add(order)
            self.order_db.commit()  # O(1) - только локальная БД
            saga.status = SagaStatus.ORDER_CREATED
            
            # Шаг 3: Подтвердить платёж (локальная транзакция)
            await self.payment_service.confirm_payment(user_id, amount)
            saga.status = SagaStatus.PAYMENT_CONFIRMED
            
            # Publish event: Order Created
            self.event_bus.publish('order.created', {'order_id': order.id})
            
            return True
        
        except Exception as e:
            saga.status = SagaStatus.FAILED
            # Компенсирующие транзакции (откаты)
            await self._compensate(saga)
            return False
    
    async def _compensate(self, saga: OrderSaga):
        """Откатить изменения"""
        # Откат в обратном порядке
        if saga.status == SagaStatus.ORDER_CREATED:
            # Удали заказ
            self.order_db.query(Order).filter_by(...).delete()
            self.order_db.commit()
        
        if saga.status in [SagaStatus.PAYMENT_RESERVED, SagaStatus.ORDER_CREATED]:
            # Отмени резервирование
            await self.payment_service.cancel_reserve(saga.user_id, saga.amount)

Преимущества Saga:

  • Каждый шаг — локальная транзакция (быстро)
  • Можно откатить через компенсирующие транзакции
  • Асинхронно, не блокирует
  • Масштабируется

3. Event Sourcing & CQRS

Вместо хранения текущего состояния, храни последовательность событий.

# Event Sourcing
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List

class OrderEvent(Enum):
    CREATED = "created"
    PAYMENT_RESERVED = "payment_reserved"
    SHIPPED = "shipped"
    CANCELLED = "cancelled"

@dataclass
class OrderEventRecord:
    event_type: OrderEvent
    aggregate_id: str  # Order ID
    timestamp: datetime
    data: dict
    version: int

class OrderEventStore:
    def __init__(self, db):
        self.db = db
    
    def append_event(self, aggregate_id: str, event: OrderEventRecord):
        """Добавить событие в event log"""
        event_record = {
            'aggregate_id': aggregate_id,
            'event_type': event.event_type.value,
            'timestamp': event.timestamp,
            'data': event.data,
            'version': event.version
        }
        self.db.execute('''
            INSERT INTO event_log (aggregate_id, event_type, timestamp, data, version)
            VALUES (%s, %s, %s, %s, %s)
        ''', event_record.values())
    
    def get_events(self, aggregate_id: str) -> List[OrderEventRecord]:
        """Получить все события для заказа"""
        events = self.db.query('''
            SELECT * FROM event_log WHERE aggregate_id = %s ORDER BY version
        ''', (aggregate_id,))
        return [OrderEventRecord(**event) for event in events]
    
    def rebuild_state(self, aggregate_id: str) -> dict:
        """Восстановить текущее состояние из событий"""
        events = self.get_events(aggregate_id)
        state = {'status': 'draft', 'amount': 0}
        
        for event in events:
            if event.event_type == OrderEvent.CREATED:
                state['status'] = 'created'
                state['amount'] = event.data['amount']
            elif event.event_type == OrderEvent.PAYMENT_RESERVED:
                state['status'] = 'payment_reserved'
            elif event.event_type == OrderEvent.SHIPPED:
                state['status'] = 'shipped'
            elif event.event_type == OrderEvent.CANCELLED:
                state['status'] = 'cancelled'
        
        return state

# CQRS: Command Query Responsibility Segregation
# Отдельные модели для чтения и записи

class OrderWriteModel:  # Для команд (INSERT/UPDATE)
    def create_order(self, user_id: int, amount: float):
        order = Order(user_id=user_id, amount=amount)
        event_store.append_event(order.id, OrderEventRecord(
            event_type=OrderEvent.CREATED,
            aggregate_id=order.id,
            timestamp=datetime.now(),
            data={'amount': amount}
        ))

class OrderReadModel:  # Для запросов (SELECT) — кэшированные данные
    def get_order(self, order_id: str) -> dict:
        # Читаем из кэша/денормализованной БД
        return cache.get(f"order:{order_id}")  # O(1) вместо rebuild

Плюсы Event Sourcing:

  • Полная история всех изменений
  • Можно восстановить любое состояние
  • Отлично для аудита
  • Асинхронная репликация

Минусы:

  • Сложнее в разработке
  • Требует больше памяти (весь event log)

4. Data Consistency Patterns

Eventual Consistency (в конце концов будет консистентно)

# Order Service создал заказ
order = Order(user_id=user_id, amount=amount)
order_db.add(order)
order_db.commit()

# Async публикует событие
event_bus.publish('order.created', {'order_id': order.id})

# Inventory Service слушает событие
@event_bus.subscribe('order.created')
async def on_order_created(event):
    # ПОЗЖЕ обновляет свою БД
    await inventory_db.update_stock(event['order_id'])

# В течение 100мс все сервисы будут консистентны
# Но в течение этих 100мс данные могут быть несогласованы

Strong Consistency (немедленная консистентность)

# Используй distributed lock
from redis import Redis

redis = Redis()

def create_order_with_lock(user_id: int, amount: float):
    lock_key = f"user:{user_id}:order_lock"
    
    # Заблокируй пользователя (только один заказ одновременно)
    with redis.lock(lock_key, timeout=5):
        # Проверь баланс
        balance = payment_service.get_balance(user_id)
        if balance < amount:
            raise Exception("Insufficient balance")
        
        # Создай заказ (под локом)
        order = Order(user_id=user_id, amount=amount)
        order_db.add(order)
        order_db.commit()
        
        # Вычти средства
        payment_service.deduct_funds(user_id, amount)
        
        return order  # Гарантированно консистентно

5. Кросс-сервисные запросы (Anti-Pattern)

Неправильно: Прямые запросы в чужую БД

# Order Service НЕ ДОЛЖЕН делать так:
from services.user_service import UserDB

user_db = UserDB()  # Прямое подключение!
user = user_db.query(User).filter_by(id=user_id).first()  # Плохо!

# Проблемы:
# 1. Tight coupling с User Service
# 2. Если User DB упала, Order Service упадёт
# 3. Нарушает инкапсуляцию

Правильно: API вызовы

# Order Service вызывает User Service API
import httpx

async def create_order(user_id: int, amount: float):
    # Вызов User Service API
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"http://user-service:8000/users/{user_id}",
            timeout=5.0  # Timeout!
        )
        
        if response.status_code != 200:
            raise Exception("User service unavailable")
        
        user = response.json()
        
        # Используй данные
        order = Order(user_id=user.id, amount=amount)
        order_db.add(order)
        order_db.commit()
        
        return order

6. Кэширование между сервисами

# Используй Redis для кэша данных других сервисов
from redis import Redis
import json

redis = Redis(host='redis')

def get_user_with_cache(user_id: int):
    cache_key = f"user:{user_id}"
    
    # Попробуй кэш
    cached = redis.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Нет в кэше — вызови API
    user = user_service_client.get_user(user_id)
    
    # Сохрани в кэш на 5 минут
    redis.setex(cache_key, 300, json.dumps(user))
    
    return user

Таблица сравнения подходов

ПодходКонсистентностьСкоростьСложностьЛучше для
Saga PatternEventuallyВысокаяСредняяАсинхронные операции
Event SourcingStrongВысокаяВысокаяАудит, история
API CallsEventuallyСредняяНизкаяПростые операции
Distributed LocksStrongНизкаяВысокаяКритичные операции
Eventual ConsistencyWeakОчень высокаяСредняяReal-time системы

Best Practices для микросервисов

  1. Каждый сервис — своя БД, не делись таблицами
  2. Используй Saga Pattern для операций между сервисами
  3. Публикуй события вместо синхронных запросов
  4. Кэшируй данные других сервисов в Redis
  5. Добавь timeouts на все внешние вызовы
  6. Мониторь консистентность между сервисами
  7. Логируй распределённый трейс для отладки
  8. Используй Circuit Breaker для отказоустойчивости

Микросервисная БД — это компромисс между независимостью и консистентностью. Выбирай подход в зависимости от требований.

Какая специфика работы с БД у микросервисной архитектуры? | PrepBro