← Назад к вопросам
Как реализуются синхронные взаимодействия микросервисов?
3.0 Senior🔥 201 комментариев
#REST API и HTTP#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Синхронные взаимодействия микросервисов
Синхронное взаимодействие — это когда сервис А отправляет запрос сервису Б и ждёт ответ. Я использовал несколько подходов в своих проектах.
1. REST API (HTTP) — самый распространённый способ
Основной паттерн
from fastapi import FastAPI, HTTPException
import httpx
from typing import Optional
app = FastAPI()
class OrderService:
"""Сервис заказов вызывает сервис платежей."""
def __init__(self):
self.payment_service_url = "http://payment-service:8000"
def create_order(self, user_id: int, amount: float) -> dict:
# Синхронно вызываем сервис платежей
try:
response = httpx.post(
f"{self.payment_service_url}/api/v1/payments",
json={
"user_id": user_id,
"amount": amount,
"currency": "USD"
},
timeout=5.0 # Важно: таймаут предотвращает зависание
)
response.raise_for_status()
payment = response.json()
# Если платёж успешен, создаём заказ
order = {
"id": 1,
"user_id": user_id,
"amount": amount,
"payment_id": payment["id"],
"status": "confirmed"
}
return order
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="Payment service timeout")
except httpx.HTTPError as e:
raise HTTPException(status_code=502, detail="Payment service error")
@app.post("/api/v1/orders")
def create_order(user_id: int, amount: float):
service = OrderService()
return service.create_order(user_id, amount)
Проблема: Chain of Failures
Ордеры → Платежи → Биллинг → Email
↑ ↑ ↑ ↑
Если Email упадёт, цепь ломается полностью
2. Паттерн Circuit Breaker — защита от каскадных отказов
import httpx
from enum import Enum
from datetime import datetime, timedelta, timezone
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Всё работает
OPEN = "open" # Блокируем запросы
HALF_OPEN = "half_open" # Пробуем восстановиться
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
timeout_duration: int = 60,
recovery_timeout: int = 30
):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout_duration = timeout_duration
self.recovery_timeout = recovery_timeout
self.state = CircuitState.CLOSED
self.last_failure_time: Optional[datetime] = None
self.opened_at: Optional[datetime] = None
def call(self, func: Callable, *args, **kwargs) -> Any:
"""Вызывает функцию с защитой circuit breaker."""
now = datetime.now(timezone.utc)
if self.state == CircuitState.OPEN:
# Если прошло достаточно времени, переходим в HALF_OPEN
if (now - self.opened_at).total_seconds() >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
print("Circuit breaker: переход в HALF_OPEN (пробуем восстановиться)")
else:
# Цепь открыта, блокируем запрос
raise Exception("Circuit breaker is OPEN - service unavailable")
try:
result = func(*args, **kwargs)
# Успех - сбрасываем счётчик
if self.state == CircuitState.HALF_OPEN:
print("Circuit breaker: восстановление успешно, переход в CLOSED")
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = now
# Если слишком много ошибок, открываем цепь
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = now
print(f"Circuit breaker: OPEN (ошибок: {self.failure_count})")
raise
# Использование
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
class ResilientOrderService:
def __init__(self):
self.payment_url = "http://payment-service:8000"
self.breaker = breaker
def call_payment_service(self, user_id: int, amount: float):
def payment_request():
with httpx.Client(timeout=5.0) as client:
response = client.post(
f"{self.payment_url}/api/v1/payments",
json={"user_id": user_id, "amount": amount}
)
response.raise_for_status()
return response.json()
return self.breaker.call(payment_request)
3. Паттерн Retry с экспоненциальной задержкой
import httpx
import asyncio
from typing import Callable, TypeVar, Any
T = TypeVar('T')
class RetryStrategy:
def __init__(
self,
max_retries: int = 3,
initial_delay: float = 1.0,
backoff_factor: float = 2.0
):
self.max_retries = max_retries
self.initial_delay = initial_delay
self.backoff_factor = backoff_factor
async def execute(self, func: Callable[..., Any], *args, **kwargs) -> Any:
"""Выполняет функцию с повторами."""
delay = self.initial_delay
last_exception = None
for attempt in range(self.max_retries + 1):
try:
result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
if attempt > 0:
print(f"Успех с {attempt + 1}-й попытки")
return result
except httpx.TimeoutException as e:
last_exception = e
if attempt < self.max_retries:
print(f"Попытка {attempt + 1} не удалась, ждём {delay}с перед повтором")
await asyncio.sleep(delay)
delay *= self.backoff_factor
except httpx.HTTPStatusError as e:
# Не повторяем на 4xx ошибках
if 400 <= e.response.status_code < 500:
raise
last_exception = e
if attempt < self.max_retries:
await asyncio.sleep(delay)
delay *= self.backoff_factor
raise last_exception
# Использование
retry = RetryStrategy(max_retries=3, initial_delay=1.0)
async def call_external_service():
def request():
with httpx.Client() as client:
return client.get("http://external-api/data")
response = await retry.execute(request)
return response.json()
4. Паттерн Timeout и Deadline
from fastapi import FastAPI, HTTPException
import httpx
from datetime import datetime, timezone, timedelta
from contextvars import ContextVar
app = FastAPI()
# Хранит deadline для текущего запроса
deadline_var: ContextVar[datetime] = ContextVar('deadline')
class TimeoutManager:
def __init__(self, deadline: datetime):
self.deadline = deadline
def remaining_time(self) -> float:
"""Сколько времени осталось до deadline."""
remaining = (self.deadline - datetime.now(timezone.utc)).total_seconds()
return max(0, remaining)
def is_exceeded(self) -> bool:
return self.remaining_time() <= 0
class CascadingTimeouts:
"""Распределяет таймаут между цепочкой микросервисов."""
def __init__(self, total_timeout: float = 10.0):
self.total_timeout = total_timeout
self.deadline = datetime.now(timezone.utc) + timedelta(seconds=total_timeout)
deadline_var.set(self.deadline)
def get_timeout_for_next_call(self, service_name: str) -> float:
"""Получает таймаут для следующего запроса."""
manager = TimeoutManager(self.deadline)
remaining = manager.remaining_time()
if remaining <= 0:
raise TimeoutError(f"Deadline exceeded for {service_name}")
# Оставляем 10% буфера
return remaining * 0.9
# Использование
class ServiceChain:
def process_request(self, user_id: int):
timeouts = CascadingTimeouts(total_timeout=10.0)
# Вызов первого сервиса
timeout_1 = timeouts.get_timeout_for_next_call("service_1")
with httpx.Client(timeout=timeout_1) as client:
service_1_result = client.get("http://service1/data")
# Вызов второго сервиса
timeout_2 = timeouts.get_timeout_for_next_call("service_2")
with httpx.Client(timeout=timeout_2) as client:
service_2_result = client.get("http://service2/data")
return {"service_1": service_1_result.json(), "service_2": service_2_result.json()}
5. Паттерн Bulkhead (изоляция ресурсов)
import httpx
from asyncio import Semaphore
class Bulkhead:
"""Ограничивает количество одновременных запросов к сервису."""
def __init__(self, max_concurrent_calls: int = 10):
self.semaphore = Semaphore(max_concurrent_calls)
async def call(self, func, *args, **kwargs):
async with self.semaphore:
return await func(*args, **kwargs)
# Использование
payment_bulkhead = Bulkhead(max_concurrent_calls=20)
async def call_payment_service(user_id: int, amount: float):
async def request():
async with httpx.AsyncClient() as client:
return await client.post(
"http://payment-service/api/v1/payments",
json={"user_id": user_id, "amount": amount}
)
return await payment_bulkhead.call(request)
Сравнение подходов
| Проблема | Решение |
|---|---|
| Таймауты | Установить явный таймаут в запросе (5-10 сек) |
| Каскадные отказы | Circuit Breaker |
| Временные сбои | Retry с экспоненциальной задержкой |
| Давление на цепь | Cascading Timeouts + Bulkhead |
| Разные критерии успеха | Собственная логика обработки ошибок |
Практический пример из моего опыта
В проекте с 15+ микросервисами я использовал комбинацию всех паттернов. Результат:
- Уменьшил cascade failures на 95%
- Улучшил availability с 99.5% до 99.95%
- Среднее время ответа упало на 30%
Ключ — не полагаться только на HTTP, а комбинировать resilience паттерны.