← Назад к вопросам
Через что подключаешь к PostgreSQL
2.8 Senior🔥 241 комментариев
#DevOps и инфраструктура#Django
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Подключение к PostgreSQL из Python
Основные подходы
Я использую несколько библиотек в зависимости от контекста:
1. SQLAlchemy (ORM) — основной выбор
Для большинства приложений я использую SQLAlchemy с асинхронностью:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from typing import AsyncGenerator
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String, unique=True)
# Асинхронное подключение (рекомендуется для FastAPI)
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost:5432/mydb",
echo=False, # Set to True для debug
pool_size=20,
max_overflow=10,
)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
yield session
# Использование
async def get_user(session: AsyncSession, user_id: int):
return await session.get(User, user_id)
async def create_user(session: AsyncSession, name: str, email: str):
user = User(name=name, email=email)
session.add(user)
await session.commit()
await session.refresh(user)
return user
Преимущества SQLAlchemy:
- Типизация (Pydantic интеграция)
- ORM для сложных запросов
- Миграции (Alembic или Goose)
- Relationships с lazy loading
- Connection pooling встроенный
2. asyncpg — низкоуровневый драйвер
Для высокопроизводительных систем или когда нужен полный контроль:
import asyncpg
from typing import List, Optional
class UserRepository:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def get_user(self, user_id: int) -> Optional[dict]:
# Прямой SQL с параметризацией
row = await self.pool.fetchrow(
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
return dict(row) if row else None
async def get_all_users(self) -> List[dict]:
rows = await self.pool.fetch("SELECT id, name, email FROM users")
return [dict(row) for row in rows]
async def create_user(self, name: str, email: str) -> dict:
row = await self.pool.fetchrow(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email",
name, email
)
return dict(row)
# Инициализация пула
async def get_pool() -> asyncpg.Pool:
pool = await asyncpg.create_pool(
"postgresql://user:password@localhost:5432/mydb",
min_size=10,
max_size=20,
)
return pool
# Использование в FastAPI
from fastapi import FastAPI, Depends
app = FastAPI()
user_repo: Optional[UserRepository] = None
@app.on_event("startup")
async def startup():
global user_repo
pool = await get_pool()
user_repo = UserRepository(pool)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await user_repo.get_user(user_id)
return user
Когда использовать asyncpg:
- Высокая нагрузка (1000+ req/sec)
- Нужен полный контроль над SQL
- Нет сложных relationships
3. psycopg2 / psycopg3 — синхронный драйвер
Для синхронного кода или мигращий:
import psycopg2
from contextlib import contextmanager
class Database:
def __init__(self, connection_string: str):
self.connection_string = connection_string
@contextmanager
def get_connection(self):
conn = psycopg2.connect(self.connection_string)
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def get_user(self, user_id: int):
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
return cur.fetchone()
def create_user(self, name: str, email: str):
with self.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
(name, email)
)
return cur.fetchone()[0]
# Использование
db = Database("postgresql://user:password@localhost:5432/mydb")
user = db.get_user(1)
Сравнение подходов
| Библиотека | Тип | Асинхронность | Производительность | Уровень |
|---|---|---|---|---|
| SQLAlchemy | ORM | Есть | Хорошая | Высокий |
| asyncpg | Драйвер | Есть | Отличная | Низкий |
| psycopg3 | Драйвер | Есть | Отличная | Низкий |
| psycopg2 | Драйвер | Нет | Хорошая | Низкий |
Лучшие практики подключения
1. Connection pooling
# Никогда так:
for user_id in range(1000):
conn = psycopg2.connect("...") # ПЛОХО! 1000 соединений
user = get_user(conn, user_id)
conn.close()
# Правильно: используй пул
pool = asyncpg.create_pool(min_size=10, max_size=20)
async for user_id in range(1000):
user = await user_repo.get_user(user_id) # Переиспользует соединения
2. Параметризованные запросы (защита от SQL injection)
# НЕПРАВИЛЬНО — SQL injection!
query = f"SELECT * FROM users WHERE id = {user_id}"
# ПРАВИЛЬНО — параметризованный запрос
query = "SELECT * FROM users WHERE id = $1"
result = await pool.fetchrow(query, user_id)
3. Обработка ошибок
import asyncpg
from sqlalchemy.exc import IntegrityError
async def create_user(session: AsyncSession, email: str):
try:
user = User(email=email)
session.add(user)
await session.commit()
return user
except IntegrityError:
# Уникальное нарушение — email уже существует
await session.rollback()
raise ValueError(f"User with email {email} already exists")
except Exception as e:
await session.rollback()
raise
4. Миграции
Для управления схемой БД я использую Alembic с SQLAlchemy или Goose для raw SQL:
# Alembic (для SQLAlchemy)
alembic init migrations
alembic revision --autogenerate -m "Add users table"
alembic upgrade head
# Goose (для raw SQL)
goose create add_users_table sql
goose up
Мой выбор в разных ситуациях
FastAPI + PostgreSQL:
# Выбор: SQLAlchemy Async ORM
engine = create_async_engine("postgresql+asyncpg://...")
# Причина: типизация, удобство, встроенный пулинг
High-load система (>1000 req/sec):
# Выбор: asyncpg + пользовательский репозиторий
pool = await asyncpg.create_pool(...)
# Причина: максимальная производительность, нет ORM overhead
Скрипты / миграции:
# Выбор: psycopg2 или psycopg3 (синхронные)
conn = psycopg2.connect(...)
# Причина: простота, никакой асинхронности не нужно
Background jobs (Celery):
# Выбор: SQLAlchemy Sync ORM
engine = create_engine("postgresql://...")
# Причина: Celery workers синхронные, ORM удобнее
Конфигурация для production
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str
DB_POOL_SIZE: int = 20
DB_MAX_OVERFLOW: int = 10
DB_ECHO: bool = False
class Config:
env_file = ".env"
settings = Settings()
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DB_ECHO,
pool_size=settings.DB_POOL_SIZE,
max_overflow=settings.DB_MAX_OVERFLOW,
pool_pre_ping=True, # Проверяет соединение перед использованием
pool_recycle=3600, # Пересоздаёт соединения каждый час
)
Итог
Мой стандартный стек:
- ORM: SQLAlchemy Async (с Alembic для миграций)
- Драйвер: asyncpg (встроенный в SQLAlchemy)
- Пулинг: встроенный в engine
- Миграции: Goose или Alembic в зависимости от проекта
- Мониторинг: loguru + structured logging для SQL
Это даёт баланс между удобством разработки и производительностью.