← Назад к вопросам
Какие знаешь способы загрузки данных в SQL?
1.7 Middle🔥 121 комментариев
#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Способы загрузки данных в SQL
В своей практике я работал с разными подходами загрузки данных. Расскажу о самых эффективных.
1. INSERT с параметризованными запросами
Базовый способ — безопасен от SQL-injection:
import psycopg2
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
# Одиночная вставка
query = "INSERT INTO users (name, email) VALUES (%s, %s)"
cursor.execute(query, ("John", "john@mail.com"))
conn.commit()
# Множественная вставка
users = [("Alice", "alice@mail.com"), ("Bob", "bob@mail.com")]
for name, email in users:
cursor.execute(query, (name, email))
conn.commit()
2. executemany() — пакетная вставка
В 5-10 раз быстрее отдельных INSERT:
import sqlite3
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
users = [
("Alice", "alice@mail.com"),
("Bob", "bob@mail.com"),
("Charlie", "charlie@mail.com"),
]
cursor.executemany(
"INSERT INTO users (name, email) VALUES (?, ?)",
users
)
conn.commit()
3. COPY / COPY FROM — для больших объёмов
В 100+ раз быстрее обычных INSERT, используется для миллионов строк:
import psycopg2
import io
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
# Подготовка данных в памяти
data = io.StringIO()
for user_id, name, email in users:
data.write(f"{user_id}\t{name}\t{email}\n")
data.seek(0)
# COPY FROM STDIN
cursor.copy_from(
data,
"users",
columns=("id", "name", "email")
)
conn.commit()
cursor.close()
conn.close()
Производительность COPY из файла:
# SQL команда
COPY users (id, name, email) FROM /path/to/file.csv WITH (FORMAT csv);
# Или через psql
psql mydb -c "\\COPY users FROM data.csv WITH (FORMAT csv, DELIMITER ,, QUOTE ")"
4. SQLAlchemy bulk_insert_mappings() — ORM подход
Для работы с ORM моделями:
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from models import User
engine = create_engine("postgresql://user:pass@localhost/mydb")
users_data = [
{"name": "Alice", "email": "alice@mail.com"},
{"name": "Bob", "email": "bob@mail.com"},
{"name": "Charlie", "email": "charlie@mail.com"},
]
with Session(engine) as session:
session.bulk_insert_mappings(User, users_data)
session.commit()
5. SQLAlchemy bulk_save_objects() — для объектов
from sqlalchemy.orm import Session
from models import User
users_objects = [User(name=name, email=email) for name, email in user_list]
with Session(engine) as session:
session.bulk_save_objects(users_objects, return_defaults=False)
session.commit()
6. Pandas + to_sql() — для DataFrames
Удобно для работы с CSV, Excel:
import pandas as pd
from sqlalchemy import create_engine
df = pd.read_csv("users.csv")
engine = create_engine("postgresql://user:pass@localhost/mydb")
# Вставка
df.to_sql(
"users",
con=engine,
if_exists="append", # или "replace", "fail"
index=False,
chunksize=1000 # Пакетами
)
7. Asyncio с асинхронными драйверами
Для параллельной загрузки из нескольких источников:
import asyncio
import asyncpg
async def insert_users(user_batch):
conn = await asyncpg.connect("postgresql://user:pass@localhost/mydb")
query = "INSERT INTO users (name, email) VALUES ($1, $2)"
await conn.executemany(query, user_batch)
await conn.close()
async def load_all_users():
batches = [
[("Alice", "alice@mail.com"), ("Bob", "bob@mail.com")],
[("Charlie", "charlie@mail.com"), ("David", "david@mail.com")],
]
await asyncio.gather(*[insert_users(batch) for batch in batches])
asyncio.run(load_all_users())
8. Transactional INSERT с error handling
import psycopg2
from psycopg2 import IntegrityError
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
try:
cursor.execute("BEGIN") # Явное начало транзакции
for user in users:
cursor.execute(
"INSERT INTO users (name, email) VALUES (%s, %s)",
(user["name"], user["email"])
)
cursor.execute("COMMIT") # Коммит всех или ничего
except IntegrityError as e:
cursor.execute("ROLLBACK")
print(f"Duplicate key: {e}")
# Можем пропустить и продолжить
except Exception as e:
cursor.execute("ROLLBACK")
raise
finally:
conn.close()
9. Upsert (INSERT ... ON CONFLICT)
Обновление при конфликте вместо ошибки:
# PostgreSQL
query = """
INSERT INTO users (id, name, email)
VALUES (%s, %s, %s)
ON CONFLICT (email) DO UPDATE
SET name = EXCLUDED.name
"""
cursor.execute(query, (1, "Alice", "alice@mail.com"))
conn.commit()
# SQLite
query = """
INSERT OR REPLACE INTO users (id, name, email)
VALUES (?, ?, ?)
"""
cursor.execute(query, (1, "Alice", "alice@mail.com"))
conn.commit()
10. Миграции для подготовки схемы
Перед загрузкой данных убедиться, что схема готова:
# Goose миграции
goose -dir ./migrations postgres "postgresql://user:pass@localhost/mydb" up
Рекомендации по выбору
| Способ | Объём | Скорость | Удобство |
|---|---|---|---|
| INSERT (одиночный) | < 100 | Медленно | Просто |
| executemany() | 100-10K | Хорошо | Хорошо |
| COPY | 10K+ | Очень быстро | Требует подготовки |
| Pandas to_sql() | 10K+ | Хорошо | Удобно |
| Asyncio | I/O bound | Быстро | Сложнее |
| Bulk ORM | 10K+ | Хорошо | В рамках ORM |
Performance tips
- Отключи индексы перед загрузкой больших объёмов, включи после
- Используй COPY для файлов размером 1M+ строк
- Batch size = 1000-10000 — оптимум
- Используй native drivers (asyncpg) вместо psycopg2 для скорости
- Подготовь данные в памяти, а не читай построчно
- Используй truncate вместо delete для пересоздания таблиц
В production проекте я обычно выбираю: COPY для исторических данных, bulk_insert для ORM, executemany() для обычных случаев.