← Назад к вопросам

Через что подключаешь к PostgreSQL

2.8 Senior🔥 241 комментариев
#DevOps и инфраструктура#Django

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Подключение к PostgreSQL из Python

Основные подходы

Я использую несколько библиотек в зависимости от контекста:

1. SQLAlchemy (ORM) — основной выбор

Для большинства приложений я использую SQLAlchemy с асинхронностью:

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from typing import AsyncGenerator

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String, unique=True)

# Асинхронное подключение (рекомендуется для FastAPI)
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost:5432/mydb",
    echo=False,  # Set to True для debug
    pool_size=20,
    max_overflow=10,
)

AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        yield session

# Использование
async def get_user(session: AsyncSession, user_id: int):
    return await session.get(User, user_id)

async def create_user(session: AsyncSession, name: str, email: str):
    user = User(name=name, email=email)
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return user

Преимущества SQLAlchemy:

  • Типизация (Pydantic интеграция)
  • ORM для сложных запросов
  • Миграции (Alembic или Goose)
  • Relationships с lazy loading
  • Connection pooling встроенный

2. asyncpg — низкоуровневый драйвер

Для высокопроизводительных систем или когда нужен полный контроль:

import asyncpg
from typing import List, Optional

class UserRepository:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool
    
    async def get_user(self, user_id: int) -> Optional[dict]:
        # Прямой SQL с параметризацией
        row = await self.pool.fetchrow(
            "SELECT id, name, email FROM users WHERE id = $1",
            user_id
        )
        return dict(row) if row else None
    
    async def get_all_users(self) -> List[dict]:
        rows = await self.pool.fetch("SELECT id, name, email FROM users")
        return [dict(row) for row in rows]
    
    async def create_user(self, name: str, email: str) -> dict:
        row = await self.pool.fetchrow(
            "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email",
            name, email
        )
        return dict(row)

# Инициализация пула
async def get_pool() -> asyncpg.Pool:
    pool = await asyncpg.create_pool(
        "postgresql://user:password@localhost:5432/mydb",
        min_size=10,
        max_size=20,
    )
    return pool

# Использование в FastAPI
from fastapi import FastAPI, Depends

app = FastAPI()
user_repo: Optional[UserRepository] = None

@app.on_event("startup")
async def startup():
    global user_repo
    pool = await get_pool()
    user_repo = UserRepository(pool)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await user_repo.get_user(user_id)
    return user

Когда использовать asyncpg:

  • Высокая нагрузка (1000+ req/sec)
  • Нужен полный контроль над SQL
  • Нет сложных relationships

3. psycopg2 / psycopg3 — синхронный драйвер

Для синхронного кода или мигращий:

import psycopg2
from contextlib import contextmanager

class Database:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
    
    @contextmanager
    def get_connection(self):
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
    
    def get_user(self, user_id: int):
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
                return cur.fetchone()
    
    def create_user(self, name: str, email: str):
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
                    (name, email)
                )
                return cur.fetchone()[0]

# Использование
db = Database("postgresql://user:password@localhost:5432/mydb")
user = db.get_user(1)

Сравнение подходов

БиблиотекаТипАсинхронностьПроизводительностьУровень
SQLAlchemyORMЕстьХорошаяВысокий
asyncpgДрайверЕстьОтличнаяНизкий
psycopg3ДрайверЕстьОтличнаяНизкий
psycopg2ДрайверНетХорошаяНизкий

Лучшие практики подключения

1. Connection pooling

# Никогда так:
for user_id in range(1000):
    conn = psycopg2.connect("...")  # ПЛОХО! 1000 соединений
    user = get_user(conn, user_id)
    conn.close()

# Правильно: используй пул
pool = asyncpg.create_pool(min_size=10, max_size=20)
async for user_id in range(1000):
    user = await user_repo.get_user(user_id)  # Переиспользует соединения

2. Параметризованные запросы (защита от SQL injection)

# НЕПРАВИЛЬНО — SQL injection!
query = f"SELECT * FROM users WHERE id = {user_id}"

# ПРАВИЛЬНО — параметризованный запрос
query = "SELECT * FROM users WHERE id = $1"
result = await pool.fetchrow(query, user_id)

3. Обработка ошибок

import asyncpg
from sqlalchemy.exc import IntegrityError

async def create_user(session: AsyncSession, email: str):
    try:
        user = User(email=email)
        session.add(user)
        await session.commit()
        return user
    except IntegrityError:
        # Уникальное нарушение — email уже существует
        await session.rollback()
        raise ValueError(f"User with email {email} already exists")
    except Exception as e:
        await session.rollback()
        raise

4. Миграции

Для управления схемой БД я использую Alembic с SQLAlchemy или Goose для raw SQL:

# Alembic (для SQLAlchemy)
alembic init migrations
alembic revision --autogenerate -m "Add users table"
alembic upgrade head

# Goose (для raw SQL)
goose create add_users_table sql
goose up

Мой выбор в разных ситуациях

FastAPI + PostgreSQL:

# Выбор: SQLAlchemy Async ORM
engine = create_async_engine("postgresql+asyncpg://...")
# Причина: типизация, удобство, встроенный пулинг

High-load система (>1000 req/sec):

# Выбор: asyncpg + пользовательский репозиторий
pool = await asyncpg.create_pool(...)
# Причина: максимальная производительность, нет ORM overhead

Скрипты / миграции:

# Выбор: psycopg2 или psycopg3 (синхронные)
conn = psycopg2.connect(...)
# Причина: простота, никакой асинхронности не нужно

Background jobs (Celery):

# Выбор: SQLAlchemy Sync ORM
engine = create_engine("postgresql://...")
# Причина: Celery workers синхронные, ORM удобнее

Конфигурация для production

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    DATABASE_URL: str
    DB_POOL_SIZE: int = 20
    DB_MAX_OVERFLOW: int = 10
    DB_ECHO: bool = False
    
    class Config:
        env_file = ".env"

settings = Settings()

engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DB_ECHO,
    pool_size=settings.DB_POOL_SIZE,
    max_overflow=settings.DB_MAX_OVERFLOW,
    pool_pre_ping=True,  # Проверяет соединение перед использованием
    pool_recycle=3600,   # Пересоздаёт соединения каждый час
)

Итог

Мой стандартный стек:

  • ORM: SQLAlchemy Async (с Alembic для миграций)
  • Драйвер: asyncpg (встроенный в SQLAlchemy)
  • Пулинг: встроенный в engine
  • Миграции: Goose или Alembic в зависимости от проекта
  • Мониторинг: loguru + structured logging для SQL

Это даёт баланс между удобством разработки и производительностью.

Через что подключаешь к PostgreSQL | PrepBro