← Назад к вопросам

Каким должно быть приложение, с точки зрения специфики разработки, чтобы оно работало хорошо в нескольких экземплярах на разных серверах?

3.0 Senior🔥 191 комментариев
#DevOps и инфраструктура#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Приложение для распределённых систем

Для корректной работы приложения в нескольких экземплярах на разных серверах требуется учесть множество аспектов архитектуры и разработки.

1. Stateless архитектура

Приложение должно быть полностью stateless — не хранить состояние локально.

# Плохо: stateful приложение
class UserService:
    users_cache = {}  # Локальное состояние
    
    def get_user(self, user_id):
        if user_id in self.users_cache:
            return self.users_cache[user_id]
        
        user = db.query(User).get(user_id)
        self.users_cache[user_id] = user
        return user

# Хорошо: stateless приложение
class UserService:
    def __init__(self, cache_service: CacheService):
        self.cache = cache_service
    
    async def get_user(self, user_id: int) -> User:
        # Состояние в Redis, доступно всем инстансам
        cached = await self.cache.get(f"user:{user_id}")
        if cached:
            return User.parse_obj(cached)
        
        user = await db.get(User, user_id)
        await self.cache.set(f"user:{user_id}", user.dict())
        return user

2. Единое хранилище данных

Все инстансы должны использовать одну БД или распределённое хранилище.

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Общее для всех инстансов
DATABASE_URL = "postgresql://user:pass@db-server:5432/myapp"
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=40,
)

SessionLocal = sessionmaker(bind=engine)

# Все инстансы подключаются к одному хосту

3. Распределённое кэширование

Кэш должен быть доступен всем инстансам через Redis или Memcached.

import redis
from typing import Any

class DistributedCache:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url, decode_responses=True)
    
    async def get(self, key: str) -> Any | None:
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        return None
    
    async def set(self, key: str, value: Any, ttl: int = 3600):
        self.redis.setex(key, ttl, json.dumps(value))
    
    async def invalidate(self, pattern: str):
        # Инвалидировать кэш во всех инстансах
        keys = self.redis.keys(pattern)
        if keys:
            self.redis.delete(*keys)

# Использование
cache = DistributedCache("redis://redis-server:6379")
await cache.set("user:123", {"id": 123, "name": "Alice"})

4. Session Management

Сессии должны храниться в распределённом хранилище, не в памяти процесса.

from fastapi.sessions import SessionMiddleware
from starlette.middleware.sessions import SessionMiddleware
import redis

# Плохо: сессии в памяти приложения
# app.add_middleware(SessionMiddleware, secret_key="secret")

# Хорошо: сессии в Redis
from fastapi_sessions.backends.implementations import RedisSessionBackend
from fastapi_sessions.session_verifier import SessionVerifier

backend = RedisSessionBackend(redis_url="redis://redis-server:6379")

class SessionData(BaseModel):
    user_id: int
    email: str

class BasicVerifier(SessionVerifier[SessionData]):
    def __init__(self, backend: RedisSessionBackend):
        self.backend = backend
    
    async def verify_session(self, model: SessionData) -> bool:
        return model.user_id is not None

5. Консистентность данных

Использовать транзакции и оптимистичные блокировки.

from sqlalchemy.orm import Session
from sqlalchemy import __version__, select

class Account:
    __tablename__ = "accounts"
    id = Column(Integer, primary_key=True)
    balance = Column(Float)
    version = Column(Integer, default=1)  # Для оптимистичной блокировки

def transfer_money(db: Session, from_id: int, to_id: int, amount: float):
    # Транзакция обеспечивает консистентность
    try:
        from_account = db.query(Account).with_for_update().get(from_id)
        to_account = db.query(Account).with_for_update().get(to_id)
        
        if from_account.balance < amount:
            raise ValueError("Insufficient funds")
        
        from_account.balance -= amount
        from_account.version += 1
        
        to_account.balance += amount
        to_account.version += 1
        
        db.commit()
    except Exception:
        db.rollback()
        raise

6. Асинхронность для масштабируемости

Использовать async/await для обработки большого количества соединений.

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

app = FastAPI()

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db-server/myapp",
    echo=False,
)

AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalars().first()
    return user

7. Логирование и трассировка

Использовать распределённую трассировку для отладки.

from opentelemetry import trace, logging
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider

jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger-collector",
    agent_port=6831,
)

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    jaeger.thrift.JaegerExporter()
)

tracer = trace.get_tracer(__name__)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    with tracer.start_as_current_span(f"get_user_{user_id}"):
        # Этот span будет видим во всех инстансах в Jaeger
        user = await fetch_user(user_id)
        return user

8. Конфигурация из окружения

Конфигурация должна приходить из переменных окружения, не из файлов.

from pydantic import BaseSettings

class Settings(BaseSettings):
    database_url: str
    redis_url: str
    debug: bool = False
    api_key: str
    
    class Config:
        env_file = ".env"

settings = Settings()

# Использование
engine = create_engine(settings.database_url)
redis_client = redis.from_url(settings.redis_url)

9. Health checks и graceful shutdown

Приложение должно уметь сигнализировать о своём состоянии.

from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
async def health_check():
    # Для load balancer
    return {"status": "healthy"}

@app.get("/ready")
async def readiness_check():
    # Готово ли приложение к работе
    try:
        async with AsyncSessionLocal() as db:
            await db.execute(select(1))
        return {"ready": True}
    except Exception:
        return {"ready": False}

@app.on_event("shutdown")
async def shutdown_event():
    # Корректное завершение
    await engine.dispose()
    logger.info("Application shutdown complete")

10. Load balancing и sticky sessions

Настроить load balancer для распределения запросов.

# nginx.conf
upstream app_servers {
    server app1:8000;
    server app2:8000;
    server app3:8000;
}

server {
    listen 80;
    
    location / {
        proxy_pass http://app_servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        
        # Sticky sessions (если нужны)
        # hash $cookie_sessionid consistent;
    }
}

11. Миграции БД

Выполнять миграции безопасно при наличии нескольких инстансов.

# Использовать Alembic или Goose
# Миграции должны быть backwards-compatible

# 1. Добавить новую колонку
ALTER TABLE users ADD COLUMN phone VARCHAR(20);

# 2. Обновить код (может работать со старой схемой)
class User(Base):
    phone: str | None = None

# 3. Удалить старую колонку
ALTER TABLE users DROP COLUMN old_phone;

Best Practices для распределённых систем

  • Stateless — всё состояние во внешних хранилищах
  • Horizontal scaling — добавлять инстансы без изменения кода
  • Graceful degradation — система работает при частичном отказе
  • Monitoring — знать, что происходит на каждом инстансе
  • Idempotent operations — операции можно повторять без проблем
  • Circuit breakers — защита от каскадных отказов
  • Database indexing — оптимизация запросов при масштабировании
  • Connection pooling — эффективное использование соединений

Это ключевые принципы построения масштабируемых распределённых приложений.

Каким должно быть приложение, с точки зрения специфики разработки, чтобы оно работало хорошо в нескольких экземплярах на разных серверах? | PrepBro