Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Организация обслуживания баз данных
Манаджмент БД включает миграции, резервные копии, мониторинг, оптимизацию и операционные процессы.
1. Система миграций (Goose + SQL)
Структура файлов
migrations/
├── 00001_initial_schema.sql
├── 00002_add_users_table.sql
├── 00003_add_orders_table.sql
├── 00004_create_indexes.sql
└── 00005_add_audit_columns.sql
Формат миграции
-- migrations/00002_add_users_table.sql
-- +goose Up
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) NOT NULL UNIQUE,
first_name VARCHAR(100),
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_created_at ON users(created_at);
-- +goose Down
DROP INDEX idx_users_created_at;
DROP INDEX idx_users_email;
DROP TABLE users;
Python управление миграциями
# app/database/migrations.py
import subprocess
import os
from typing import Optional
class MigrationManager:
def __init__(self, database_url: str, migrations_dir: str = 'migrations'):
self.database_url = database_url
self.migrations_dir = migrations_dir
def up(self, steps: Optional[int] = None):
"""Применить миграции (вперёд)"""
cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'up']
if steps:
cmd += [str(steps)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f'Migration failed: {result.stderr}')
print(result.stdout)
def down(self, steps: int = 1):
"""Откатить миграции (назад)"""
cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'down', str(steps)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f'Rollback failed: {result.stderr}')
print(result.stdout)
def status(self):
"""Статус миграций"""
cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'status']
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
def reset(self):
"""Откатить ВСЕ миграции (только для разработки!)"""
# ⚠️ ОПАСНО! Только в разработке
if os.getenv('ENVIRONMENT') != 'development':
raise RuntimeError('Reset only allowed in development')
cmd = ['goose', '-dir', self.migrations_dir, 'postgres', self.database_url, 'reset']
result = subprocess.run(cmd, capture_output=True, text=True)
print(result.stdout)
# Использование
if __name__ == '__main__':
manager = MigrationManager('postgresql://user:pass@localhost/db')
# Применить все новые миграции
manager.up()
# Откатить последнюю
manager.down()
# Статус
manager.status()
Workflow разработчика
# 1. Создать новую миграцию
goose create add_posts_table sql
# Создаёт: migrations/NNNN_add_posts_table.sql
# 2. Редактировать миграцию
vim migrations/00006_add_posts_table.sql
# 3. Применить миграцию
goose -dir migrations postgres postgresql://user:pass@localhost/db up
# 4. Протестировать откат
goose -dir migrations postgres postgresql://user:pass@localhost/db down
# 5. Повторно применить
goose -dir migrations postgres postgresql://user:pass@localhost/db up
# 6. Комитить в git
git add migrations/00006_add_posts_table.sql
git commit -m "Add posts table"
2. Backup & Recovery
Автоматические бэкапы
# app/database/backup.py
import subprocess
import os
from datetime import datetime
import gzip
class BackupManager:
def __init__(self, database_url: str, backup_dir: str = 'backups'):
self.database_url = database_url
self.backup_dir = backup_dir
os.makedirs(backup_dir, exist_ok=True)
def create_backup(self) -> str:
"""Создать полный бэкап БД"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = os.path.join(self.backup_dir, f'backup_{timestamp}.sql.gz')
# Экспортировать БД
dump_cmd = f'pg_dump {self.database_url} | gzip > {backup_file}'
result = os.system(dump_cmd)
if result != 0:
raise RuntimeError(f'Backup failed')
size_mb = os.path.getsize(backup_file) / (1024 * 1024)
print(f'Backup created: {backup_file} ({size_mb:.2f} MB)')
return backup_file
def restore_backup(self, backup_file: str):
"""Восстановить БД из бэкапа"""
# ⚠️ ОПАСНО! Удалит текущие данные
confirm = input(f'Restore from {backup_file}? This will delete current data. Type "yes" to confirm: ')
if confirm != 'yes':
print('Cancelled')
return
# Восстановить
restore_cmd = f'gunzip -c {backup_file} | psql {self.database_url}'
result = os.system(restore_cmd)
if result != 0:
raise RuntimeError('Restore failed')
print(f'Database restored from {backup_file}')
def cleanup_old_backups(self, keep_days: int = 7):
"""Удалить старые бэкапы"""
from datetime import timedelta
import os
cutoff_date = datetime.now() - timedelta(days=keep_days)
for filename in os.listdir(self.backup_dir):
filepath = os.path.join(self.backup_dir, filename)
if os.path.isfile(filepath):
file_time = datetime.fromtimestamp(os.path.getmtime(filepath))
if file_time < cutoff_date:
os.remove(filepath)
print(f'Deleted old backup: {filename}')
# Использование в cron или scheduler
if __name__ == '__main__':
manager = BackupManager('postgresql://user:pass@localhost/db')
# Создать бэкап
backup_file = manager.create_backup()
# Удалить старые
manager.cleanup_old_backups(keep_days=7)
Cron job для автоматических бэкапов
#!/bin/bash
# backup.sh
DB_URL="postgresql://user:password@localhost/db"
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d_%H%M%S)
FILE="$BACKUP_DIR/backup_$DATE.sql.gz"
# Создать бэкап
pg_dump $DB_URL | gzip > $FILE
# Отправить на S3 (для надёжности)
aws s3 cp $FILE s3://my-backups/
# Удалить локальные бэкапы старше 7 дней
find $BACKUP_DIR -name "backup_*.sql.gz" -mtime +7 -delete
# Отправить алерт
if [ $? -eq 0 ]; then
echo "Backup successful: $FILE" | mail -s "DB Backup OK" admin@example.com
else
echo "Backup FAILED!" | mail -s "DB Backup FAILED" admin@example.com
fi
# Добавить в crontab
# Каждый день в 2 AM
0 2 * * * /home/user/backup.sh
3. Мониторинг и производительность
# app/database/monitoring.py
import psycopg2
from typing import Dict, List
class DatabaseMonitoring:
def __init__(self, connection_string: str):
self.conn_string = connection_string
def get_slow_queries(self, min_duration_ms: int = 1000) -> List[Dict]:
"""Получить медленные запросы"""
conn = psycopg2.connect(self.conn_string)
cur = conn.cursor()
query = """
SELECT
query,
mean_exec_time,
calls
FROM pg_stat_statements
WHERE mean_exec_time > %s
ORDER BY mean_exec_time DESC
LIMIT 10;
"""
cur.execute(query, (min_duration_ms,))
columns = [desc[0] for desc in cur.description]
results = [dict(zip(columns, row)) for row in cur.fetchall()]
cur.close()
conn.close()
return results
def get_table_sizes(self) -> List[Dict]:
"""Размер каждой таблицы"""
conn = psycopg2.connect(self.conn_string)
cur = conn.cursor()
query = """
SELECT
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
"""
cur.execute(query)
columns = [desc[0] for desc in cur.description]
results = [dict(zip(columns, row)) for row in cur.fetchall()]
cur.close()
conn.close()
return results
def get_missing_indexes(self) -> List[Dict]:
"""Неиспользуемые индексы"""
conn = psycopg2.connect(self.conn_string)
cur = conn.cursor()
query = """
SELECT
schemaname,
tablename,
indexname,
idx_scan
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
"""
cur.execute(query)
columns = [desc[0] for desc in cur.description]
results = [dict(zip(columns, row)) for row in cur.fetchall()]
cur.close()
conn.close()
return results
# Использование
if __name__ == '__main__':
monitor = DatabaseMonitoring('postgresql://user:pass@localhost/db')
print('=== SLOW QUERIES ===')
for query in monitor.get_slow_queries():
print(f"{query['query'][:50]}... - {query['mean_exec_time']:.2f}ms")
print('\n=== TABLE SIZES ===')
for table in monitor.get_table_sizes():
print(f"{table['tablename']}: {table['size']}")
print('\n=== UNUSED INDEXES ===')
for idx in monitor.get_missing_indexes():
print(f"{idx['indexname']} on {idx['tablename']} (0 scans)")
4. Оптимизация индексов
-- migrations/00010_optimize_indexes.sql
-- +goose Up
-- Создать составной индекс для часто используемого фильтра
CREATE INDEX idx_orders_user_status ON orders(user_id, status, created_at DESC)
WHERE status != 'deleted';
-- Partial индекс для активных пользователей
CREATE INDEX idx_active_users ON users(created_at DESC)
WHERE is_active = true;
-- BRIN индекс для больших таблиц с сортировкой по времени
CREATE INDEX idx_events_created_at ON events USING BRIN (created_at);
-- +goose Down
DROP INDEX idx_events_created_at;
DROP INDEX idx_active_users;
DROP INDEX idx_orders_user_status;
5. Масштабирование
Connection Pooling
# app/database/pool.py
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=20, # Бэйс коннекции
max_overflow=10, # Дополнительные при пиках
pool_recycle=3600, # Переиспользовать каждый час
pool_pre_ping=True # Проверить перед использованием
)
Read Replicas
# Если используешь plusieurs БД
read_engine = create_engine('postgresql://user:pass@replica.db/db')
write_engine = create_engine('postgresql://user:pass@master.db/db')
def get_session(read_only=False):
engine = read_engine if read_only else write_engine
return sessionmaker(bind=engine)()
# В коде
session = get_session(read_only=True) # SELECT
write_session = get_session(read_only=False) # INSERT/UPDATE
6. Стратегия обслуживания
Ежедневно:
✅ Создание бэкапов
✅ Мониторинг медленных запросов
✅ Проверка дискового пространства
Еженедельно:
✅ Анализ индексов (ANALYZE)
✅ Дефрагментация (VACUUM)
✅ Review unused indexes
Ежемесячно:
✅ Полный анализ производительности
✅ Архивирование старых данных
✅ Обновление статистики (ANALYZE VERBOSE)
По необходимости:
✅ Добавление новых индексов
✅ Рефакторинг запросов
✅ Шардирование (если > 100GB)
7. Лучшие практики
- Версионируй миграции — всегда в git
- Тестируй откаты — убедись, что up/down работают
- Автоматические бэкапы — каждый день
- Мониторинг — следи за производительностью
- Индексирование — только что нужно
- Connection pooling — не создавай новые коннекции
- Статистика — регулярно ANALYZE
- Архивирование — удаляй старые данные
Хорошее обслуживание БД — основа надёжной системы!