← Назад к вопросам

Какие знаешь факторы, которые могут повлиять на пропускную способность приложения, написанного на FastAPI?

3.0 Senior🔥 151 комментариев
#FastAPI и Flask

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Факторы, влияющие на пропускную способность FastAPI

Пропускная способность (throughput) — это количество запросов, которые приложение может обработать за единицу времени. На неё влияет множество факторов.

Архитектура и инфраструктура

Количество рабочих процессов (workers)

FastAPI работает с ASGI-сервером (uvicorn, hypercorn). Количество workers должно соответствовать количеству ядер CPU:

# Один worker — одно ядро не используется полностью
uvicorn main:app --workers 1

# Оптимально: количество ядер
uvicorn main:app --workers 4  # для 4-ядерного процессора

Асинхронность

FastAPI использует async/await для параллельной обработки I/O операций:

# Неэффективно: синхронная функция блокирует worker
@app.get("/slow")
def slow_endpoint():
    time.sleep(2)  # Блокирует весь worker
    return {"status": "done"}

# Эффективно: асинхронная функция не блокирует
@app.get("/fast")
async def fast_endpoint():
    await asyncio.sleep(2)  # Не блокирует, worker может обработать другие запросы
    return {"status": "done"}

Производительность кода

Алгоритмическая сложность

О(n²) алгоритм будет узким местом при обработке больших данных:

# Плохо: O(n²) — 1 млн операций на 1000 элементов
@app.post("/search")
async def search(items: list[int], target: int):
    results = []
    for item in items:
        for sub_item in items:  # Вложенный цикл!
            if item + sub_item == target:
                results.append((item, sub_item))
    return results

# Хорошо: O(n) — 1000 операций
@app.post("/search")
async def search(items: list[int], target: int):
    seen = set()
    results = []
    for item in items:
        complement = target - item
        if complement in seen:
            results.append((item, complement))
        seen.add(item)
    return results

Утечки памяти

Глобальные переменные, незакрытые соединения — всё это снижает пропускную способность:

# Плохо: соединение никогда не закрывается
db_connection = None

@app.on_event("startup")
async def startup():
    global db_connection
    db_connection = await connect_db()  # Никогда не закрывается

# Хорошо: контролируем жизненный цикл
@app.on_event("startup")
async def startup():
    app.state.db = await connect_db()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db.close()

База данных

Пулинг соединений

Без пула каждый запрос создаёт новое соединение (медленно):

from sqlalchemy.pool import QueuePool
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/db",
    poolclass=QueuePool,
    pool_size=10,        # Количество соединений в пуле
    max_overflow=20,     # Дополнительные соединения при нагрузке
)

Индексы и запросы

Медленный SQL — это узкое место номер один:

# Плохо: N+1 проблема
@app.get("/users-with-posts")
async def get_users():
    users = await db.fetch("SELECT * FROM users")
    for user in users:
        posts = await db.fetch("SELECT * FROM posts WHERE user_id = ?", user[id])
        # На 1000 пользователей — 1001 запрос!

# Хорошо: JOIN
@app.get("/users-with-posts")
async def get_users():
    return await db.fetch("""
        SELECT u.*, p.* FROM users u
        LEFT JOIN posts p ON u.id = p.user_id
    """)

Кэширование

HTTP кэширование

from fastapi import Response

@app.get("/data")
async def get_data(response: Response):
    response.headers["Cache-Control"] = "max-age=3600"  # Кэш на 1 час
    return {"data": "some value"}

Приложение кэширование

from functools import lru_cache

@lru_cache(maxsize=128)
def expensive_computation(param: str):
    # Вычисляется один раз, затем берётся из кэша
    return {"result": param * 1000}

# Для асинхронных функций нужна свою библиотека
from aioredis import Redis

redis = None

@app.on_event("startup")
async def startup():
    global redis
    redis = await Redis.from_url("redis://localhost")

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    cached = await redis.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    
    user = await db.fetchrow("SELECT * FROM users WHERE id = ?", user_id)
    await redis.setex(f"user:{user_id}", 3600, json.dumps(user))
    return user

Сетевые факторы

Размер ответа

# Плохо: отправляем всё подряд
@app.get("/all-data")
async def all_data():
    return await db.fetch("SELECT * FROM huge_table")  # Может быть 100 МБ

# Хорошо: пагинация
@app.get("/data")
async def get_data(skip: int = 0, limit: int = 20):
    return await db.fetch(
        "SELECT * FROM huge_table LIMIT ? OFFSET ?",
        limit, skip
    )

Сжатие ответов

from fastapi.middleware.gzip import GZIPMiddleware

app.add_middleware(GZIPMiddleware, minimum_size=1000)

Мониторинг и профилирование

Определение узких мест

import time
from fastapi import Request

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    if process_time > 1.0:  # Если запрос дольше 1 секунды
        print(f"Slow request: {request.url} took {process_time}s")
    return response

Заключение

Для максимальной пропускной способности FastAPI нужно:

  1. Использовать async/await везде, где есть I/O
  2. Оптимизировать SQL запросы (индексы, JOIN, избегать N+1)
  3. Кэшировать часто запрашиваемые данные
  4. Настроить пулинг соединений к БД
  5. Масштабировать горизонтально — добавлять workers/сервера
  6. Мониторить и находить узкие места
  7. Сжимать ответы и использовать пагинацию