← Назад к вопросам

Почему FastAPI подходит для проекта с горизонтальным масштабированием?

1.2 Junior🔥 161 комментариев
#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Почему FastAPI подходит для проекта с горизонтальным масштабированием?

FastAPI — один из лучших выборов для микросервисных архитектур с горизонтальным масштабированием. Это обусловлено несколькими фундаментальными факторами, которые делают фреймворк идеально подходящим для таких систем.

Асинхронность как основа архитектуры

FastAPI построен на асинхронном стеке (async/await) с использованием Starlette и Uvicorn. Это критично для масштабируемости:

# Асинхронный обработчик
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Не блокирует поток выполнения
    result = await database.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
    return result

# Синхронный обработчик (плохая практика для масштабирования)
@app.get("/users/{user_id}")
def get_user(user_id: int):  # ❌ Блокирует поток
    result = database.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
    return result

Асинхронность позволяет одному процессу обслуживать тысячи одновременных соединений:

  • Один синхронный поток может обслуживать 1-2 запроса одновременно
  • Один асинхронный процесс может обслуживать 10,000+ запросов

Stateless дизайн приложения

FastAPI по дизайну поощряет создание stateless приложений, что критично для горизонтального масштабирования:

# ✅ ПРАВИЛЬНО — Stateless
app.state.config = load_config()  # Один раз при старте

@app.get("/api/data")
async def get_data():
    return {"data": "value"}

# ❌ НЕПРАВИЛЬНО — Stateful (проблема для масштабирования)
global_cache = {}  # Локальное состояние в приложении

@app.get("/api/data")
async def get_data():
    if "data" not in global_cache:
        global_cache["data"] = compute_expensive_operation()
    return global_cache["data"]

При горизонтальном масштабировании каждый инстанс приложения должен быть независим. Все состояние хранится в внешних системах:

# ✅ Использование Redis для кэша
import aioredis

@app.on_event("startup")
async def startup():
    app.redis = await aioredis.create_redis_pool('redis://localhost')

@app.get("/api/data")
async def get_data():
    # Получаем из Redis, не из памяти приложения
    cached = await app.redis.get("data")
    if not cached:
        cached = await compute_data()
        await app.redis.set("data", cached, expire=3600)
    return json.loads(cached)

@app.on_event("shutdown")
async def shutdown():
    app.redis.close()
    await app.redis.wait_closed()

Минимальные зависимости при каждом запросе

FastAPI эффективно управляет зависимостями, позволяя кэшировать их:

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

# Зависимость сессии БД
async def get_db_session() -> AsyncSession:
    async with SessionLocal() as session:
        yield session

# Кэширование в рамках одного запроса
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    # Это вычисляется один раз за запрос
    return await verify_token(token)

@app.get("/api/profile")
async def get_profile(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db_session)
):
    # current_user вычисляется один раз и переиспользуется
    return {"user": current_user}

Быстрый startup/shutdown

Для горизонтального масштабирования критична скорость запуска и остановки инстансов:

import time

@app.on_event("startup")
async def startup_event():
    print(f"Started at {time.time()}")
    # Минимальные инициализации
    app.db = create_database_pool()
    app.cache = init_cache()

@app.on_event("shutdown")
async def shutdown_event():
    print(f"Shutdown at {time.time()}")
    # Чистая остановка без hanged connections
    await app.db.close()
    await app.cache.close()

FastAPI позволяет быстро запускать новые инстансы в Kubernetes или Docker Swarm.

Connection pooling и управление ресурсами

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool, QueuePool

# ✅ Правильная конфигурация для масштабирования
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=False,
    poolclass=QueuePool,
    pool_size=20,  # Количество постоянных соединений
    max_overflow=10,  # Дополнительные соединения при пиках
    pool_pre_ping=True,  # Проверка жизнеспособности перед использованием
    pool_recycle=3600,  # Переиспользование соединений каждый час
)

# ❌ Для масштабирования
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    poolclass=NullPool  # БЕЗ пула — новое соединение на каждый запрос!
)

Правильное управление жизненным циклом контекста

from contextlib import asynccontextmanager

# ✅ Правильно — контролируемый lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting up")
    app.db_pool = create_connection_pool()
    yield
    # Shutdown
    print("Shutting down")
    await app.db_pool.close()

app = FastAPI(lifespan=lifespan)

@app.get("/api/data")
async def get_data():
    async with app.db_pool.acquire() as conn:
        result = await conn.fetchrow("SELECT * FROM users")
    return result

Легкая интеграция с Message Brokers

Для асинхронной обработки в распределенной системе:

from celery import Celery
import aio_pika

# Celery для задач
celery_app = Celery(__name__, broker="redis://localhost")

@celery_app.task
def process_data(user_id: int):
    # Выполняется в отдельном процессе
    return f"Processed {user_id}"

@app.post("/api/process")
async def start_processing(user_id: int):
    # Отправляем задачу в очередь и сразу возвращаем ответ
    task = process_data.delay(user_id)
    return {"task_id": task.id}

# RabbitMQ для event streaming
@app.on_event("startup")
async def setup_rabbitmq():
    app.amqp_connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    app.amqp_channel = await app.amqp_connection.channel()

@app.post("/api/events")
async def publish_event(event_type: str, data: dict):
    exchange = await app.amqp_channel.get_exchange("events")
    await exchange.publish(
        aio_pika.Message(body=json.dumps(data).encode()),
        routing_key=event_type
    )
    return {"status": "published"}

Graceful shutdown для систем оркестрации

import signal
import asyncio

async def shutdown_gracefully(signum, frame):
    """Корректное завершение перед kill'ом контейнером"""
    print("Graceful shutdown initiated")
    
    # Даем время завершить текущие запросы
    pending_tasks = asyncio.all_tasks()
    for task in pending_tasks:
        task.cancel()
    
    await asyncio.gather(*pending_tasks, return_exceptions=True)
    print("All tasks completed, exiting")

signal.signal(signal.SIGTERM, shutdown_gracefully)

# В Docker/Kubernetes:
# 1. SIGTERM отправляется
# 2. Приложение обрабатывает текущие запросы
# 3. Новые запросы не принимаются (load balancer отключает инстанс)
# 4. Инстанс завершает работу

Health checks для оркестрации

from datetime import datetime

@app.get("/health")
async def health_check():
    """Для Kubernetes/Docker health checks"""
    try:
        # Проверяем БД
        async with app.db_pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
        
        return {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "version": "1.0.0"
        }
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}, 503

@app.get("/readiness")
async def readiness_check():
    """Готово ли приложение принимать трафик?"""
    if not hasattr(app, "db_pool") or not app.db_pool:
        return {"ready": False}, 503
    return {"ready": True}

Масштабирование на практике

# docker-compose.yml для локального тестирования масштабирования
version: '3.8'

services:
  api1:
    build: .
    ports:
      - "8001:8000"
    environment:
      - WORKER_ID=1
    depends_on:
      - db
      - redis

  api2:
    build: .
    ports:
      - "8002:8000"
    environment:
      - WORKER_ID=2
    depends_on:
      - db
      - redis

  api3:
    build: .
    ports:
      - "8003:8000"
    environment:
      - WORKER_ID=3
    depends_on:
      - db
      - redis

  nginx:
    image: nginx:latest
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api1
      - api2
      - api3

  db:
    image: postgres:15
    environment:
      POSTGRES_PASSWORD: password

  redis:
    image: redis:7

Заключение

FastAPI идеален для горизонтального масштабирования благодаря:

  1. Асинхронности — обработка тысяч запросов одним процессом
  2. Stateless архитектуре — независимые инстансы приложения
  3. Эффективному управлению ресурсами — пулинг соединений, минимум overhead
  4. Быстрому запуску/остановке — идеально для облачных платформ
  5. Интеграции с message brokers — асинхронная обработка задач
  6. Поддержке health checks — совместимость с Kubernetes и Docker
  7. Минимальным потреблением памяти — Uvicorn намного легче Django/Flask

Это делает FastAPI идеальным выбором для микросервисов, облачных приложений и систем, требующих линейного масштабирования.

Почему FastAPI подходит для проекта с горизонтальным масштабированием? | PrepBro