Как разделять задачи между сервисами в микросервисной архитектуре?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Разделение задач между сервисами в микросервисной архитектуре
Делить функционал на сервисы нужно тщательно — неправильное разделение создаёт распределённый монолит. Расскажу о стратегиях и принципах.
Основной принцип: Domain-Driven Design
Делай разделение по бизнес-доменам, а не по техническим слоям.
# ❌ НЕПРАВИЛЬНО — по техническим слоям
auth_service/ # Только аутентификация
data_service/ # Только БД
api_gateway/ # Только API
# ✅ ПРАВИЛЬНО — по бизнес-доменам
user_service/ # Управление пользователями
order_service/ # Управление заказами
payment_service/ # Обработка платежей
notification_service/ # Отправка уведомлений
Критерии разделения (SOLID для микросервисов)
1. Single Responsibility Каждый сервис отвечает за одну область:
# user_service: управление профилями
@app.post("/users")
async def create_user(user: UserCreate) -> User:
user_obj = User(**user.dict())
db.add(user_obj)
await db.commit()
return user_obj
# auth_service: аутентификация
@app.post("/login")
async def login(email: str, password: str) -> Token:
user = await get_user_by_email(email)
if not verify_password(password, user.password_hash):
raise HTTPException(status_code=401)
return generate_token(user.id)
# ❌ ПЛОХО — смешивание ответственности
@app.post("/users/with_auth")
async def create_and_authenticate(user: UserCreate):
# Создание пользователя
user_obj = User(**user.dict())
# Генерирование токена
token = generate_token(user_obj.id)
# Отправка email
send_welcome_email(user_obj.email)
# Всё смешано!
2. Decoupling (Слабая связанность) Сервисы не должны зависеть от деталей реализации друг друга:
# ✅ Правильно — через message queue
# order_service публикует событие
if order_created:
message_queue.publish("order.created", {
"order_id": order.id,
"user_id": order.user_id,
"total_amount": order.total
})
# payment_service слушает событие
@message_queue.on("order.created")
async def handle_order_created(event):
charge_user(event["user_id"], event["total_amount"])
# ❌ Плохо — прямой вызов
order_service.create_order()
payment_service.charge_card() # Жёсткая связанность
3. Scalability (Масштабируемость) Делай границы так, чтобы отдельные сервисы масштабировались независимо:
# payment_service может получить большую нагрузку
# Масштабируем только его, без изменения других сервисов
kubernetes:
deployment:
payment_service:
replicas: 10 # 10 инстансов
user_service:
replicas: 2 # 2 инстанса
Паттерны разделения
Паттерн 1: Data Ownership Каждый сервис владеет своей БД, другие не имеют доступа:
# user_service
user_db = PostgresDB("postgresql://localhost/users")
@app.get("/users/{user_id}")
async def get_user(user_id: str):
user = await user_db.query(f"SELECT * FROM users WHERE id={user_id}")
return user
# order_service не может напрямую обращаться к user_db!
# Должен вызвать user_service API
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
order = await order_db.query(f"SELECT * FROM orders WHERE id={order_id}")
# Получить данные пользователя через API
user = await http_client.get(f"http://user_service/users/{order.user_id}")
return {"order": order, "user": user}
Паттерн 2: Saga Pattern (для распределённых транзакций)
# order_service инициирует сагу
@app.post("/orders")
async def create_order(order: OrderCreate):
order_obj = Order(**order.dict())
await order_db.add(order_obj)
# Шаг 1: Зарезервировать инвентарь
try:
await inventory_service.reserve(order.items)
except Exception:
await order_db.delete(order_obj) # Компенсирующая транзакция
raise
# Шаг 2: Обработать платёж
try:
await payment_service.charge(order.total)
except Exception:
await inventory_service.release(order.items) # Откат
await order_db.delete(order_obj)
raise
# Шаг 3: Отправить уведомление
await notification_service.send(f"Order {order.id} created")
return order_obj
Паттерн 3: API Gateway (единая точка входа)
# api_gateway (основной сервис)
@app.get("/api/v1/users/{user_id}")
async def get_user_profile(user_id: str):
# Собираем данные из разных сервисов
user = await user_service.get_user(user_id)
orders = await order_service.get_user_orders(user_id)
payments = await payment_service.get_user_payments(user_id)
return {
"user": user,
"orders": orders,
"payment_history": payments
}
Коммуникация между сервисами
Синхронная (Request/Response)
import httpx
class OrderService:
def __init__(self):
self.payment_service_url = "http://payment_service:8000"
async def create_order(self, user_id: str, total: float):
# Синхронный вызов — блокирует до ответа
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.payment_service_url}/charge",
json={"user_id": user_id, "amount": total}
)
payment = response.json()
if payment["status"] != "success":
raise PaymentError()
return Order(user_id=user_id, payment_id=payment["id"])
Асинхронная (Event-Driven)
import asyncio
from rabbitmq import RabbitMQ
queue = RabbitMQ()
# order_service публикует событие
@app.post("/orders")
async def create_order(order: OrderCreate):
order_obj = Order(**order.dict())
await order_db.add(order_obj)
# Не ждём ответа — просто публикуем событие
await queue.publish("order.created", {
"order_id": str(order_obj.id),
"user_id": order_obj.user_id,
"total": order_obj.total
})
return order_obj
# payment_service слушает события
@queue.subscribe("order.created")
async def on_order_created(event):
try:
result = charge_user(event["user_id"], event["total"])
await queue.publish("payment.completed", result)
except Exception as e:
await queue.publish("payment.failed", {"error": str(e)})
Инструменты для коммуникации
# HTTP/REST — для синхронных запросов
import httpx
async with httpx.AsyncClient() as client:
response = await client.get("http://other-service/api/data")
# gRPC — для высокопроизводительных синхронных запросов
# RabbitMQ/Kafka — для асинхронных событий
# Redis PubSub — для простых pub/sub сценариев
Проблемы и решения
Проблема: Distributed Transactions
# Невозможно использовать ACID транзакции на несколько БД
# Решение: Saga pattern или Event sourcing
# Saga: координируемые локальные транзакции
async def create_order_saga(order):
# 1. Создать заказ
await order_db.create(order)
try:
# 2. Зарезервировать товар
await inventory_service.reserve(order.items)
# 3. Взять платёж
await payment_service.charge(order.total)
except Exception:
# Компенсирующие транзакции
await order_db.cancel(order)
await inventory_service.release(order.items)
raise
Проблема: Network Latency
# Микросервисы находятся в сети, запросы медленны
# Решение: Кэширование и асинхронность
from functools import lru_cache
@lru_cache(maxsize=1000)
async def get_user_cached(user_id: str):
return await user_service.get_user(user_id)
# Или используй message queue для асинхронности
await message_queue.publish("user.updated", event)
Best Practices
- Дели по бизнес-доменам, не по техническим слоям
- Каждый сервис владеет своей БД — нет общих данных
- Используй асинхронную коммуникацию когда возможно (message queue)
- Реализуй circuit breaker для отказоустойчивости
- Логируй и трейсируй запросы между сервисами (OpenTelemetry)
- Версионируй API —
/api/v1,/api/v2 - Документируй зависимости — какие сервисы зависят друг от друга
- Монитори latency — проверяй, не слишком ли медленно
Микросервисы — это не серебряная пуля, используй их, когда:- У команды есть несколько небольших автономных групп
- Разные части нужно масштабировать независимо
- Разные части используют разные технологии
- Нужна высокая доступность каждого сервиса