← Назад к вопросам
Как мигрировать большие объемы данных в БД?
2.0 Middle🔥 161 комментариев
#Python Core
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Миграция больших объемов данных в БД
Миграция больших объемов данных — критическая задача, требующая продуманного подхода к производительности, надежности и минимизации downtime.
Стратегии миграции
1. Батч-обработка с контролем памяти
from sqlalchemy import select, insert
from sqlalchemy.orm import Session
import psycopg2
from typing import Iterator, List
def batch_migrate(source_db, target_db, table_name: str, batch_size: int = 10000):
"""Миграция с обработкой батчами"""
source_session = Session(source_db)
target_session = Session(target_db)
# Получаем данные батчами
offset = 0
processed = 0
while True:
# Получить batch из исходной БД
rows = source_session.execute(
select(SourceTable)
.offset(offset)
.limit(batch_size)
).scalars().all()
if not rows:
break
# Трансформировать и вставить в целевую БД
target_rows = []
for row in rows:
target_rows.append({
'id': row.id,
'name': row.name,
'created_at': row.created_at,
})
# Массовая вставка
target_session.execute(
insert(TargetTable),
target_rows
)
target_session.commit()
processed += len(rows)
offset += batch_size
print(f"Processed: {processed} rows")
if len(rows) < batch_size:
break
source_session.close()
target_session.close()
# Использование
batch_migrate(source_engine, target_engine, SourceTable, batch_size=10000)
2. Параллельная обработка с ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import create_engine, select, insert
from sqlalchemy.orm import Session
import threading
class ParallelMigrator:
def __init__(self, source_url: str, target_url: str, num_workers: int = 4):
self.source_engine = create_engine(source_url)
self.target_engine = create_engine(target_url)
self.num_workers = num_workers
self.lock = threading.Lock()
self.processed_count = 0
def get_total_count(self) -> int:
"""Получить общее количество записей"""
session = Session(self.source_engine)
count = session.query(SourceTable).count()
session.close()
return count
def migrate_batch(self, offset: int, batch_size: int):
"""Мигрировать один батч"""
source_session = Session(self.source_engine)
target_session = Session(self.target_engine)
try:
# Получить batch
rows = source_session.execute(
select(SourceTable)
.offset(offset)
.limit(batch_size)
).scalars().all()
if rows:
# Вставить batch
target_data = [
{'id': row.id, 'name': row.name, 'value': row.value}
for row in rows
]
target_session.execute(insert(TargetTable), target_data)
target_session.commit()
# Атомарное обновление счётчика
with self.lock:
self.processed_count += len(rows)
print(f"Total processed: {self.processed_count}")
except Exception as e:
print(f"Error in batch {offset}: {e}")
target_session.rollback()
finally:
source_session.close()
target_session.close()
def migrate(self, batch_size: int = 5000):
"""Паралельная миграция"""
total = self.get_total_count()
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
futures = []
for offset in range(0, total, batch_size):
future = executor.submit(self.migrate_batch, offset, batch_size)
futures.append(future)
# Дождаться завершения
for future in futures:
future.result()
print(f"Migration complete: {self.processed_count} rows")
# Использование
migrator = ParallelMigrator(SOURCE_DB_URL, TARGET_DB_URL, num_workers=4)
migrator.migrate(batch_size=5000)
3. Streaming с минимальной памятью
from sqlalchemy.sql import text
import psycopg2
from psycopg2 import extras
def stream_migrate(source_url: str, target_url: str, query: str):
"""Потоковая миграция с server-side курсором"""
# Подключение к исходной БД
source_conn = psycopg2.connect(source_url)
target_conn = psycopg2.connect(target_url)
source_cursor = source_conn.cursor('migration_cursor')
target_cursor = target_conn.cursor()
try:
# Server-side cursor читает данные порциями
source_cursor.itersize = 10000
source_cursor.execute(query)
batch = []
batch_size = 1000
for row in source_cursor:
batch.append(row)
if len(batch) >= batch_size:
# Вставить батч
extras.execute_batch(
target_cursor,
'INSERT INTO target_table VALUES (%s, %s, %s)',
batch
)
target_conn.commit()
print(f"Inserted: {batch_size} rows")
batch = []
# Вставить остаток
if batch:
extras.execute_batch(
target_cursor,
'INSERT INTO target_table VALUES (%s, %s, %s)',
batch
)
target_conn.commit()
finally:
source_cursor.close()
target_cursor.close()
source_conn.close()
target_conn.close()
# Использование
stream_migrate(
'postgresql://user:pass@source/db',
'postgresql://user:pass@target/db',
'SELECT id, name, value FROM large_table'
)
4. Прямая копия данных (самый быстрый способ)
import psycopg2
from io import StringIO
def fast_copy_migrate(source_url: str, target_url: str, table_name: str):
"""Использование COPY для максимальной скорости"""
source_conn = psycopg2.connect(source_url)
target_conn = psycopg2.connect(target_url)
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
try:
# Экспорт в CSV
csv_buffer = StringIO()
source_cursor.copy_to(
csv_buffer,
table_name,
sep='|'
)
csv_buffer.seek(0)
# Импорт из CSV
target_cursor.copy_from(
csv_buffer,
table_name,
sep='|'
)
target_conn.commit()
print(f"Fast migration of {table_name} completed")
finally:
source_cursor.close()
target_cursor.close()
source_conn.close()
target_conn.close()
# Использование
fast_copy_migrate(
'postgresql://user:pass@source/db',
'postgresql://user:pass@target/db',
'users'
)
5. Миграция с трансформацией и валидацией
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
import hashlib
@dataclass
class MigrationRow:
source_id: int
name: str
email: str
created_at: datetime
def validate(self) -> bool:
"""Валидировать данные"""
if not self.name or len(self.name) > 255:
return False
if '@' not in self.email:
return False
return True
def transform(self) -> dict:
"""Трансформировать данные"""
return {
'id': self.source_id,
'name': self.name.strip(),
'email': self.email.lower(),
'email_hash': hashlib.md5(self.email.encode()).hexdigest(),
'migrated_at': datetime.utcnow(),
}
class ValidatingMigrator:
def __init__(self, source_db, target_db, batch_size=5000):
self.source_db = source_db
self.target_db = target_db
self.batch_size = batch_size
self.invalid_count = 0
self.migrated_count = 0
def migrate(self):
source_session = Session(self.source_db)
target_session = Session(self.target_db)
offset = 0
while True:
rows = source_session.execute(
select(SourceUser).offset(offset).limit(self.batch_size)
).scalars().all()
if not rows:
break
batch = []
for row in rows:
migration_row = MigrationRow(
source_id=row.id,
name=row.name,
email=row.email,
created_at=row.created_at
)
if migration_row.validate():
batch.append(migration_row.transform())
else:
self.invalid_count += 1
print(f"Invalid row: {row.id}")
if batch:
target_session.execute(insert(TargetUser), batch)
target_session.commit()
self.migrated_count += len(batch)
offset += self.batch_size
print(f"Migrated: {self.migrated_count}, Invalid: {self.invalid_count}")
source_session.close()
target_session.close()
# Использование
migrator = ValidatingMigrator(source_engine, target_engine)
migrator.migrate()
Проверка целостности данных
def verify_migration(source_db, target_db, table_name: str):
"""Проверить целостность миграции"""
source_session = Session(source_db)
target_session = Session(target_db)
# Проверить количество
source_count = source_session.query(func.count()).all()[0][0]
target_count = target_session.query(func.count()).all()[0][0]
print(f"Source: {source_count}, Target: {target_count}")
assert source_count == target_count, "Count mismatch!"
# Проверить hash данных
source_hash = source_session.execute(
text(f"SELECT md5(STRING_AGG(id::text, ',')) FROM {table_name}")
).scalar()
target_hash = target_session.execute(
text(f"SELECT md5(STRING_AGG(id::text, ',')) FROM {table_name}")
).scalar()
print(f"Source hash: {source_hash}")
print(f"Target hash: {target_hash}")
assert source_hash == target_hash, "Data hash mismatch!"
print("Migration verification passed!")
verify_migration(source_engine, target_engine, 'users')
Лучшие практики
- Батчируй данные (10K-100K строк за раз)
- Используй индексы после миграции, а не перед
- Отключай constraints перед миграцией, включай после
- Логируй прогресс и ошибки
- Делай резервную копию перед миграцией
- Тестируй на prod-подобных данных
- Минимизируй downtime с zero-downtime миграцией
- Мониторь память при обработке больших батчей
- Используй транзакции для гарантии целостности
- Проверяй данные после миграции
Правильная миграция данных требует тщательного планирования и тестирования.