← Назад к вопросам
Какие знаешь факторы, которые могут повлиять на пропускную способность приложения, написанного на FastAPI?
3.0 Senior🔥 151 комментариев
#FastAPI и Flask
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Факторы, влияющие на пропускную способность FastAPI
Пропускная способность (throughput) — это количество запросов, которые приложение может обработать за единицу времени. На неё влияет множество факторов.
Архитектура и инфраструктура
Количество рабочих процессов (workers)
FastAPI работает с ASGI-сервером (uvicorn, hypercorn). Количество workers должно соответствовать количеству ядер CPU:
# Один worker — одно ядро не используется полностью
uvicorn main:app --workers 1
# Оптимально: количество ядер
uvicorn main:app --workers 4 # для 4-ядерного процессора
Асинхронность
FastAPI использует async/await для параллельной обработки I/O операций:
# Неэффективно: синхронная функция блокирует worker
@app.get("/slow")
def slow_endpoint():
time.sleep(2) # Блокирует весь worker
return {"status": "done"}
# Эффективно: асинхронная функция не блокирует
@app.get("/fast")
async def fast_endpoint():
await asyncio.sleep(2) # Не блокирует, worker может обработать другие запросы
return {"status": "done"}
Производительность кода
Алгоритмическая сложность
О(n²) алгоритм будет узким местом при обработке больших данных:
# Плохо: O(n²) — 1 млн операций на 1000 элементов
@app.post("/search")
async def search(items: list[int], target: int):
results = []
for item in items:
for sub_item in items: # Вложенный цикл!
if item + sub_item == target:
results.append((item, sub_item))
return results
# Хорошо: O(n) — 1000 операций
@app.post("/search")
async def search(items: list[int], target: int):
seen = set()
results = []
for item in items:
complement = target - item
if complement in seen:
results.append((item, complement))
seen.add(item)
return results
Утечки памяти
Глобальные переменные, незакрытые соединения — всё это снижает пропускную способность:
# Плохо: соединение никогда не закрывается
db_connection = None
@app.on_event("startup")
async def startup():
global db_connection
db_connection = await connect_db() # Никогда не закрывается
# Хорошо: контролируем жизненный цикл
@app.on_event("startup")
async def startup():
app.state.db = await connect_db()
@app.on_event("shutdown")
async def shutdown():
await app.state.db.close()
База данных
Пулинг соединений
Без пула каждый запрос создаёт новое соединение (медленно):
from sqlalchemy.pool import QueuePool
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
poolclass=QueuePool,
pool_size=10, # Количество соединений в пуле
max_overflow=20, # Дополнительные соединения при нагрузке
)
Индексы и запросы
Медленный SQL — это узкое место номер один:
# Плохо: N+1 проблема
@app.get("/users-with-posts")
async def get_users():
users = await db.fetch("SELECT * FROM users")
for user in users:
posts = await db.fetch("SELECT * FROM posts WHERE user_id = ?", user[id])
# На 1000 пользователей — 1001 запрос!
# Хорошо: JOIN
@app.get("/users-with-posts")
async def get_users():
return await db.fetch("""
SELECT u.*, p.* FROM users u
LEFT JOIN posts p ON u.id = p.user_id
""")
Кэширование
HTTP кэширование
from fastapi import Response
@app.get("/data")
async def get_data(response: Response):
response.headers["Cache-Control"] = "max-age=3600" # Кэш на 1 час
return {"data": "some value"}
Приложение кэширование
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_computation(param: str):
# Вычисляется один раз, затем берётся из кэша
return {"result": param * 1000}
# Для асинхронных функций нужна свою библиотека
from aioredis import Redis
redis = None
@app.on_event("startup")
async def startup():
global redis
redis = await Redis.from_url("redis://localhost")
@app.get("/users/{user_id}")
async def get_user(user_id: int):
cached = await redis.get(f"user:{user_id}")
if cached:
return json.loads(cached)
user = await db.fetchrow("SELECT * FROM users WHERE id = ?", user_id)
await redis.setex(f"user:{user_id}", 3600, json.dumps(user))
return user
Сетевые факторы
Размер ответа
# Плохо: отправляем всё подряд
@app.get("/all-data")
async def all_data():
return await db.fetch("SELECT * FROM huge_table") # Может быть 100 МБ
# Хорошо: пагинация
@app.get("/data")
async def get_data(skip: int = 0, limit: int = 20):
return await db.fetch(
"SELECT * FROM huge_table LIMIT ? OFFSET ?",
limit, skip
)
Сжатие ответов
from fastapi.middleware.gzip import GZIPMiddleware
app.add_middleware(GZIPMiddleware, minimum_size=1000)
Мониторинг и профилирование
Определение узких мест
import time
from fastapi import Request
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
if process_time > 1.0: # Если запрос дольше 1 секунды
print(f"Slow request: {request.url} took {process_time}s")
return response
Заключение
Для максимальной пропускной способности FastAPI нужно:
- Использовать async/await везде, где есть I/O
- Оптимизировать SQL запросы (индексы, JOIN, избегать N+1)
- Кэшировать часто запрашиваемые данные
- Настроить пулинг соединений к БД
- Масштабировать горизонтально — добавлять workers/сервера
- Мониторить и находить узкие места
- Сжимать ответы и использовать пагинацию