Каким должно быть приложение, с точки зрения специфики разработки, чтобы оно работало хорошо в нескольких экземплярах на разных серверах?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Приложение для распределённых систем
Для корректной работы приложения в нескольких экземплярах на разных серверах требуется учесть множество аспектов архитектуры и разработки.
1. Stateless архитектура
Приложение должно быть полностью stateless — не хранить состояние локально.
# Плохо: stateful приложение
class UserService:
users_cache = {} # Локальное состояние
def get_user(self, user_id):
if user_id in self.users_cache:
return self.users_cache[user_id]
user = db.query(User).get(user_id)
self.users_cache[user_id] = user
return user
# Хорошо: stateless приложение
class UserService:
def __init__(self, cache_service: CacheService):
self.cache = cache_service
async def get_user(self, user_id: int) -> User:
# Состояние в Redis, доступно всем инстансам
cached = await self.cache.get(f"user:{user_id}")
if cached:
return User.parse_obj(cached)
user = await db.get(User, user_id)
await self.cache.set(f"user:{user_id}", user.dict())
return user
2. Единое хранилище данных
Все инстансы должны использовать одну БД или распределённое хранилище.
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# Общее для всех инстансов
DATABASE_URL = "postgresql://user:pass@db-server:5432/myapp"
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=20,
max_overflow=40,
)
SessionLocal = sessionmaker(bind=engine)
# Все инстансы подключаются к одному хосту
3. Распределённое кэширование
Кэш должен быть доступен всем инстансам через Redis или Memcached.
import redis
from typing import Any
class DistributedCache:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url, decode_responses=True)
async def get(self, key: str) -> Any | None:
value = self.redis.get(key)
if value:
return json.loads(value)
return None
async def set(self, key: str, value: Any, ttl: int = 3600):
self.redis.setex(key, ttl, json.dumps(value))
async def invalidate(self, pattern: str):
# Инвалидировать кэш во всех инстансах
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
# Использование
cache = DistributedCache("redis://redis-server:6379")
await cache.set("user:123", {"id": 123, "name": "Alice"})
4. Session Management
Сессии должны храниться в распределённом хранилище, не в памяти процесса.
from fastapi.sessions import SessionMiddleware
from starlette.middleware.sessions import SessionMiddleware
import redis
# Плохо: сессии в памяти приложения
# app.add_middleware(SessionMiddleware, secret_key="secret")
# Хорошо: сессии в Redis
from fastapi_sessions.backends.implementations import RedisSessionBackend
from fastapi_sessions.session_verifier import SessionVerifier
backend = RedisSessionBackend(redis_url="redis://redis-server:6379")
class SessionData(BaseModel):
user_id: int
email: str
class BasicVerifier(SessionVerifier[SessionData]):
def __init__(self, backend: RedisSessionBackend):
self.backend = backend
async def verify_session(self, model: SessionData) -> bool:
return model.user_id is not None
5. Консистентность данных
Использовать транзакции и оптимистичные блокировки.
from sqlalchemy.orm import Session
from sqlalchemy import __version__, select
class Account:
__tablename__ = "accounts"
id = Column(Integer, primary_key=True)
balance = Column(Float)
version = Column(Integer, default=1) # Для оптимистичной блокировки
def transfer_money(db: Session, from_id: int, to_id: int, amount: float):
# Транзакция обеспечивает консистентность
try:
from_account = db.query(Account).with_for_update().get(from_id)
to_account = db.query(Account).with_for_update().get(to_id)
if from_account.balance < amount:
raise ValueError("Insufficient funds")
from_account.balance -= amount
from_account.version += 1
to_account.balance += amount
to_account.version += 1
db.commit()
except Exception:
db.rollback()
raise
6. Асинхронность для масштабируемости
Использовать async/await для обработки большого количества соединений.
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
app = FastAPI()
engine = create_async_engine(
"postgresql+asyncpg://user:pass@db-server/myapp",
echo=False,
)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalars().first()
return user
7. Логирование и трассировка
Использовать распределённую трассировку для отладки.
from opentelemetry import trace, logging
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
jaeger_exporter = JaegerExporter(
agent_host_name="jaeger-collector",
agent_port=6831,
)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
jaeger.thrift.JaegerExporter()
)
tracer = trace.get_tracer(__name__)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
with tracer.start_as_current_span(f"get_user_{user_id}"):
# Этот span будет видим во всех инстансах в Jaeger
user = await fetch_user(user_id)
return user
8. Конфигурация из окружения
Конфигурация должна приходить из переменных окружения, не из файлов.
from pydantic import BaseSettings
class Settings(BaseSettings):
database_url: str
redis_url: str
debug: bool = False
api_key: str
class Config:
env_file = ".env"
settings = Settings()
# Использование
engine = create_engine(settings.database_url)
redis_client = redis.from_url(settings.redis_url)
9. Health checks и graceful shutdown
Приложение должно уметь сигнализировать о своём состоянии.
from fastapi import FastAPI
app = FastAPI()
@app.get("/health")
async def health_check():
# Для load balancer
return {"status": "healthy"}
@app.get("/ready")
async def readiness_check():
# Готово ли приложение к работе
try:
async with AsyncSessionLocal() as db:
await db.execute(select(1))
return {"ready": True}
except Exception:
return {"ready": False}
@app.on_event("shutdown")
async def shutdown_event():
# Корректное завершение
await engine.dispose()
logger.info("Application shutdown complete")
10. Load balancing и sticky sessions
Настроить load balancer для распределения запросов.
# nginx.conf
upstream app_servers {
server app1:8000;
server app2:8000;
server app3:8000;
}
server {
listen 80;
location / {
proxy_pass http://app_servers;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# Sticky sessions (если нужны)
# hash $cookie_sessionid consistent;
}
}
11. Миграции БД
Выполнять миграции безопасно при наличии нескольких инстансов.
# Использовать Alembic или Goose
# Миграции должны быть backwards-compatible
# 1. Добавить новую колонку
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
# 2. Обновить код (может работать со старой схемой)
class User(Base):
phone: str | None = None
# 3. Удалить старую колонку
ALTER TABLE users DROP COLUMN old_phone;
Best Practices для распределённых систем
- Stateless — всё состояние во внешних хранилищах
- Horizontal scaling — добавлять инстансы без изменения кода
- Graceful degradation — система работает при частичном отказе
- Monitoring — знать, что происходит на каждом инстансе
- Idempotent operations — операции можно повторять без проблем
- Circuit breakers — защита от каскадных отказов
- Database indexing — оптимизация запросов при масштабировании
- Connection pooling — эффективное использование соединений
Это ключевые принципы построения масштабируемых распределённых приложений.