← Назад к вопросам

Как организовать обслуживание баз данных?

1.0 Junior🔥 101 комментариев
#Soft Skills

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Организация обслуживания баз данных

Манаджмент БД включает миграции, резервные копии, мониторинг, оптимизацию и операционные процессы.

1. Система миграций (Goose + SQL)

Структура файлов

migrations/
├── 00001_initial_schema.sql
├── 00002_add_users_table.sql
├── 00003_add_orders_table.sql
├── 00004_create_indexes.sql
└── 00005_add_audit_columns.sql

Формат миграции

-- migrations/00002_add_users_table.sql
-- +goose Up
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) NOT NULL UNIQUE,
    first_name VARCHAR(100),
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_created_at ON users(created_at);

-- +goose Down
DROP INDEX idx_users_created_at;
DROP INDEX idx_users_email;
DROP TABLE users;

Python управление миграциями

# app/database/migrations.py
import subprocess
import os
from typing import Optional

class MigrationManager:
    def __init__(self, database_url: str, migrations_dir: str = 'migrations'):
        self.database_url = database_url
        self.migrations_dir = migrations_dir
    
    def up(self, steps: Optional[int] = None):
        """Применить миграции (вперёд)"""
        cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'up']
        if steps:
            cmd += [str(steps)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f'Migration failed: {result.stderr}')
        print(result.stdout)
    
    def down(self, steps: int = 1):
        """Откатить миграции (назад)"""
        cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'down', str(steps)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f'Rollback failed: {result.stderr}')
        print(result.stdout)
    
    def status(self):
        """Статус миграций"""
        cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'status']
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(result.stdout)
    
    def reset(self):
        """Откатить ВСЕ миграции (только для разработки!)"""
        # ⚠️ ОПАСНО! Только в разработке
        if os.getenv('ENVIRONMENT') != 'development':
            raise RuntimeError('Reset only allowed in development')
        cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'reset']
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(result.stdout)

# Использование
if __name__ == '__main__':
    manager = MigrationManager('postgresql://user:pass@localhost/db')
    
    # Применить все новые миграции
    manager.up()
    
    # Откатить последнюю
    manager.down()
    
    # Статус
    manager.status()

Workflow разработчика

# 1. Создать новую миграцию
goose create add_posts_table sql
# Создаёт: migrations/NNNN_add_posts_table.sql

# 2. Редактировать миграцию
vim migrations/00006_add_posts_table.sql

# 3. Применить миграцию
goose -dir migrations postgres postgresql://user:pass@localhost/db up

# 4. Протестировать откат
goose -dir migrations postgres postgresql://user:pass@localhost/db down

# 5. Повторно применить
goose -dir migrations postgres postgresql://user:pass@localhost/db up

# 6. Комитить в git
git add migrations/00006_add_posts_table.sql
git commit -m "Add posts table"

2. Backup & Recovery

Автоматические бэкапы

# app/database/backup.py
import subprocess
import os
from datetime import datetime
import gzip

class BackupManager:
    def __init__(self, database_url: str, backup_dir: str = 'backups'):
        self.database_url = database_url
        self.backup_dir = backup_dir
        os.makedirs(backup_dir, exist_ok=True)
    
    def create_backup(self) -> str:
        """Создать полный бэкап БД"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = os.path.join(self.backup_dir, f'backup_{timestamp}.sql.gz')
        
        # Экспортировать БД
        dump_cmd = f'pg_dump {self.database_url} | gzip > {backup_file}'
        result = os.system(dump_cmd)
        
        if result != 0:
            raise RuntimeError(f'Backup failed')
        
        size_mb = os.path.getsize(backup_file) / (1024 * 1024)
        print(f'Backup created: {backup_file} ({size_mb:.2f} MB)')
        return backup_file
    
    def restore_backup(self, backup_file: str):
        """Восстановить БД из бэкапа"""
        # ⚠️ ОПАСНО! Удалит текущие данные
        confirm = input(f'Restore from {backup_file}? This will delete current data. Type "yes" to confirm: ')
        if confirm != 'yes':
            print('Cancelled')
            return
        
        # Восстановить
        restore_cmd = f'gunzip -c {backup_file} | psql {self.database_url}'
        result = os.system(restore_cmd)
        
        if result != 0:
            raise RuntimeError('Restore failed')
        
        print(f'Database restored from {backup_file}')
    
    def cleanup_old_backups(self, keep_days: int = 7):
        """Удалить старые бэкапы"""
        from datetime import timedelta
        import os
        
        cutoff_date = datetime.now() - timedelta(days=keep_days)
        
        for filename in os.listdir(self.backup_dir):
            filepath = os.path.join(self.backup_dir, filename)
            if os.path.isfile(filepath):
                file_time = datetime.fromtimestamp(os.path.getmtime(filepath))
                if file_time < cutoff_date:
                    os.remove(filepath)
                    print(f'Deleted old backup: {filename}')

# Использование в cron или scheduler
if __name__ == '__main__':
    manager = BackupManager('postgresql://user:pass@localhost/db')
    
    # Создать бэкап
    backup_file = manager.create_backup()
    
    # Удалить старые
    manager.cleanup_old_backups(keep_days=7)

Cron job для автоматических бэкапов

#!/bin/bash
# backup.sh

DB_URL="postgresql://user:password@localhost/db"
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d_%H%M%S)
FILE="$BACKUP_DIR/backup_$DATE.sql.gz"

# Создать бэкап
pg_dump $DB_URL | gzip > $FILE

# Отправить на S3 (для надёжности)
aws s3 cp $FILE s3://my-backups/

# Удалить локальные бэкапы старше 7 дней
find $BACKUP_DIR -name "backup_*.sql.gz" -mtime +7 -delete

# Отправить алерт
if [ $? -eq 0 ]; then
    echo "Backup successful: $FILE" | mail -s "DB Backup OK" admin@example.com
else
    echo "Backup FAILED!" | mail -s "DB Backup FAILED" admin@example.com
fi
# Добавить в crontab
# Каждый день в 2 AM
0 2 * * * /home/user/backup.sh

3. Мониторинг и производительность

# app/database/monitoring.py
import psycopg2
from typing import Dict, List

class DatabaseMonitoring:
    def __init__(self, connection_string: str):
        self.conn_string = connection_string
    
    def get_slow_queries(self, min_duration_ms: int = 1000) -> List[Dict]:
        """Получить медленные запросы"""
        conn = psycopg2.connect(self.conn_string)
        cur = conn.cursor()
        
        query = """
        SELECT 
            query,
            mean_exec_time,
            calls
        FROM pg_stat_statements
        WHERE mean_exec_time > %s
        ORDER BY mean_exec_time DESC
        LIMIT 10;
        """
        
        cur.execute(query, (min_duration_ms,))
        columns = [desc[0] for desc in cur.description]
        results = [dict(zip(columns, row)) for row in cur.fetchall()]
        
        cur.close()
        conn.close()
        
        return results
    
    def get_table_sizes(self) -> List[Dict]:
        """Размер каждой таблицы"""
        conn = psycopg2.connect(self.conn_string)
        cur = conn.cursor()
        
        query = """
        SELECT 
            tablename,
            pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
        FROM pg_tables
        WHERE schemaname = 'public'
        ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
        """
        
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]
        results = [dict(zip(columns, row)) for row in cur.fetchall()]
        
        cur.close()
        conn.close()
        
        return results
    
    def get_missing_indexes(self) -> List[Dict]:
        """Неиспользуемые индексы"""
        conn = psycopg2.connect(self.conn_string)
        cur = conn.cursor()
        
        query = """
        SELECT 
            schemaname,
            tablename,
            indexname,
            idx_scan
        FROM pg_stat_user_indexes
        WHERE idx_scan = 0
        ORDER BY pg_relation_size(indexrelid) DESC;
        """
        
        cur.execute(query)
        columns = [desc[0] for desc in cur.description]
        results = [dict(zip(columns, row)) for row in cur.fetchall()]
        
        cur.close()
        conn.close()
        
        return results

# Использование
if __name__ == '__main__':
    monitor = DatabaseMonitoring('postgresql://user:pass@localhost/db')
    
    print('=== SLOW QUERIES ===')
    for query in monitor.get_slow_queries():
        print(f"{query['query'][:50]}... - {query['mean_exec_time']:.2f}ms")
    
    print('\n=== TABLE SIZES ===')
    for table in monitor.get_table_sizes():
        print(f"{table['tablename']}: {table['size']}")
    
    print('\n=== UNUSED INDEXES ===')
    for idx in monitor.get_missing_indexes():
        print(f"{idx['indexname']} on {idx['tablename']} (0 scans)")

4. Оптимизация индексов

-- migrations/00010_optimize_indexes.sql
-- +goose Up

-- Создать составной индекс для часто используемого фильтра
CREATE INDEX idx_orders_user_status ON orders(user_id, status, created_at DESC)
WHERE status != 'deleted';

-- Partial индекс для активных пользователей
CREATE INDEX idx_active_users ON users(created_at DESC)
WHERE is_active = true;

-- BRIN индекс для больших таблиц с сортировкой по времени
CREATE INDEX idx_events_created_at ON events USING BRIN (created_at);

-- +goose Down
DROP INDEX idx_events_created_at;
DROP INDEX idx_active_users;
DROP INDEX idx_orders_user_status;

5. Масштабирование

Connection Pooling

# app/database/pool.py
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,           # Бэйс коннекции
    max_overflow=10,        # Дополнительные при пиках
    pool_recycle=3600,      # Переиспользовать каждый час
    pool_pre_ping=True      # Проверить перед использованием
)

Read Replicas

# Если используешь plusieurs БД
read_engine = create_engine('postgresql://user:pass@replica.db/db')
write_engine = create_engine('postgresql://user:pass@master.db/db')

def get_session(read_only=False):
    engine = read_engine if read_only else write_engine
    return sessionmaker(bind=engine)()

# В коде
session = get_session(read_only=True)  # SELECT
write_session = get_session(read_only=False)  # INSERT/UPDATE

6. Стратегия обслуживания

Ежедневно:
  ✅ Создание бэкапов
  ✅ Мониторинг медленных запросов
  ✅ Проверка дискового пространства

Еженедельно:
  ✅ Анализ индексов (ANALYZE)
  ✅ Дефрагментация (VACUUM)
  ✅ Review unused indexes

Ежемесячно:
  ✅ Полный анализ производительности
  ✅ Архивирование старых данных
  ✅ Обновление статистики (ANALYZE VERBOSE)

По необходимости:
  ✅ Добавление новых индексов
  ✅ Рефакторинг запросов
  ✅ Шардирование (если > 100GB)

7. Лучшие практики

  1. Версионируй миграции — всегда в git
  2. Тестируй откаты — убедись, что up/down работают
  3. Автоматические бэкапы — каждый день
  4. Мониторинг — следи за производительностью
  5. Индексирование — только что нужно
  6. Connection pooling — не создавай новые коннекции
  7. Статистика — регулярно ANALYZE
  8. Архивирование — удаляй старые данные

Хорошее обслуживание БД — основа надёжной системы!

Как организовать обслуживание баз данных? | PrepBro