Что такое asyncpg в Python?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
asyncpg в Python
Что это такое?
asyncpg — это асинхронный драйвер для PostgreSQL, написанный на Python. Он предоставляет высокопроизводительное и полностью асинхронное соединение с PostgreSQL, позволяя эффективно работать с базой данных в асинхронных приложениях (async/await).
Отличие от традиционных драйверов (psycopg2) в том, что asyncpg полностью асинхронен и не требует использования потоков или других механизмов синхронизации для параллельной работы с несколькими запросами.
Основные характеристики
Преимущества:
- Высокая производительность — одна из самых быстрых библиотек для PostgreSQL
- Асинхронность — полная поддержка async/await
- Неблокирующие операции — позволяет обрабатывать множество соединений одновременно
- Пулы соединений — встроенная поддержка пулинга
- Типизация — хорошо типизирована для mypy
Установка и базовое использование
import asyncpg
import asyncio
async def main():
# Создание соединения с PostgreSQL
connection = await asyncpg.connect(
user="postgres",
password="secret",
database="mydb",
host="localhost",
port=5432
)
# Выполнение запроса
result = await connection.fetch("SELECT * FROM users LIMIT 10")
print(result)
# Закрытие соединения
await connection.close()
# Запуск
asyncio.run(main())
Пулы соединений
Для обработки множества запросов используются пулы соединений:
import asyncpg
import asyncio
async def main():
# Создание пула соединений
pool = await asyncpg.create_pool(
user="postgres",
password="secret",
database="mydb",
host="localhost",
min_size=10, # Минимальное количество соединений
max_size=20 # Максимальное количество соединений
)
async with pool.acquire() as connection:
result = await connection.fetch("SELECT * FROM users")
print(result)
await pool.close()
asyncio.run(main())
Различные типы запросов
fetch() — получить все результаты
async with pool.acquire() as conn:
rows = await conn.fetch("SELECT * FROM users WHERE age > $1", 18)
for row in rows:
print(row)
fetchval() — получить одно значение
async with pool.acquire() as conn:
count = await conn.fetchval("SELECT COUNT(*) FROM users")
print(f"Всего пользователей: {count}")
fetchrow() — получить первую строку
async with pool.acquire() as conn:
user = await conn.fetchrow("SELECT * FROM users WHERE id = $1", 42)
if user:
print(f"Найден пользователь: {user}")
execute() — выполнить команду (INSERT, UPDATE, DELETE)
async with pool.acquire() as conn:
result = await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"John Doe",
"john@example.com"
)
print(f"Вставлено строк: {result}")
Транзакции
async with pool.acquire() as conn:
# Начало транзакции
async with conn.transaction():
# Несколько операций в одной транзакции
await conn.execute("INSERT INTO accounts (name) VALUES ($1)", "Alice")
await conn.execute("INSERT INTO accounts (name) VALUES ($1)", "Bob")
# Если возникнет ошибка, будет откат (rollback)
Подготовленные запросы
Для повторного использования одного и того же запроса:
async with pool.acquire() as conn:
# Подготовка запроса
stmt = await conn.prepare("SELECT * FROM users WHERE id = $1")
# Выполнение несколько раз
user1 = await stmt.fetchrow(1)
user2 = await stmt.fetchrow(2)
user3 = await stmt.fetchrow(3)
Работа с типами данных
asyncpg автоматически преобразует типы PostgreSQL в Python типы:
async with pool.acquire() as conn:
result = await conn.fetchrow(
"SELECT id, name, created_at, metadata FROM users LIMIT 1"
)
print(type(result["id"])) # <class int>
print(type(result["name"])) # <class str>
print(type(result["created_at"])) # <class datetime.datetime>
print(type(result["metadata"])) # <class dict>
Обработка ошибок
import asyncpg
import asyncio
async def insert_user(pool, name, email):
try:
async with pool.acquire() as conn:
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
name,
email
)
print(f"Пользователь {name} добавлен")
except asyncpg.UniqueViolationError:
print(f"Пользователь с email {email} уже существует")
except asyncpg.PostgresError as e:
print(f"Ошибка базы данных: {e}")
Примеры ошибок
UniqueViolationError— нарушение уникальностиForeignKeyViolationError— нарушение внешнего ключаNotNullViolationError— попытка вставить NULL в не-nullable полеPostgresError— базовый класс для всех ошибок
Сравнение с psycopg2
| Критерий | asyncpg | psycopg2 |
|---|---|---|
| Асинхронность | Полная | Синхронный |
| Производительность | Очень высокая | Хорошая |
| Использование памяти | Низкое | Среднее |
| Кривая обучения | Средняя | Низкая |
| Экосистема | Меньше интеграций | Очень много интеграций |
Практический пример: FastAPI + asyncpg
from fastapi import FastAPI
import asyncpg
app = FastAPI()
pool = None
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(
user="postgres",
password="secret",
database="mydb",
host="localhost",
min_size=5,
max_size=10
)
@app.on_event("shutdown")
async def shutdown():
await pool.close()
@app.get("/users")
async def get_users():
async with pool.acquire() as conn:
users = await conn.fetch("SELECT id, name, email FROM users")
return {"users": users}
@app.post("/users")
async def create_user(name: str, email: str):
async with pool.acquire() as conn:
user_id = await conn.fetchval(
"INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
name,
email
)
return {"id": user_id, "name": name, "email": email}
Лучшие практики
- Используйте пулы соединений — всегда, не единичные соединения
- Параметризуйте запросы — избегайте SQL-инъекций ($1, $2 вместо format)
- Обрабатывайте ошибки — ловите специфичные исключения
- Используйте транзакции — для групповых операций
- Закрывайте ресурсы — используйте async with для управления жизненным циклом
asyncpg — это отличный выбор для высоконагруженных асинхронных приложений на Python, требующих быстрого доступа к PostgreSQL.