← Назад к вопросам

Есть три сервиса с тремя разными базами данных — как ты будешь забирать данные из всех сервисов

2.8 Senior🔥 61 комментариев
#REST API и HTTP#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Интеграция данных из нескольких микросервисов

Это классическая задача микросервисной архитектуры. Есть несколько проверенных подходов в зависимости от требований — выбирая между консистентностью, производительностью и сложностью.

Подход 1: REST/GraphQL API (рекомендуется для большинства случаев)

Каждый сервис предоставляет публичный API, и клиент (или API Gateway) агрегирует данные.

import httpx
import asyncio
from typing import List

class DataAggregator:
    def __init__(self):
        self.services = {
            'users_service': 'http://users-api:8001',
            'orders_service': 'http://orders-api:8002',
            'payments_service': 'http://payments-api:8003'
        }
    
    async def get_all_data(self, user_id: int):
        # Параллельные запросы к сервисам
        async with httpx.AsyncClient() as client:
            tasks = [
                client.get(f"{self.services['users_service']}/users/{user_id}"),
                client.get(f"{self.services['orders_service']}/orders?user_id={user_id}"),
                client.get(f"{self.services['payments_service']}/payments?user_id={user_id}")
            ]
            
            responses = await asyncio.gather(*tasks)
            
            return {
                'user': responses[0].json(),
                'orders': responses[1].json(),
                'payments': responses[2].json()
            }

Преимущества:

  • Слабая связанность между сервисами
  • Каждый сервис независим
  • Легко масштабировать

Недостатки:

  • Сетевой overhead (высокая латентность)
  • Если один сервис упал — ошибка во всём запросе
  • N+1 queries при большом количестве объектов

Подход 2: Message Queue / Event Bus (для асинхронной синхронизации)

Данные синхронизируются через сообщения/события, каждый сервис кэширует нужные ему данные.

from celery import Celery
import json

celery_app = Celery('tasks', broker='redis://localhost')

# Сервис 1: Users Service
@celery_app.task
def user_created(user_id: int, user_data: dict):
    """Event: пользователь создан"""
    # Публикуем событие
    celery_app.send_task(
        'events.user.created',
        args=(user_id, user_data)
    )

# Сервис 2: Orders Service (подписан на события Users)
@celery_app.task
def handle_user_created(user_id: int, user_data: dict):
    """Слушаем событие от Users Service"""
    # Кэшируем данные о пользователе локально
    local_user_cache[user_id] = user_data

# Сервис 3: Payments Service (подписан на события Orders)
@celery_app.task
def handle_order_created(order_id: int, order_data: dict):
    """Слушаем событие от Orders Service"""
    # Кэшируем данные о заказе локально
    local_order_cache[order_id] = order_data

Преимущества:

  • Асинхронность — нет блокирующих запросов
  • Хорошая производительность (кэширование)
  • Сервисы слабо связаны

Недостатки:

  • Консистентность данных (eventual consistency)
  • Сложность отладки (асинхронные события)
  • Нужна дополнительная инфраструктура (RabbitMQ, Kafka)

Подход 3: Data Warehouse / ETL (для аналитики и отчётов)

Данные из всех сервисов периодически выгружаются в единое хранилище для анализа.

import asyncio
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base
from datetime import datetime

# Единое хранилище для аналитики
warehouse_engine = create_engine('postgresql://warehouse_user@warehouse_db')
Base = declarative_base()

class UserAnalytics(Base):
    __tablename__ = 'user_analytics'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    order_count = Column(Integer)
    total_spent = Column(Integer)
    last_order_date = Column(DateTime)

# ETL процесс — выполняется по расписанию
async def sync_all_services():
    """Синхронизируем данные из всех сервисов в warehouse"""
    
    async with httpx.AsyncClient() as client:
        # Получаем данные с каждого сервиса
        users = await (await client.get('http://users-api:8001/users')).json()
        orders = await (await client.get('http://orders-api:8002/orders')).json()
        payments = await (await client.get('http://payments-api:8003/payments')).json()
    
    # Трансформируем в единый формат
    with warehouse_engine.begin() as connection:
        for user in users:
            user_orders = [o for o in orders if o['user_id'] == user['id']]
            user_payments = [p for p in payments if p['user_id'] == user['id']]
            
            analytics = UserAnalytics(
                user_id=user['id'],
                order_count=len(user_orders),
                total_spent=sum(p['amount'] for p in user_payments),
                last_order_date=max((o['date'] for o in user_orders), default=None)
            )
            connection.execute(analytics)

# Запускаем по расписанию (например, раз в час)
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_all_services, 'interval', hours=1)

Преимущества:

  • Данные всегда в одном месте (консистентность)
  • Быстрые запросы без сетевого overhead
  • Идеально для отчётов и аналитики

Недостатки:

  • Данные не real-time (отстают на часы)
  • Высокое потребление памяти (дублирование)
  • Сложность синхронизации

Подход 4: RPC / Direct Database Access (анти-паттерн, но иногда используется)

Директ доступ к БД других сервисов (НЕ рекомендуется!).

# ❌ Плохо — нарушает инкапсуляцию
from sqlalchemy import create_engine

users_engine = create_engine('postgresql://users_service_db')
orders_engine = create_engine('postgresql://orders_service_db')

# Читаем прямо из чужой БД
with users_engine.connect() as conn:
    users = conn.execute('SELECT * FROM users')

Проблемы:

  • Сильная связанность (tight coupling)
  • Сервис 1 зависит от схемы БД сервиса 2
  • Сложно масштабировать и мигрировать
  • Проблемы с безопасностью

Гибридный подход (рекомендуется для production)

class HybridDataAggregator:
    def __init__(self):
        self.cache = {}  # Redis или другое хранилище
        self.services = {...}
    
    async def get_user_profile(self, user_id: int):
        # 1. Проверяем кэш
        cache_key = f"user:{user_id}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # 2. Если нет — запрашиваем у сервисов
        async with httpx.AsyncClient() as client:
            user = await (await client.get(
                f"{self.services['users']}/users/{user_id}"
            )).json()
            
            # 3. Кэшируем результат (TTL 5 минут)
            self.cache.set(cache_key, user, ex=300)
            
            return user
    
    async def get_user_with_orders(self, user_id: int):
        # Параллельные запросы с кэшированием
        user_task = asyncio.create_task(self.get_user_profile(user_id))
        orders_task = asyncio.create_task(
            self._get_orders_cached(user_id)
        )
        
        user, orders = await asyncio.gather(user_task, orders_task)
        return {**user, 'orders': orders}

Как выбрать подход?

СценарийПодходПричина
Real-time данные, низкая латентностьREST API + кэшБыстро, просто
Асинхронные события, eventual consistencyMessage QueueМасштабируемо
Отчёты, аналитика, не real-timeData WarehouseДёшево и быстро
Временные данные, редкие запросыREST APIМинималистично

Итог

В 90% случаев я бы использовал REST API + кэш (Redis), потому что: ✓ Простая архитектура
✓ Легко отладить
✓ Хороший баланс между скоростью и консистентностью
✓ Масштабируется с кэшированием
✓ Не требует сложной инфраструктуры