← Назад к вопросам

Как происходит миграция данных пользователя?

1.7 Middle🔥 141 комментариев
#Python Core#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Миграция данных пользователя

Миграция данных — это процесс передачи информации пользователя из одной системы в другую, между версиями приложения или при масштабировании инфраструктуры. Это критически важный процесс, требующий особого внимания к целостности данных.

Типы миграций

1. Горизонтальная миграция (между системами)

Передача данных из одного сервиса в другой, например:

  • Переезд с одной БД на другую
  • Миграция между облачными провайдерами
  • Объединение двух приложений

2. Вертикальная миграция (версионирование)

Обновление структуры данных при новых версиях:

  • Добавление новых полей
  • Изменение типов данных
  • Реструктуризация

Стратегии миграции

Big Bang (одномоментная)

# Все данные переносятся сразу, сервис полностью выключается
import psycopg2
from datetime import datetime

def big_bang_migration():
    source_conn = psycopg2.connect("dbname=old_db")
    target_conn = psycopg2.connect("dbname=new_db")
    
    source_cursor = source_conn.cursor()
    target_cursor = target_conn.cursor()
    
    # Копируем все данные
    source_cursor.execute("SELECT * FROM users")
    users = source_cursor.fetchall()
    
    for user in users:
        target_cursor.execute(
            "INSERT INTO users (id, email, created_at) VALUES (%s, %s, %s)",
            (user[0], user[1], user[2])
        )
    
    target_conn.commit()
    source_cursor.close()
    target_cursor.close()

Плюсы: Простая, быстрая Минусы: Downtime, высокий риск потери данных

Parallel Run (параллельный запуск)

Оба система работают одновременно:

class DualWriteUserService:
    def __init__(self, old_db, new_db):
        self.old_db = old_db
        self.new_db = new_db
    
    async def create_user(self, user_data):
        # Пишем в обе БД
        old_result = await self.old_db.users.insert_one(user_data)
        new_result = await self.new_db.users.insert_one(user_data)
        
        # Проверяем согласованность
        if old_result.inserted_id != new_result.inserted_id:
            logger.error("Mismatch in dual write!")
            raise MigrationError("Data inconsistency detected")
        
        return new_result
    
    async def get_user(self, user_id):
        # Читаем из новой БД, падаем на старую если ошибка
        try:
            return await self.new_db.users.find_one({"_id": user_id})
        except Exception:
            logger.warning(f"Read from new DB failed, using old DB")
            return await self.old_db.users.find_one({"_id": user_id})

Плюсы: No downtime, можно откатиться Минусы: Сложная, долгая, требует синхронизации

Gradual Migration (постепенная)

import hashlib
from typing import Any

class MigrationRouter:
    def __init__(self, old_db, new_db, migration_percent=0):
        self.old_db = old_db
        self.new_db = new_db
        self.migration_percent = migration_percent  # 0-100
    
    def should_use_new_db(self, user_id: str) -> bool:
        # Детерминированное распределение: одни юзеры на старой, другие на новой
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        return (hash_val % 100) < self.migration_percent
    
    async def get_user(self, user_id: str):
        if self.should_use_new_db(user_id):
            return await self.new_db.users.find_one({"_id": user_id})
        return await self.old_db.users.find_one({"_id": user_id})
    
    async def create_user(self, user_data):
        # Пишем в обе БД, но читаем из выбранной
        await self.old_db.users.insert_one(user_data)
        await self.new_db.users.insert_one(user_data)

Плюсы: Контролируемое распределение, низкий риск Минусы: Длительный процесс

Проверка целостности данных

import asyncio
from datetime import datetime

class DataValidation:
    async def validate_migration(self, old_db, new_db):
        errors = []
        
        # 1. Проверка количества записей
        old_count = await old_db.users.count_documents({})
        new_count = await new_db.users.count_documents({})
        
        if old_count != new_count:
            errors.append(f"Record count mismatch: {old_count} vs {new_count}")
        
        # 2. Проверка целостности на выборке
        sample_size = min(1000, old_count)
        old_sample = await old_db.users.find().limit(sample_size).to_list(sample_size)
        
        for old_user in old_sample:
            new_user = await new_db.users.find_one({"_id": old_user["_id"]})
            
            if not new_user:
                errors.append(f"User {old_user['_id']} missing in new DB")
            elif old_user["email"] != new_user["email"]:
                errors.append(f"Data mismatch for user {old_user['_id']}")
        
        # 3. Проверка новых данных
        new_only = await new_db.users.find(
            {"created_at": {"$gt": datetime.fromisoformat("2024-03-01")}}
        ).to_list(None)
        
        if new_only:
            logger.warning(f"Found {len(new_only)} new records created after migration")
        
        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "timestamp": datetime.utcnow().isoformat()
        }

Откат данных

class MigrationRollback:
    async def rollback(self, checkpoint_id: str):
        """
        Откат к последней контрольной точке
        """
        checkpoint = await self.db.checkpoints.find_one({"_id": checkpoint_id})
        
        if not checkpoint:
            raise ValueError("Checkpoint not found")
        
        # Восстанавливаем из резервной копии
        backup_path = checkpoint["backup_path"]
        await self.restore_from_backup(backup_path)
        
        logger.info(f"Rollback completed to {checkpoint_id}")

Лучшие практики

  • Всегда делай резервную копию перед миграцией
  • Тестируй на staging перед production
  • Проверяй целостность данных после каждого этапа
  • Документируй процесс для возможности отката
  • Используй контрольные точки для паузы между этапами
  • Мониторь метрики во время миграции (CPU, память, lag)
  • Сообщи пользователям о плановом техническом обслуживании
  • Имей план отката на случай проблем

Миграция данных — это сложный процесс, требующий тщательного планирования, тестирования и мониторинга на каждом этапе.