Каким способом чаще обращаешься к БД?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Способы обращения к базам данных: My Approach
В моей практике я использую несколько подходов в зависимости от контекста и требований проекта. Вот мой анализ и предпочтения.
1. ORM (SQLAlchemy) — Основной выбор
ORM (Object-Relational Mapping) — это мой основной инструмент для 80-90% случаев.
Почему ORM?
✅ Безопасность — защита от SQL injection ✅ Абстракция — переход между БД без изменений кода ✅ Type hints — IDE автодополнение ✅ Relationships — автоматические связи между объектами ✅ Lazy loading — загрузка только необходимых данных ✅ Query optimization — caching и batching
SQLAlchemy ORM (FastAPI, async)
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, Session, sessionmaker, relationship
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationship
posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(String(2000))
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationship
author = relationship("User", back_populates="posts")
# Async setup
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
echo=False,
pool_size=20,
max_overflow=0,
)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
# Использование в FastAPI
from fastapi import FastAPI, Depends
app = FastAPI()
async def get_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db)
):
# ORM query
user = await db.get(User, user_id) # Простой get
# Relationship автоматически загружена
return {
"id": user.id,
"username": user.username,
"posts": user.posts # Relationships work!
}
Сложные ORM queries
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload
# Simple query
stmt = select(User).where(User.id == 1)
user = await db.scalar(stmt)
# Filter multiple conditions
stmt = select(User).where(
and_(
User.created_at > datetime(2024, 1, 1),
or_(
User.username.like('%admin%'),
User.email.like('%admin%')
)
)
)
users = await db.scalars(stmt)
# Joinedload для eager loading (N+1 prevention)
stmt = select(User).options(joinedload(User.posts))
user = await db.scalar(stmt)
# posts уже загружены!
# Aggregation
stmt = select(func.count(Post.id)).where(Post.user_id == user_id)
count = await db.scalar(stmt)
# Group by
stmt = (
select(User.id, func.count(Post.id).label("post_count"))
.join(Post)
.group_by(User.id)
.order_by(func.count(Post.id).desc())
)
results = await db.execute(stmt)
2. Raw SQL (для сложных queries)
Raw SQL — используется для 10-20% сложных случаев, где ORM неэффективен.
Когда использовать Raw SQL?
✅ Сложные JOIN'ы (3+ таблицы) ✅ Window functions (ROW_NUMBER, RANK, etc.) ✅ CTE (Common Table Expressions) — рекурсивные запросы ✅ Batch operations — миллионы записей ✅ Performance-critical — когда нужна максимальная скорость
Пример Raw SQL
from sqlalchemy import text
# Simple raw SQL
result = await db.execute(
text("SELECT * FROM users WHERE id = :user_id"),
{"user_id": 1}
)
user = result.fetchone()
# Named parameters (безопасно от SQL injection)
query = """
SELECT u.id, u.username, COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
WHERE u.created_at > :start_date
AND u.username LIKE :search
GROUP BY u.id
ORDER BY post_count DESC
LIMIT :limit
"""
result = await db.execute(
text(query),
{
"start_date": datetime(2024, 1, 1),
"search": f"%{search_term}%",
"limit": 10
}
)
rows = result.fetchall()
# CTE example (Window function)
query = """
WITH user_posts AS (
SELECT
user_id,
COUNT(*) as post_count,
ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) as rank
FROM posts
GROUP BY user_id
)
SELECT u.username, up.post_count, up.rank
FROM users u
JOIN user_posts up ON u.id = up.user_id
WHERE up.rank <= 10
"""
result = await db.execute(text(query))
top_users = result.fetchall()
3. Hybrid Approach (ORM + Raw SQL)
Мой практический подход — использую оба вместе, где это имеет смысл.
# Простое: используй ORM
user = await db.get(User, user_id)
# Среднее: используй ORM query builder
stmt = select(User).where(User.email == email).limit(1)
user = await db.scalar(stmt)
# Сложное: используй raw SQL
result = await db.execute(
text("""
SELECT u.id, u.username, COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.id
HAVING COUNT(p.id) > :min_posts
ORDER BY COUNT(p.id) DESC
"""),
{"min_posts": 5}
)
4. Query Optimization Techniques
N+1 Problem
# ❌ Плохо: N+1 query
users = await db.scalars(select(User))
for user in users: # Для каждого user — отдельный запрос!
posts = user.posts
print(f"{user.username}: {len(posts)} posts")
# Итого: 1 (users) + N (posts for each user) = N+1 query
# ✅ Хорошо: Eager loading
stmt = select(User).options(joinedload(User.posts))
users = await db.scalars(stmt)
for user in users: # posts уже в памяти!
posts = user.posts
print(f"{user.username}: {len(posts)} posts")
# Итого: 1 query (с JOIN)
Index Usage
from sqlalchemy import Index
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(100))
created_at = Column(DateTime)
# Индексы
__table_args__ = (
Index('idx_email', 'email'), # Для поиска по email
Index('idx_created_at', 'created_at'), # Для сортировки
Index('idx_email_created', 'email', 'created_at'), # Compound
)
5. Batch Operations
Для больших объёмов данных используй batch operations, не ORM.
# ❌ Плохо: ORM в цикле (очень медленно)
for i in range(1000000):
user = User(username=f"user{i}", email=f"user{i}@example.com")
db.add(user)
if i % 1000 == 0:
db.commit()
# Итого: 1000 commits, каждый медленный
# ✅ Хорошо: Batch insert
users = [
User(username=f"user{i}", email=f"user{i}@example.com")
for i in range(1000000)
]
db.bulk_insert_mappings(User, users)
# Итого: 1 bulk operation, очень быстро
# ✅ Ещё лучше: Raw SQL COPY
await db.execute(
text("""
COPY users (username, email) FROM STDIN
"""),
values # Передай данные
)
# Итого: Самый быстрый способ для postgres
6. Connection Pooling
Всегда используй connection pooling для production.
from sqlalchemy.pool import QueuePool
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
poolclass=QueuePool,
pool_size=20, # Количество connections в pool
max_overflow=10, # Дополнительные connections при peak load
pool_pre_ping=True, # Проверь connection перед использованием
pool_recycle=3600, # Переиспользуй connections каждый час
)
7. Миграции (Alembic)
Используй Alembic для миграций (или Goose в моём случае).
# Создание новой миграции
alembic revision --autogenerate -m "Add user email field"
# Применение
alembic upgrade head
# Откат
alembic downgrade -1
Мой типичный stack
# models.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
email = Column(String(100), unique=True)
created_at = Column(DateTime)
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/db",
pool_size=20,
max_overflow=10,
)
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
# schemas.py
from pydantic import BaseModel
from datetime import datetime
class UserCreate(BaseModel):
username: str
email: str
class UserResponse(BaseModel):
id: int
username: str
email: str
created_at: datetime
class Config:
from_attributes = True
# crud.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
class UserCRUD:
@staticmethod
async def create(db: AsyncSession, user_data: UserCreate) -> User:
user = User(**user_data.dict())
db.add(user)
await db.commit()
await db.refresh(user)
return user
@staticmethod
async def get_by_id(db: AsyncSession, user_id: int) -> User | None:
return await db.get(User, user_id)
@staticmethod
async def get_by_email(db: AsyncSession, email: str) -> User | None:
stmt = select(User).where(User.email == email)
return await db.scalar(stmt)
@staticmethod
async def list_all(db: AsyncSession, skip: int = 0, limit: int = 100):
stmt = select(User).offset(skip).limit(limit)
return await db.scalars(stmt)
@staticmethod
async def update(db: AsyncSession, user_id: int, user_data: dict) -> User:
user = await db.get(User, user_id)
for key, value in user_data.items():
setattr(user, key, value)
await db.commit()
await db.refresh(user)
return user
@staticmethod
async def delete(db: AsyncSession, user_id: int) -> None:
user = await db.get(User, user_id)
await db.delete(user)
await db.commit()
# main.py
from fastapi import FastAPI, Depends
from database import AsyncSessionLocal
app = FastAPI()
async def get_db():
async with AsyncSessionLocal() as session:
yield session
@app.post("/users", response_model=UserResponse)
async def create_user(
user_data: UserCreate,
db: AsyncSession = Depends(get_db)
):
return await UserCRUD.create(db, user_data)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db)
):
user = await UserCRUD.get_by_id(db, user_id)
if not user:
raise HTTPException(status_code=404)
return user
Мои предпочтения
80-90% случаев: ✅ SQLAlchemy ORM — simple queries, type safe
10-20% случаев: ✅ Raw SQL — complex queries, performance critical
Batch operations: ✅ bulk_insert_mappings или Raw SQL COPY
Миграции: ✅ Alembic или Goose (raw SQL)
Performance: ✅ Connection pooling всегда ✅ Eager loading с joinedload ✅ Правильные индексы ✅ EXPLAIN для анализа
Выбор подхода зависит от сложности запроса и требований производительности.