Почему FastAPI подходит для проекта с горизонтальным масштабированием?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Почему FastAPI подходит для проекта с горизонтальным масштабированием?
FastAPI — один из лучших выборов для микросервисных архитектур с горизонтальным масштабированием. Это обусловлено несколькими фундаментальными факторами, которые делают фреймворк идеально подходящим для таких систем.
Асинхронность как основа архитектуры
FastAPI построен на асинхронном стеке (async/await) с использованием Starlette и Uvicorn. Это критично для масштабируемости:
# Асинхронный обработчик
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Не блокирует поток выполнения
result = await database.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
return result
# Синхронный обработчик (плохая практика для масштабирования)
@app.get("/users/{user_id}")
def get_user(user_id: int): # ❌ Блокирует поток
result = database.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
return result
Асинхронность позволяет одному процессу обслуживать тысячи одновременных соединений:
- Один синхронный поток может обслуживать 1-2 запроса одновременно
- Один асинхронный процесс может обслуживать 10,000+ запросов
Stateless дизайн приложения
FastAPI по дизайну поощряет создание stateless приложений, что критично для горизонтального масштабирования:
# ✅ ПРАВИЛЬНО — Stateless
app.state.config = load_config() # Один раз при старте
@app.get("/api/data")
async def get_data():
return {"data": "value"}
# ❌ НЕПРАВИЛЬНО — Stateful (проблема для масштабирования)
global_cache = {} # Локальное состояние в приложении
@app.get("/api/data")
async def get_data():
if "data" not in global_cache:
global_cache["data"] = compute_expensive_operation()
return global_cache["data"]
При горизонтальном масштабировании каждый инстанс приложения должен быть независим. Все состояние хранится в внешних системах:
# ✅ Использование Redis для кэша
import aioredis
@app.on_event("startup")
async def startup():
app.redis = await aioredis.create_redis_pool('redis://localhost')
@app.get("/api/data")
async def get_data():
# Получаем из Redis, не из памяти приложения
cached = await app.redis.get("data")
if not cached:
cached = await compute_data()
await app.redis.set("data", cached, expire=3600)
return json.loads(cached)
@app.on_event("shutdown")
async def shutdown():
app.redis.close()
await app.redis.wait_closed()
Минимальные зависимости при каждом запросе
FastAPI эффективно управляет зависимостями, позволяя кэшировать их:
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
# Зависимость сессии БД
async def get_db_session() -> AsyncSession:
async with SessionLocal() as session:
yield session
# Кэширование в рамках одного запроса
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
# Это вычисляется один раз за запрос
return await verify_token(token)
@app.get("/api/profile")
async def get_profile(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db_session)
):
# current_user вычисляется один раз и переиспользуется
return {"user": current_user}
Быстрый startup/shutdown
Для горизонтального масштабирования критична скорость запуска и остановки инстансов:
import time
@app.on_event("startup")
async def startup_event():
print(f"Started at {time.time()}")
# Минимальные инициализации
app.db = create_database_pool()
app.cache = init_cache()
@app.on_event("shutdown")
async def shutdown_event():
print(f"Shutdown at {time.time()}")
# Чистая остановка без hanged connections
await app.db.close()
await app.cache.close()
FastAPI позволяет быстро запускать новые инстансы в Kubernetes или Docker Swarm.
Connection pooling и управление ресурсами
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool, QueuePool
# ✅ Правильная конфигурация для масштабирования
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
echo=False,
poolclass=QueuePool,
pool_size=20, # Количество постоянных соединений
max_overflow=10, # Дополнительные соединения при пиках
pool_pre_ping=True, # Проверка жизнеспособности перед использованием
pool_recycle=3600, # Переиспользование соединений каждый час
)
# ❌ Для масштабирования
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
poolclass=NullPool # БЕЗ пула — новое соединение на каждый запрос!
)
Правильное управление жизненным циклом контекста
from contextlib import asynccontextmanager
# ✅ Правильно — контролируемый lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print("Starting up")
app.db_pool = create_connection_pool()
yield
# Shutdown
print("Shutting down")
await app.db_pool.close()
app = FastAPI(lifespan=lifespan)
@app.get("/api/data")
async def get_data():
async with app.db_pool.acquire() as conn:
result = await conn.fetchrow("SELECT * FROM users")
return result
Легкая интеграция с Message Brokers
Для асинхронной обработки в распределенной системе:
from celery import Celery
import aio_pika
# Celery для задач
celery_app = Celery(__name__, broker="redis://localhost")
@celery_app.task
def process_data(user_id: int):
# Выполняется в отдельном процессе
return f"Processed {user_id}"
@app.post("/api/process")
async def start_processing(user_id: int):
# Отправляем задачу в очередь и сразу возвращаем ответ
task = process_data.delay(user_id)
return {"task_id": task.id}
# RabbitMQ для event streaming
@app.on_event("startup")
async def setup_rabbitmq():
app.amqp_connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
app.amqp_channel = await app.amqp_connection.channel()
@app.post("/api/events")
async def publish_event(event_type: str, data: dict):
exchange = await app.amqp_channel.get_exchange("events")
await exchange.publish(
aio_pika.Message(body=json.dumps(data).encode()),
routing_key=event_type
)
return {"status": "published"}
Graceful shutdown для систем оркестрации
import signal
import asyncio
async def shutdown_gracefully(signum, frame):
"""Корректное завершение перед kill'ом контейнером"""
print("Graceful shutdown initiated")
# Даем время завершить текущие запросы
pending_tasks = asyncio.all_tasks()
for task in pending_tasks:
task.cancel()
await asyncio.gather(*pending_tasks, return_exceptions=True)
print("All tasks completed, exiting")
signal.signal(signal.SIGTERM, shutdown_gracefully)
# В Docker/Kubernetes:
# 1. SIGTERM отправляется
# 2. Приложение обрабатывает текущие запросы
# 3. Новые запросы не принимаются (load balancer отключает инстанс)
# 4. Инстанс завершает работу
Health checks для оркестрации
from datetime import datetime
@app.get("/health")
async def health_check():
"""Для Kubernetes/Docker health checks"""
try:
# Проверяем БД
async with app.db_pool.acquire() as conn:
await conn.fetchval("SELECT 1")
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0.0"
}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}, 503
@app.get("/readiness")
async def readiness_check():
"""Готово ли приложение принимать трафик?"""
if not hasattr(app, "db_pool") or not app.db_pool:
return {"ready": False}, 503
return {"ready": True}
Масштабирование на практике
# docker-compose.yml для локального тестирования масштабирования
version: '3.8'
services:
api1:
build: .
ports:
- "8001:8000"
environment:
- WORKER_ID=1
depends_on:
- db
- redis
api2:
build: .
ports:
- "8002:8000"
environment:
- WORKER_ID=2
depends_on:
- db
- redis
api3:
build: .
ports:
- "8003:8000"
environment:
- WORKER_ID=3
depends_on:
- db
- redis
nginx:
image: nginx:latest
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- api1
- api2
- api3
db:
image: postgres:15
environment:
POSTGRES_PASSWORD: password
redis:
image: redis:7
Заключение
FastAPI идеален для горизонтального масштабирования благодаря:
- Асинхронности — обработка тысяч запросов одним процессом
- Stateless архитектуре — независимые инстансы приложения
- Эффективному управлению ресурсами — пулинг соединений, минимум overhead
- Быстрому запуску/остановке — идеально для облачных платформ
- Интеграции с message brokers — асинхронная обработка задач
- Поддержке health checks — совместимость с Kubernetes и Docker
- Минимальным потреблением памяти — Uvicorn намного легче Django/Flask
Это делает FastAPI идеальным выбором для микросервисов, облачных приложений и систем, требующих линейного масштабирования.