Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Оптимизация работы FastAPI
FastAPI — это высокопроизводительный фреймворк, но есть множество техник для дальнейшего улучшения производительности. Оптимизация должна быть осознанной и основана на профилировании.
1. Использование асинхронных операций
FastAPI построен на async/await, используй это везде:
from fastapi import FastAPI
import asyncio
app = FastAPI()
# ❌ Неправильно — блокирующая операция
@app.get("/users/{user_id}")
def get_user(user_id: int):
user = fetch_from_db(user_id) # Блокирует весь поток
return user
# ✅ Правильно — асинхронная операция
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await fetch_from_db_async(user_id) # Не блокирует
return user
# ✅ Если операция синхронная, используй run_in_threadpool
from fastapi.concurrency import run_in_threadpool
@app.get("/heavy-compute")
async def heavy_compute():
result = await run_in_threadpool(blocking_function)
return result
2. Connection pooling для БД
Используй пулы соединений для эффективного использования ресурсов:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool, QueuePool
# ❌ Без пула — создает новое соединение для каждого запроса
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
poolclass=NullPool # Плохо для production
)
# ✅ С пулом — переиспользует соединения
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
echo=False,
pool_size=20, # Максимум 20 соединений
max_overflow=40, # До 40 дополнительных
pool_pre_ping=True, # Проверяет соединения
pool_recycle=3600, # Переиспользует после 1 часа
)
3. Кэширование
Используй кэширование для часто запрашиваемых данных:
from fastapi import FastAPI
from fastapi_cache2 import FastAPICache
from fastapi_cache2.backends.redis import RedisBackend
from fastapi_cache2.decorators import cache
app = FastAPI()
# Инициализация
from redis import asyncio as aioredis
@app.on_event("startup")
async def startup():
redis = aioredis.from_url("redis://localhost")
FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
# Кэширование результатов
@app.get("/items/{item_id}")
@cache(expire=3600) # Кэш на 1 час
async def get_item(item_id: int):
return await fetch_item_from_db(item_id)
# Ручное кэширование
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_computation(x: int) -> int:
return x ** 2
4. Ограничение и пагинация ответов
Не возвращай все данные сразу:
from pydantic import BaseModel
from typing import Optional
class PaginationParams(BaseModel):
skip: int = 0
limit: int = 10
@app.get("/items")
async def list_items(skip: int = 0, limit: int = 10):
# Используй LIMIT в SQL запросе
items = await db.query(
"SELECT * FROM items LIMIT :limit OFFSET :skip",
{"limit": limit, "skip": skip}
)
return items
# Или с SQLAlchemy
from sqlalchemy import select
@app.get("/items")
async def list_items(skip: int = 0, limit: int = 10):
query = select(Item).offset(skip).limit(limit)
items = await db.execute(query)
return items.scalars().all()
5. Индексация БД
Убедись что часто запрашиваемые поля индексированы:
from sqlalchemy import Column, Integer, String, Index
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, index=True) # Индекс на email
username = Column(String, index=True)
__table_args__ = (
Index('idx_email_username', 'email', 'username'), # Составной индекс
)
6. Использование select() вместо load()
Оптимизируй SQL запросы:
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload
# ❌ N+1 запросы
@app.get("/users")
async def get_users():
users = await db.execute(select(User))
return users.scalars().all()
# Для каждого юзера будет отдельный запрос его посты!
# ✅ Eager loading
@app.get("/users")
async def get_users():
query = select(User).options(selectinload(User.posts))
users = await db.execute(query)
return users.scalars().unique().all()
7. Профилирование и мониторинг
from fastapi import FastAPI
import time
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
# Используй Prometheus для мониторинга
from prometheus_client import Counter, Histogram
import time
request_count = Counter(
'fastapi_requests_total',
'Total requests',
['method', 'endpoint']
)
request_time = Histogram(
'fastapi_request_duration_seconds',
'Request latency',
['method', 'endpoint']
)
8. Сжатие ответов
Используй GZip для сжатия больших ответов:
from fastapi.middleware.gzip import GZIPMiddleware
app = FastAPI()
app.add_middleware(GZIPMiddleware, minimum_size=1000) # Сжимать если > 1KB
9. Оптимизация Pydantic моделей
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional
class Item(BaseModel):
# ✅ Используй ConfigDict для оптимизации
model_config = ConfigDict(
from_attributes=True, # ORM mode
validate_assignment=False, # Отключи валидацию для улучшения скорости
)
id: int
name: str = Field(..., max_length=100)
description: Optional[str] = None
# ✅ Не валидируй при сериализации
class ItemResponse(BaseModel):
id: int
name: str
model_config = ConfigDict(validate_assignment=False)
10. Использование Background Tasks
Делегируй долгие операции в background:
from fastapi import BackgroundTasks
import asyncio
@app.post("/send-email/")
async def send_email(email: str, background_tasks: BackgroundTasks):
# Добавь долгую операцию в background
background_tasks.add_task(send_email_async, email)
return {"message": "Email will be sent in background"}
async def send_email_async(email: str):
# Долгая операция
await asyncio.sleep(10)
print(f"Email sent to {email}")
11. Настройка Uvicorn
# Production конфиг
uvicorn main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--loop uvloop \
--http httptools \
--access-log \
--use-colors
# Или программно
import uvicorn
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
workers=4,
loop="uvloop",
http="httptools",
)
12. Пример оптимизированного приложения
from fastapi import FastAPI, Depends
from fastapi.middleware.gzip import GZIPMiddleware
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.pool import QueuePool
app = FastAPI()
app.add_middleware(GZIPMiddleware, minimum_size=1000)
# Оптимизированный engine
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
pool_size=20,
max_overflow=10,
echo=False,
)
@app.on_event("startup")
async def startup():
# Инициализируй кэш, пулы и т.д.
pass
@app.on_event("shutdown")
async def shutdown():
await engine.dispose()
async def get_db() -> AsyncSession:
async with AsyncSession(engine) as session:
yield session
@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):
# Асинхронный запрос с индексами
result = await db.execute(
select(Item).where(Item.id == item_id)
)
return result.scalar()
Ключевые правила оптимизации
✅ Используй async/await везде — это главное ✅ Connection pooling — критично для БД ✅ Кэширование — для часто запрашиваемых данных ✅ Пагинация — не возвращай всё сразу ✅ Индексы БД — на часто используемые поля ✅ Профилирование — прежде чем оптимизировать ✅ Gzip compression — для больших ответов ✅ Background tasks — для долгих операций
Используй эти техники для достижения максимальной производительности FastAPI.