Как происходит миграция данных пользователя?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Миграция данных пользователя
Миграция данных — это процесс передачи информации пользователя из одной системы в другую, между версиями приложения или при масштабировании инфраструктуры. Это критически важный процесс, требующий особого внимания к целостности данных.
Типы миграций
1. Горизонтальная миграция (между системами)
Передача данных из одного сервиса в другой, например:
- Переезд с одной БД на другую
- Миграция между облачными провайдерами
- Объединение двух приложений
2. Вертикальная миграция (версионирование)
Обновление структуры данных при новых версиях:
- Добавление новых полей
- Изменение типов данных
- Реструктуризация
Стратегии миграции
Big Bang (одномоментная)
# Все данные переносятся сразу, сервис полностью выключается
import psycopg2
from datetime import datetime
def big_bang_migration():
source_conn = psycopg2.connect("dbname=old_db")
target_conn = psycopg2.connect("dbname=new_db")
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
# Копируем все данные
source_cursor.execute("SELECT * FROM users")
users = source_cursor.fetchall()
for user in users:
target_cursor.execute(
"INSERT INTO users (id, email, created_at) VALUES (%s, %s, %s)",
(user[0], user[1], user[2])
)
target_conn.commit()
source_cursor.close()
target_cursor.close()
Плюсы: Простая, быстрая Минусы: Downtime, высокий риск потери данных
Parallel Run (параллельный запуск)
Оба система работают одновременно:
class DualWriteUserService:
def __init__(self, old_db, new_db):
self.old_db = old_db
self.new_db = new_db
async def create_user(self, user_data):
# Пишем в обе БД
old_result = await self.old_db.users.insert_one(user_data)
new_result = await self.new_db.users.insert_one(user_data)
# Проверяем согласованность
if old_result.inserted_id != new_result.inserted_id:
logger.error("Mismatch in dual write!")
raise MigrationError("Data inconsistency detected")
return new_result
async def get_user(self, user_id):
# Читаем из новой БД, падаем на старую если ошибка
try:
return await self.new_db.users.find_one({"_id": user_id})
except Exception:
logger.warning(f"Read from new DB failed, using old DB")
return await self.old_db.users.find_one({"_id": user_id})
Плюсы: No downtime, можно откатиться Минусы: Сложная, долгая, требует синхронизации
Gradual Migration (постепенная)
import hashlib
from typing import Any
class MigrationRouter:
def __init__(self, old_db, new_db, migration_percent=0):
self.old_db = old_db
self.new_db = new_db
self.migration_percent = migration_percent # 0-100
def should_use_new_db(self, user_id: str) -> bool:
# Детерминированное распределение: одни юзеры на старой, другие на новой
hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
return (hash_val % 100) < self.migration_percent
async def get_user(self, user_id: str):
if self.should_use_new_db(user_id):
return await self.new_db.users.find_one({"_id": user_id})
return await self.old_db.users.find_one({"_id": user_id})
async def create_user(self, user_data):
# Пишем в обе БД, но читаем из выбранной
await self.old_db.users.insert_one(user_data)
await self.new_db.users.insert_one(user_data)
Плюсы: Контролируемое распределение, низкий риск Минусы: Длительный процесс
Проверка целостности данных
import asyncio
from datetime import datetime
class DataValidation:
async def validate_migration(self, old_db, new_db):
errors = []
# 1. Проверка количества записей
old_count = await old_db.users.count_documents({})
new_count = await new_db.users.count_documents({})
if old_count != new_count:
errors.append(f"Record count mismatch: {old_count} vs {new_count}")
# 2. Проверка целостности на выборке
sample_size = min(1000, old_count)
old_sample = await old_db.users.find().limit(sample_size).to_list(sample_size)
for old_user in old_sample:
new_user = await new_db.users.find_one({"_id": old_user["_id"]})
if not new_user:
errors.append(f"User {old_user['_id']} missing in new DB")
elif old_user["email"] != new_user["email"]:
errors.append(f"Data mismatch for user {old_user['_id']}")
# 3. Проверка новых данных
new_only = await new_db.users.find(
{"created_at": {"$gt": datetime.fromisoformat("2024-03-01")}}
).to_list(None)
if new_only:
logger.warning(f"Found {len(new_only)} new records created after migration")
return {
"valid": len(errors) == 0,
"errors": errors,
"timestamp": datetime.utcnow().isoformat()
}
Откат данных
class MigrationRollback:
async def rollback(self, checkpoint_id: str):
"""
Откат к последней контрольной точке
"""
checkpoint = await self.db.checkpoints.find_one({"_id": checkpoint_id})
if not checkpoint:
raise ValueError("Checkpoint not found")
# Восстанавливаем из резервной копии
backup_path = checkpoint["backup_path"]
await self.restore_from_backup(backup_path)
logger.info(f"Rollback completed to {checkpoint_id}")
Лучшие практики
- Всегда делай резервную копию перед миграцией
- Тестируй на staging перед production
- Проверяй целостность данных после каждого этапа
- Документируй процесс для возможности отката
- Используй контрольные точки для паузы между этапами
- Мониторь метрики во время миграции (CPU, память, lag)
- Сообщи пользователям о плановом техническом обслуживании
- Имей план отката на случай проблем
Миграция данных — это сложный процесс, требующий тщательного планирования, тестирования и мониторинга на каждом этапе.