← Назад к вопросам

Как мигрировать большие объемы данных в БД?

2.0 Middle🔥 161 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Миграция больших объемов данных в БД

Миграция больших объемов данных — критическая задача, требующая продуманного подхода к производительности, надежности и минимизации downtime.

Стратегии миграции

1. Батч-обработка с контролем памяти

from sqlalchemy import select, insert
from sqlalchemy.orm import Session
import psycopg2
from typing import Iterator, List

def batch_migrate(source_db, target_db, table_name: str, batch_size: int = 10000):
    """Миграция с обработкой батчами"""
    source_session = Session(source_db)
    target_session = Session(target_db)
    
    # Получаем данные батчами
    offset = 0
    processed = 0
    
    while True:
        # Получить batch из исходной БД
        rows = source_session.execute(
            select(SourceTable)
            .offset(offset)
            .limit(batch_size)
        ).scalars().all()
        
        if not rows:
            break
        
        # Трансформировать и вставить в целевую БД
        target_rows = []
        for row in rows:
            target_rows.append({
                'id': row.id,
                'name': row.name,
                'created_at': row.created_at,
            })
        
        # Массовая вставка
        target_session.execute(
            insert(TargetTable),
            target_rows
        )
        target_session.commit()
        
        processed += len(rows)
        offset += batch_size
        
        print(f"Processed: {processed} rows")
        
        if len(rows) < batch_size:
            break
    
    source_session.close()
    target_session.close()

# Использование
batch_migrate(source_engine, target_engine, SourceTable, batch_size=10000)

2. Параллельная обработка с ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import create_engine, select, insert
from sqlalchemy.orm import Session
import threading

class ParallelMigrator:
    def __init__(self, source_url: str, target_url: str, num_workers: int = 4):
        self.source_engine = create_engine(source_url)
        self.target_engine = create_engine(target_url)
        self.num_workers = num_workers
        self.lock = threading.Lock()
        self.processed_count = 0
    
    def get_total_count(self) -> int:
        """Получить общее количество записей"""
        session = Session(self.source_engine)
        count = session.query(SourceTable).count()
        session.close()
        return count
    
    def migrate_batch(self, offset: int, batch_size: int):
        """Мигрировать один батч"""
        source_session = Session(self.source_engine)
        target_session = Session(self.target_engine)
        
        try:
            # Получить batch
            rows = source_session.execute(
                select(SourceTable)
                .offset(offset)
                .limit(batch_size)
            ).scalars().all()
            
            if rows:
                # Вставить batch
                target_data = [
                    {'id': row.id, 'name': row.name, 'value': row.value}
                    for row in rows
                ]
                target_session.execute(insert(TargetTable), target_data)
                target_session.commit()
                
                # Атомарное обновление счётчика
                with self.lock:
                    self.processed_count += len(rows)
                    print(f"Total processed: {self.processed_count}")
        
        except Exception as e:
            print(f"Error in batch {offset}: {e}")
            target_session.rollback()
        
        finally:
            source_session.close()
            target_session.close()
    
    def migrate(self, batch_size: int = 5000):
        """Паралельная миграция"""
        total = self.get_total_count()
        
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = []
            for offset in range(0, total, batch_size):
                future = executor.submit(self.migrate_batch, offset, batch_size)
                futures.append(future)
            
            # Дождаться завершения
            for future in futures:
                future.result()
        
        print(f"Migration complete: {self.processed_count} rows")

# Использование
migrator = ParallelMigrator(SOURCE_DB_URL, TARGET_DB_URL, num_workers=4)
migrator.migrate(batch_size=5000)

3. Streaming с минимальной памятью

from sqlalchemy.sql import text
import psycopg2
from psycopg2 import extras

def stream_migrate(source_url: str, target_url: str, query: str):
    """Потоковая миграция с server-side курсором"""
    # Подключение к исходной БД
    source_conn = psycopg2.connect(source_url)
    target_conn = psycopg2.connect(target_url)
    
    source_cursor = source_conn.cursor('migration_cursor')
    target_cursor = target_conn.cursor()
    
    try:
        # Server-side cursor читает данные порциями
        source_cursor.itersize = 10000
        source_cursor.execute(query)
        
        batch = []
        batch_size = 1000
        
        for row in source_cursor:
            batch.append(row)
            
            if len(batch) >= batch_size:
                # Вставить батч
                extras.execute_batch(
                    target_cursor,
                    'INSERT INTO target_table VALUES (%s, %s, %s)',
                    batch
                )
                target_conn.commit()
                print(f"Inserted: {batch_size} rows")
                batch = []
        
        # Вставить остаток
        if batch:
            extras.execute_batch(
                target_cursor,
                'INSERT INTO target_table VALUES (%s, %s, %s)',
                batch
            )
            target_conn.commit()
    
    finally:
        source_cursor.close()
        target_cursor.close()
        source_conn.close()
        target_conn.close()

# Использование
stream_migrate(
    'postgresql://user:pass@source/db',
    'postgresql://user:pass@target/db',
    'SELECT id, name, value FROM large_table'
)

4. Прямая копия данных (самый быстрый способ)

import psycopg2
from io import StringIO

def fast_copy_migrate(source_url: str, target_url: str, table_name: str):
    """Использование COPY для максимальной скорости"""
    source_conn = psycopg2.connect(source_url)
    target_conn = psycopg2.connect(target_url)
    
    source_cursor = source_conn.cursor()
    target_cursor = target_conn.cursor()
    
    try:
        # Экспорт в CSV
        csv_buffer = StringIO()
        source_cursor.copy_to(
            csv_buffer,
            table_name,
            sep='|'
        )
        
        csv_buffer.seek(0)
        
        # Импорт из CSV
        target_cursor.copy_from(
            csv_buffer,
            table_name,
            sep='|'
        )
        
        target_conn.commit()
        print(f"Fast migration of {table_name} completed")
    
    finally:
        source_cursor.close()
        target_cursor.close()
        source_conn.close()
        target_conn.close()

# Использование
fast_copy_migrate(
    'postgresql://user:pass@source/db',
    'postgresql://user:pass@target/db',
    'users'
)

5. Миграция с трансформацией и валидацией

from dataclasses import dataclass
from typing import Optional
from datetime import datetime
import hashlib

@dataclass
class MigrationRow:
    source_id: int
    name: str
    email: str
    created_at: datetime
    
    def validate(self) -> bool:
        """Валидировать данные"""
        if not self.name or len(self.name) > 255:
            return False
        if '@' not in self.email:
            return False
        return True
    
    def transform(self) -> dict:
        """Трансформировать данные"""
        return {
            'id': self.source_id,
            'name': self.name.strip(),
            'email': self.email.lower(),
            'email_hash': hashlib.md5(self.email.encode()).hexdigest(),
            'migrated_at': datetime.utcnow(),
        }

class ValidatingMigrator:
    def __init__(self, source_db, target_db, batch_size=5000):
        self.source_db = source_db
        self.target_db = target_db
        self.batch_size = batch_size
        self.invalid_count = 0
        self.migrated_count = 0
    
    def migrate(self):
        source_session = Session(self.source_db)
        target_session = Session(self.target_db)
        
        offset = 0
        
        while True:
            rows = source_session.execute(
                select(SourceUser).offset(offset).limit(self.batch_size)
            ).scalars().all()
            
            if not rows:
                break
            
            batch = []
            for row in rows:
                migration_row = MigrationRow(
                    source_id=row.id,
                    name=row.name,
                    email=row.email,
                    created_at=row.created_at
                )
                
                if migration_row.validate():
                    batch.append(migration_row.transform())
                else:
                    self.invalid_count += 1
                    print(f"Invalid row: {row.id}")
            
            if batch:
                target_session.execute(insert(TargetUser), batch)
                target_session.commit()
                self.migrated_count += len(batch)
            
            offset += self.batch_size
            print(f"Migrated: {self.migrated_count}, Invalid: {self.invalid_count}")
        
        source_session.close()
        target_session.close()

# Использование
migrator = ValidatingMigrator(source_engine, target_engine)
migrator.migrate()

Проверка целостности данных

def verify_migration(source_db, target_db, table_name: str):
    """Проверить целостность миграции"""
    source_session = Session(source_db)
    target_session = Session(target_db)
    
    # Проверить количество
    source_count = source_session.query(func.count()).all()[0][0]
    target_count = target_session.query(func.count()).all()[0][0]
    
    print(f"Source: {source_count}, Target: {target_count}")
    assert source_count == target_count, "Count mismatch!"
    
    # Проверить hash данных
    source_hash = source_session.execute(
        text(f"SELECT md5(STRING_AGG(id::text, ',')) FROM {table_name}")
    ).scalar()
    
    target_hash = target_session.execute(
        text(f"SELECT md5(STRING_AGG(id::text, ',')) FROM {table_name}")
    ).scalar()
    
    print(f"Source hash: {source_hash}")
    print(f"Target hash: {target_hash}")
    assert source_hash == target_hash, "Data hash mismatch!"
    
    print("Migration verification passed!")

verify_migration(source_engine, target_engine, 'users')

Лучшие практики

  • Батчируй данные (10K-100K строк за раз)
  • Используй индексы после миграции, а не перед
  • Отключай constraints перед миграцией, включай после
  • Логируй прогресс и ошибки
  • Делай резервную копию перед миграцией
  • Тестируй на prod-подобных данных
  • Минимизируй downtime с zero-downtime миграцией
  • Мониторь память при обработке больших батчей
  • Используй транзакции для гарантии целостности
  • Проверяй данные после миграции

Правильная миграция данных требует тщательного планирования и тестирования.

Как мигрировать большие объемы данных в БД? | PrepBro