← Назад к вопросам

Как решали проблемы мультитенантности?

2.0 Middle🔥 131 комментариев
#Архитектура и проектирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI26 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение проблем мультитенантности (Multi-tenancy)

Мультитенантность — это архитектура, где одно приложение/хранилище обслуживает несколько независимых клиентов (тенантов). Это сложная проблема в data engineering, требующая решения на уровне базы данных, логики приложения и аналитики.

1. Основные подходы к мультитенантности

1.1 Database per tenant (Отдельная БД для каждого тенанта)

Тенант A → Database A (PostgreSQL)
Тенант B → Database B (PostgreSQL)
Тенант C → Database C (PostgreSQL)

Преимущества:

  • Полная изоляция данных
  • Легкая бэкап и восстановление
  • Разные версии схемы для разных тенантов

Недостатки:

  • Сложный operational overhead
  • Много подключений
  • Сложная аналитика (нужна синхронизация)
from sqlalchemy import create_engine

def get_tenant_db(tenant_id):
    """Получить подключение к БД тенанта"""
    db_host = f"db-{tenant_id}.company.com"  # Динамический хост
    connection_string = f"postgresql://user:pass@{db_host}/tenant_db"
    engine = create_engine(connection_string)
    return engine

# Использование
engine_a = get_tenant_db('tenant_a')
engine_b = get_tenant_db('tenant_b')

1.2 Schema per tenant (Отдельная схема в одной БД)

Database (PostgreSQL)
├── Schema: tenant_a (tables, views)
├── Schema: tenant_b (tables, views)
└── Schema: shared (общие таблицы)

Преимущества:

  • Одна БД, меньше overhead
  • Можно быстро миграть данные
  • Проще backup

Недостатки:

  • Нужно переключать search_path
  • Cross-tenant запросы опасны
from sqlalchemy import text

def set_tenant_schema(connection, tenant_id):
    """Установить схему для тенанта"""
    schema_name = f"tenant_{tenant_id}"
    connection.execute(text(f"SET search_path = {schema_name}, public"))

# Middleware для Flask
from flask import request, g

@app.before_request
def set_tenant_context():
    tenant_id = request.headers.get('X-Tenant-ID')
    g.tenant_id = tenant_id
    # Установить схему в connection pool

1.3 Shared database with tenant_id column (Общая БД с колонкой тенанта)

Database
└── Table: users
    ├── id | tenant_id | name | email
    ├── 1  | tenant_a  | Alice | alice@a.com
    ├── 2  | tenant_b  | Bob | bob@b.com
    └── 3  | tenant_a  | Charlie | charlie@a.com

Преимущества:

  • Просто в реализации
  • Легкая аналитика
  • Хорошо масштабируется

Недостатки:

  • Нужно всегда фильтровать по tenant_id
  • Ошибка может привести к утечке данных

2. Row-level security (RLS) в PostgreSQL

Самый безопасный подход для Shared Database:

-- Создаём таблицу с tenant_id
CREATE TABLE orders (
    id UUID PRIMARY KEY,
    tenant_id UUID NOT NULL,
    user_id UUID,
    amount DECIMAL(10, 2),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Создаём policy для RLS
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;

-- Policy: пользователь видит только свои заказы
CREATE POLICY tenant_isolation ON orders
    FOR ALL
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- Проверяем policy
SET app.current_tenant_id = 'tenant_a_id';
SELECT * FROM orders;  -- Только заказы tenant_a

SET app.current_tenant_id = 'tenant_b_id';
SELECT * FROM orders;  -- Только заказы tenant_b

3. Изоляция на уровне приложения

from sqlalchemy.orm import Session
from sqlalchemy import and_
from functools import wraps
from flask import request, abort

class TenantMixin:
    """Миксин для автоматической фильтрации по тенанту"""
    tenant_id = Column(UUID, nullable=False, index=True)

class User(Base, TenantMixin):
    __tablename__ = 'users'
    id = Column(UUID, primary_key=True)
    name = Column(String)

class Order(Base, TenantMixin):
    __tablename__ = 'orders'
    id = Column(UUID, primary_key=True)
    user_id = Column(UUID)
    amount = Column(Numeric)

def require_tenant(f):
    """Декоратор для проверки tenant_id в запросе"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        tenant_id = request.headers.get('X-Tenant-ID')
        if not tenant_id:
            abort(400, 'Missing X-Tenant-ID header')
        
        # Сохраняем в контексте
        g.tenant_id = tenant_id
        return f(*args, **kwargs)
    
    return decorated_function

@app.route('/api/orders', methods=['GET'])
@require_tenant
def get_orders():
    """Получить заказы текущего тенанта"""
    db = get_session()
    
    # Автоматически фильтруем по tenant_id
    orders = db.query(Order).filter(
        Order.tenant_id == g.tenant_id
    ).all()
    
    return jsonify([{
        'id': str(o.id),
        'amount': float(o.amount)
    } for o in orders])

@app.route('/api/orders/<order_id>', methods=['GET'])
@require_tenant
def get_order(order_id):
    """Получить конкретный заказ (с проверкой тенанта)"""
    db = get_session()
    
    order = db.query(Order).filter(
        and_(
            Order.id == order_id,
            Order.tenant_id == g.tenant_id  # ВАЖНО: проверяем тенанта!
        )
    ).first()
    
    if not order:
        abort(404, 'Order not found')
    
    return jsonify({
        'id': str(order.id),
        'amount': float(order.amount)
    })

4. Аналитика в мультитенантной системе

Самая сложная часть — агрегация данных по тенантам:

def aggregate_orders_analytics():
    """
    Собрать аналитику заказов по всем тенантам
    для общей dashboard
    """
    
    from_date = datetime.now() - timedelta(days=30)
    
    query = """
    SELECT 
        tenant_id,
        DATE(created_at) as order_date,
        COUNT(*) as order_count,
        SUM(amount) as total_amount,
        AVG(amount) as avg_amount
    FROM orders
    WHERE created_at >= %s
    GROUP BY tenant_id, DATE(created_at)
    ORDER BY tenant_id, order_date DESC
    """
    
    with get_db_connection() as conn:
        results = conn.execute(query, (from_date,))
        
        # Структурируем по тенантам
        analytics = {}
        for row in results:
            tenant_id = row['tenant_id']
            if tenant_id not in analytics:
                analytics[tenant_id] = []
            
            analytics[tenant_id].append({
                'date': row['order_date'].isoformat(),
                'count': row['order_count'],
                'total': float(row['total_amount']),
                'avg': float(row['avg_amount'])
            })
    
    return analytics

# Для большого количества тенантов используем ETL
def etl_tenant_analytics():
    """Многопроцессный ETL для параллельной обработки тенантов"""
    import multiprocessing
    
    # Получим список всех тенантов
    tenants = get_all_tenants()
    
    # Обработаем в параллель
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_tenant_analytics, tenants)
    
    # Загрузим в DWH
    for tenant_id, analytics in results:
        save_to_dwh(tenant_id, analytics)

def process_tenant_analytics(tenant_id):
    """Обработать аналитику одного тенанта"""
    query = """
    SELECT 
        DATE(created_at) as order_date,
        COUNT(*) as order_count,
        SUM(amount) as total_amount
    FROM orders
    WHERE tenant_id = %s AND created_at >= NOW() - INTERVAL '30 days'
    GROUP BY DATE(created_at)
    """
    
    with get_db_connection() as conn:
        results = conn.execute(query, (tenant_id,))
        analytics = [dict(row) for row in results]
    
    return tenant_id, analytics

5. Миграции в мультитенантной системе

def apply_migration_to_all_tenants(migration_func):
    """Применить миграцию ко всем тенантам"""
    from concurrent.futures import ThreadPoolExecutor
    
    tenants = get_all_tenants()
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(apply_migration_to_tenant, tenant_id, migration_func)
            for tenant_id in tenants
        ]
        
        for future in futures:
            try:
                future.result(timeout=300)  # 5 минут per tenant
            except Exception as e:
                logger.error(f"Migration failed: {e}")
                raise

def apply_migration_to_tenant(tenant_id, migration_func):
    """Применить миграцию к одному тенанту"""
    engine = get_tenant_db(tenant_id)
    with engine.begin() as connection:
        # Добавляем колонку
        connection.execute(text(
            "ALTER TABLE orders ADD COLUMN currency VARCHAR(3) DEFAULT 'USD'"
        ))
        # Вызываем бизнес-логику
        migration_func(connection, tenant_id)
        logger.info(f"Migration applied for tenant {tenant_id}")

6. Борьба с утечками данных

# Плохо: утечка данных
@app.route('/api/users')
def get_users():
    db = get_session()
    users = db.query(User).all()  # БЕЗ фильтрации!
    return jsonify(...)  # Вернёт пользователей всех тенантов!

# Хорошо: с защитой
@app.route('/api/users')
@require_tenant
def get_users():
    db = get_session()
    users = db.query(User).filter(
        User.tenant_id == g.tenant_id  # ВСЕГДА фильтруем
    ).all()
    return jsonify(...)

# Best practice: автоматическое добавление фильтра
class SecureSession(Session):
    """Session с автоматической фильтрацией по тенанту"""
    
    def query(self, *args, **kwargs):
        query = super().query(*args, **kwargs)
        
        # Автоматически добавляем фильтр для TenantMixin
        if args and hasattr(args[0], 'tenant_id'):
            return query.filter(
                args[0].tenant_id == get_current_tenant_id()
            )
        
        return query

7. Практический пример: полная архитектура

# models.py
class TenantMixin:
    tenant_id = Column(UUID, nullable=False, index=True)

class User(Base, TenantMixin):
    __tablename__ = 'users'
    id = Column(UUID, primary_key=True)
    email = Column(String, unique=True)
    name = Column(String)

class Order(Base, TenantMixin):
    __tablename__ = 'orders'
    id = Column(UUID, primary_key=True)
    user_id = Column(UUID, ForeignKey('users.id'))
    amount = Column(Numeric(10, 2))
    created_at = Column(DateTime(timezone=True), default=datetime.utcnow)

# middleware.py
from flask import request, g
import uuid

@app.before_request
def set_tenant_context():
    tenant_id = request.headers.get('X-Tenant-ID')
    
    if not tenant_id:
        abort(400, 'X-Tenant-ID header is required')
    
    try:
        uuid.UUID(tenant_id)  # Проверка формата
    except ValueError:
        abort(400, 'Invalid X-Tenant-ID format')
    
    g.tenant_id = uuid.UUID(tenant_id)

# routes.py
@app.route('/api/orders', methods=['POST'])
def create_order():
    data = request.get_json()
    
    order = Order(
        id=uuid.uuid4(),
        tenant_id=g.tenant_id,  # Автоматически добавляем
        user_id=uuid.UUID(data['user_id']),
        amount=Decimal(data['amount'])
    )
    
    db.add(order)
    db.commit()
    
    return jsonify({'id': str(order.id)}), 201

Итоговый чеклист для мультитенантности

  1. Выбрать подход: Database per tenant / Schema per tenant / Shared DB with tenant_id
  2. Реализовать изоляцию: Middleware для проверки tenant_id
  3. Защитить от утечек: Row-level security или фильтры в коде
  4. Аналитика: Агрегация по тенантам в отдельной БД
  5. Миграции: Применять ко всем тенантам параллельно
  6. Мониторинг: Логировать доступ cross-tenant запросы