← Назад к вопросам

Каким способом чаще обращаешься к БД?

1.2 Junior🔥 141 комментариев
#Python Core#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Способы обращения к базам данных: My Approach

В моей практике я использую несколько подходов в зависимости от контекста и требований проекта. Вот мой анализ и предпочтения.

1. ORM (SQLAlchemy) — Основной выбор

ORM (Object-Relational Mapping) — это мой основной инструмент для 80-90% случаев.

Почему ORM?

Безопасность — защита от SQL injection ✅ Абстракция — переход между БД без изменений кода ✅ Type hints — IDE автодополнение ✅ Relationships — автоматические связи между объектами ✅ Lazy loading — загрузка только необходимых данных ✅ Query optimization — caching и batching

SQLAlchemy ORM (FastAPI, async)

from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, Session, sessionmaker, relationship
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationship
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")

class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String(2000))
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationship
    author = relationship("User", back_populates="posts")

# Async setup
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/db",
    echo=False,
    pool_size=20,
    max_overflow=0,
)

AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# Использование в FastAPI
from fastapi import FastAPI, Depends

app = FastAPI()

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    # ORM query
    user = await db.get(User, user_id)  # Простой get
    
    # Relationship автоматически загружена
    return {
        "id": user.id,
        "username": user.username,
        "posts": user.posts  # Relationships work!
    }

Сложные ORM queries

from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload

# Simple query
stmt = select(User).where(User.id == 1)
user = await db.scalar(stmt)

# Filter multiple conditions
stmt = select(User).where(
    and_(
        User.created_at > datetime(2024, 1, 1),
        or_(
            User.username.like('%admin%'),
            User.email.like('%admin%')
        )
    )
)
users = await db.scalars(stmt)

# Joinedload для eager loading (N+1 prevention)
stmt = select(User).options(joinedload(User.posts))
user = await db.scalar(stmt)
# posts уже загружены!

# Aggregation
stmt = select(func.count(Post.id)).where(Post.user_id == user_id)
count = await db.scalar(stmt)

# Group by
stmt = (
    select(User.id, func.count(Post.id).label("post_count"))
    .join(Post)
    .group_by(User.id)
    .order_by(func.count(Post.id).desc())
)
results = await db.execute(stmt)

2. Raw SQL (для сложных queries)

Raw SQL — используется для 10-20% сложных случаев, где ORM неэффективен.

Когда использовать Raw SQL?

Сложные JOIN'ы (3+ таблицы) ✅ Window functions (ROW_NUMBER, RANK, etc.) ✅ CTE (Common Table Expressions) — рекурсивные запросы ✅ Batch operations — миллионы записей ✅ Performance-critical — когда нужна максимальная скорость

Пример Raw SQL

from sqlalchemy import text

# Simple raw SQL
result = await db.execute(
    text("SELECT * FROM users WHERE id = :user_id"),
    {"user_id": 1}
)
user = result.fetchone()

# Named parameters (безопасно от SQL injection)
query = """
    SELECT u.id, u.username, COUNT(p.id) as post_count
    FROM users u
    LEFT JOIN posts p ON u.id = p.user_id
    WHERE u.created_at > :start_date
        AND u.username LIKE :search
    GROUP BY u.id
    ORDER BY post_count DESC
    LIMIT :limit
"""

result = await db.execute(
    text(query),
    {
        "start_date": datetime(2024, 1, 1),
        "search": f"%{search_term}%",
        "limit": 10
    }
)
rows = result.fetchall()

# CTE example (Window function)
query = """
    WITH user_posts AS (
        SELECT 
            user_id,
            COUNT(*) as post_count,
            ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) as rank
        FROM posts
        GROUP BY user_id
    )
    SELECT u.username, up.post_count, up.rank
    FROM users u
    JOIN user_posts up ON u.id = up.user_id
    WHERE up.rank <= 10
"""

result = await db.execute(text(query))
top_users = result.fetchall()

3. Hybrid Approach (ORM + Raw SQL)

Мой практический подход — использую оба вместе, где это имеет смысл.

# Простое: используй ORM
user = await db.get(User, user_id)

# Среднее: используй ORM query builder
stmt = select(User).where(User.email == email).limit(1)
user = await db.scalar(stmt)

# Сложное: используй raw SQL
result = await db.execute(
    text("""
        SELECT u.id, u.username, COUNT(p.id) as post_count
        FROM users u
        LEFT JOIN posts p ON u.id = p.user_id
        GROUP BY u.id
        HAVING COUNT(p.id) > :min_posts
        ORDER BY COUNT(p.id) DESC
    """),
    {"min_posts": 5}
)

4. Query Optimization Techniques

N+1 Problem

# ❌ Плохо: N+1 query
users = await db.scalars(select(User))
for user in users:  # Для каждого user — отдельный запрос!
    posts = user.posts
    print(f"{user.username}: {len(posts)} posts")
# Итого: 1 (users) + N (posts for each user) = N+1 query

# ✅ Хорошо: Eager loading
stmt = select(User).options(joinedload(User.posts))
users = await db.scalars(stmt)
for user in users:  # posts уже в памяти!
    posts = user.posts
    print(f"{user.username}: {len(posts)} posts")
# Итого: 1 query (с JOIN)

Index Usage

from sqlalchemy import Index

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    email = Column(String(100))
    created_at = Column(DateTime)
    
    # Индексы
    __table_args__ = (
        Index('idx_email', 'email'),  # Для поиска по email
        Index('idx_created_at', 'created_at'),  # Для сортировки
        Index('idx_email_created', 'email', 'created_at'),  # Compound
    )

5. Batch Operations

Для больших объёмов данных используй batch operations, не ORM.

# ❌ Плохо: ORM в цикле (очень медленно)
for i in range(1000000):
    user = User(username=f"user{i}", email=f"user{i}@example.com")
    db.add(user)
    if i % 1000 == 0:
        db.commit()
# Итого: 1000 commits, каждый медленный

# ✅ Хорошо: Batch insert
users = [
    User(username=f"user{i}", email=f"user{i}@example.com")
    for i in range(1000000)
]
db.bulk_insert_mappings(User, users)
# Итого: 1 bulk operation, очень быстро

# ✅ Ещё лучше: Raw SQL COPY
await db.execute(
    text("""
        COPY users (username, email) FROM STDIN
    """),
    values  # Передай данные
)
# Итого: Самый быстрый способ для postgres

6. Connection Pooling

Всегда используй connection pooling для production.

from sqlalchemy.pool import QueuePool

engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/db",
    poolclass=QueuePool,
    pool_size=20,  # Количество connections в pool
    max_overflow=10,  # Дополнительные connections при peak load
    pool_pre_ping=True,  # Проверь connection перед использованием
    pool_recycle=3600,  # Переиспользуй connections каждый час
)

7. Миграции (Alembic)

Используй Alembic для миграций (или Goose в моём случае).

# Создание новой миграции
alembic revision --autogenerate -m "Add user email field"

# Применение
alembic upgrade head

# Откат
alembic downgrade -1

Мой типичный stack

# models.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True)
    email = Column(String(100), unique=True)
    created_at = Column(DateTime)

# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/db",
    pool_size=20,
    max_overflow=10,
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# schemas.py
from pydantic import BaseModel
from datetime import datetime

class UserCreate(BaseModel):
    username: str
    email: str

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime
    
    class Config:
        from_attributes = True

# crud.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

class UserCRUD:
    @staticmethod
    async def create(db: AsyncSession, user_data: UserCreate) -> User:
        user = User(**user_data.dict())
        db.add(user)
        await db.commit()
        await db.refresh(user)
        return user
    
    @staticmethod
    async def get_by_id(db: AsyncSession, user_id: int) -> User | None:
        return await db.get(User, user_id)
    
    @staticmethod
    async def get_by_email(db: AsyncSession, email: str) -> User | None:
        stmt = select(User).where(User.email == email)
        return await db.scalar(stmt)
    
    @staticmethod
    async def list_all(db: AsyncSession, skip: int = 0, limit: int = 100):
        stmt = select(User).offset(skip).limit(limit)
        return await db.scalars(stmt)
    
    @staticmethod
    async def update(db: AsyncSession, user_id: int, user_data: dict) -> User:
        user = await db.get(User, user_id)
        for key, value in user_data.items():
            setattr(user, key, value)
        await db.commit()
        await db.refresh(user)
        return user
    
    @staticmethod
    async def delete(db: AsyncSession, user_id: int) -> None:
        user = await db.get(User, user_id)
        await db.delete(user)
        await db.commit()

# main.py
from fastapi import FastAPI, Depends
from database import AsyncSessionLocal

app = FastAPI()

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.post("/users", response_model=UserResponse)
async def create_user(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    return await UserCRUD.create(db, user_data)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    user = await UserCRUD.get_by_id(db, user_id)
    if not user:
        raise HTTPException(status_code=404)
    return user

Мои предпочтения

80-90% случаев: ✅ SQLAlchemy ORM — simple queries, type safe

10-20% случаев: ✅ Raw SQL — complex queries, performance critical

Batch operations: ✅ bulk_insert_mappings или Raw SQL COPY

Миграции: ✅ Alembic или Goose (raw SQL)

Performance: ✅ Connection pooling всегда ✅ Eager loading с joinedload ✅ Правильные индексы ✅ EXPLAIN для анализа

Выбор подхода зависит от сложности запроса и требований производительности.