← Назад к вопросам

Что можно конфигурировать и управлять (manage) при работе с хранилищем и доступом к файлам?

2.3 Middle🔥 111 комментариев
#Python Core

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Конфигурация и управление хранилищем файлов

Работа с файлами и объектным хранилищем требует правильной конфигурации доступа, прав и расположения данных.

Типы хранилищ

Хранилище файлов:
├─ Local Storage (локальный диск)
├─ S3-совместимые (AWS S3, MinIO, DigitalOcean Spaces)
├─ Cloud Storage (Google Cloud Storage, Azure Blob)
├─ Distributed (HDFS, Ceph)
└─ CDN (CloudFlare, Akamai)

1. Local Storage конфигурация

from pathlib import Path
import os

class LocalStorageConfig:
    # Базовые пути
    BASE_UPLOAD_DIR = Path('/var/uploads')
    TEMP_DIR = Path('/tmp/uploads')
    
    # Ограничения
    MAX_FILE_SIZE = 100 * 1024 * 1024  # 100 MB
    ALLOWED_EXTENSIONS = {'pdf', 'doc', 'docx', 'txt', 'jpg', 'png'}
    
    # Безопасность
    ENABLE_VIRUS_SCAN = True
    DISABLE_EXECUTE_PERMISSION = True
    
    # Организация
    ORGANIZE_BY_DATE = True  # /uploads/2024/03/23/
    ORGANIZE_BY_USER = True  # /uploads/user_123/
    USE_UNIQUE_NAMES = True  # uuid + ext
    
    @staticmethod
    def get_upload_path(user_id: int, date: datetime) -> Path:
        """Генерируем безопасный путь для загрузки"""
        # /uploads/2024/03/23/user_123/
        path = (
            LocalStorageConfig.BASE_UPLOAD_DIR /
            str(date.year) /
            str(date.month).zfill(2) /
            str(date.day).zfill(2) /
            f"user_{user_id}"
        )
        path.mkdir(parents=True, exist_ok=True)
        return path

# Использование
from datetime import datetime

upload_path = LocalStorageConfig.get_upload_path(
    user_id=123,
    date=datetime.now()
)
print(upload_path)  # /var/uploads/2024/03/23/user_123/

2. S3 конфигурация (AWS / MinIO)

import boto3
from botocore.config import Config

class S3Config:
    # Базовые параметры
    AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
    AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
    AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
    AWS_S3_BUCKET = os.getenv('AWS_S3_BUCKET')
    
    # Опции производительности
    MAX_POOL_CONNECTIONS = 10
    CONNECT_TIMEOUT = 30
    READ_TIMEOUT = 60
    
    # Multipart upload
    MULTIPART_THRESHOLD = 25 * 1024 * 1024  # 25 MB
    MULTIPART_CHUNKSIZE = 25 * 1024 * 1024
    
    # Ретраи
    MAX_ATTEMPTS = 3
    RETRY_MODE = 'adaptive'
    
    @staticmethod
    def get_s3_client():
        config = Config(
            max_pool_connections=S3Config.MAX_POOL_CONNECTIONS,
            connect_timeout=S3Config.CONNECT_TIMEOUT,
            read_timeout=S3Config.READ_TIMEOUT,
            retries={'max_attempts': S3Config.MAX_ATTEMPTS, 'mode': S3Config.RETRY_MODE}
        )
        
        return boto3.client(
            's3',
            aws_access_key_id=S3Config.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=S3Config.AWS_SECRET_ACCESS_KEY,
            region_name=S3Config.AWS_REGION,
            config=config
        )

# Использование
class S3Storage:
    def __init__(self):
        self.s3 = S3Config.get_s3_client()
        self.bucket = S3Config.AWS_S3_BUCKET
    
    def upload_file(
        self,
        file_path: str,
        s3_key: str,
        metadata: dict = None
    ) -> str:
        """Загрузить файл в S3"""
        extra_args = {}
        
        if metadata:
            extra_args['Metadata'] = metadata
        
        # Определяем тип контента
        from mimetypes import guess_type
        content_type, _ = guess_type(file_path)
        if content_type:
            extra_args['ContentType'] = content_type
        
        # Загружаем
        self.s3.upload_file(
            file_path,
            self.bucket,
            s3_key,
            ExtraArgs=extra_args
        )
        
        return f"s3://{self.bucket}/{s3_key}"
    
    def generate_presigned_url(
        self,
        s3_key: str,
        expiration: int = 3600
    ) -> str:
        """Генерируем временный URL для доступа"""
        return self.s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': self.bucket, 'Key': s3_key},
            ExpiresIn=expiration
        )
    
    def set_object_acl(
        self,
        s3_key: str,
        acl: str = 'private'  # или 'public-read'
    ):
        """Управляем правами доступа"""
        self.s3.put_object_acl(
            Bucket=self.bucket,
            Key=s3_key,
            ACL=acl
        )
    
    def set_lifecycle_policy(self):
        """Настраиваем автоматическое удаление старых файлов"""
        lifecycle_config = {
            'Rules': [
                {
                    'ID': 'Delete old temp files',
                    'Status': 'Enabled',
                    'Prefix': 'temp/',
                    'Expiration': {'Days': 7}
                },
                {
                    'ID': 'Archive old logs',
                    'Status': 'Enabled',
                    'Prefix': 'logs/',
                    'Transitions': [
                        {
                            'Days': 30,
                            'StorageClass': 'GLACIER'
                        }
                    ]
                }
            ]
        }
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=self.bucket,
            LifecycleConfiguration=lifecycle_config
        )

3. Права доступа и Permission Management

from enum import Enum
from typing import List

class FilePermission(Enum):
    READ = 'r'
    WRITE = 'w'
    DELETE = 'd'
    SHARE = 's'

class FileACL:
    """Access Control List для файлов"""
    
    def __init__(self, file_id: str):
        self.file_id = file_id
        # В БД: file_id -> permissions -> user_id
        self.permissions: dict[str, List[FilePermission]] = {}
    
    def grant_permission(
        self,
        user_id: str,
        permission: FilePermission
    ):
        """Даем пользователю право"""
        if user_id not in self.permissions:
            self.permissions[user_id] = []
        
        if permission not in self.permissions[user_id]:
            self.permissions[user_id].append(permission)
            self.save_to_db()
    
    def revoke_permission(
        self,
        user_id: str,
        permission: FilePermission
    ):
        """Отзываем право у пользователя"""
        if user_id in self.permissions:
            self.permissions[user_id].remove(permission)
            if not self.permissions[user_id]:
                del self.permissions[user_id]
            self.save_to_db()
    
    def check_permission(
        self,
        user_id: str,
        permission: FilePermission
    ) -> bool:
        """Проверяем, есть ли у пользователя право"""
        return (
            user_id in self.permissions and
            permission in self.permissions[user_id]
        )
    
    def make_public(self):
        """Даем публичный доступ"""
        self.grant_permission('public', FilePermission.READ)
    
    def save_to_db(self):
        """Сохраняем в БД"""
        # UPDATE file_acl SET permissions = ... WHERE file_id = ...
        pass

4. Кэширование и CDN интеграция

import hashlib

class FileCache:
    """Управление кэшем и CDN"""
    
    # Кэширование браузера
    CACHE_CONTROL_PUBLIC = "public, max-age=31536000"  # 1 год
    CACHE_CONTROL_PRIVATE = "private, max-age=3600"  # 1 час
    CACHE_CONTROL_NO_CACHE = "no-cache, no-store, must-revalidate"
    
    # CloudFlare кэширование
    CF_CACHE_EVERYTHING = True
    CF_CACHE_TTL = 86400  # 24 часа
    
    @staticmethod
    def set_cache_headers(
        response,
        cache_type: str = 'public',
        max_age: int = 31536000
    ):
        """Устанавливаем заголовки кэша"""
        if cache_type == 'public':
            response.headers['Cache-Control'] = f"public, max-age={max_age}"
        elif cache_type == 'private':
            response.headers['Cache-Control'] = f"private, max-age={max_age}"
        else:
            response.headers['Cache-Control'] = "no-cache, no-store, must-revalidate"
        
        # ETag для проверки изменений
        content_hash = hashlib.md5(response.body).hexdigest()
        response.headers['ETag'] = f'"{content_hash}"'
        
        # CloudFlare правила
        response.headers['CF-Cache-Everything'] = 'true'
        response.headers['Cache-Tag'] = 'public-assets'
    
    @staticmethod
    def invalidate_cache(pattern: str):
        """Инвалидируем кэш (CloudFlare)"""
        import requests
        
        headers = {
            'Authorization': f"Bearer {os.getenv('CLOUDFLARE_API_TOKEN')}",
            'Content-Type': 'application/json',
        }
        
        data = {
            'files': [pattern]  # или использовать cache tags
        }
        
        zone_id = os.getenv('CLOUDFLARE_ZONE_ID')
        requests.post(
            f"https://api.cloudflare.com/client/v4/zones/{zone_id}/purge_cache",
            headers=headers,
            json=data
        )

5. Шифрование и безопасность

from cryptography.fernet import Fernet
import os

class FileEncryption:
    """Управление шифрованием файлов"""
    
    def __init__(self, encryption_key: str = None):
        # Берем из environment для prod
        self.key = encryption_key or os.getenv('FILE_ENCRYPTION_KEY')
        self.cipher = Fernet(self.key)
    
    def encrypt_filename(self, filename: str) -> str:
        """Шифруем имя файла (для конфиденциальности)"""
        encrypted = self.cipher.encrypt(filename.encode())
        return encrypted.decode()
    
    def decrypt_filename(self, encrypted_filename: str) -> str:
        """Расшифровываем имя файла"""
        decrypted = self.cipher.decrypt(encrypted_filename.encode())
        return decrypted.decode()
    
    def get_encryption_headers(self, s3_client):
        """Возвращаем параметры для S3 серверного шифрования"""
        return {
            'ServerSideEncryption': 'aws:kms',
            'SSEKMSKeyId': os.getenv('AWS_KMS_KEY_ID')
        }

6. Управление версиями файлов

class FileVersioning:
    """Управление версиями файлов"""
    
    def enable_versioning(self, s3_client, bucket: str):
        """Включаем версионирование в S3"""
        s3_client.put_bucket_versioning(
            Bucket=bucket,
            VersioningConfiguration={'Status': 'Enabled'}
        )
    
    def list_versions(self, s3_client, bucket: str, key: str):
        """Список всех версий файла"""
        response = s3_client.list_object_versions(
            Bucket=bucket,
            Prefix=key
        )
        return response.get('Versions', [])
    
    def restore_version(
        self,
        s3_client,
        bucket: str,
        key: str,
        version_id: str
    ):
        """Восстанавливаем старую версию файла"""
        obj = s3_client.get_object(
            Bucket=bucket,
            Key=key,
            VersionId=version_id
        )
        # Копируем как текущую версию
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=obj['Body'].read()
        )

7. Мониторинг и логирование доступа

import logging
from datetime import datetime

class FileAccessLog:
    """Логирование доступа к файлам"""
    
    @staticmethod
    def log_access(
        user_id: str,
        file_id: str,
        action: str,
        status: str,
        ip_address: str
    ):
        """Логируем доступ для аудита"""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'file_id': file_id,
            'action': action,  # read, write, delete
            'status': status,  # success, failed
            'ip_address': ip_address
        }
        
        # Сохраняем в БД или логи
        logger.info(
            f"File access: {action}",
            extra=log_entry
        )

# Middleware для логирования
from fastapi import Request

@app.middleware("http")
async def log_file_access(request: Request, call_next):
    if "/files/" in request.url.path:
        user_id = request.state.user_id
        file_id = request.path_params.get('file_id')
        ip_address = request.client.host
        
        response = await call_next(request)
        
        FileAccessLog.log_access(
            user_id=user_id,
            file_id=file_id,
            action='read' if request.method == 'GET' else 'write',
            status='success' if response.status_code < 400 else 'failed',
            ip_address=ip_address
        )
        
        return response
    
    return await call_next(request)

Конфигурационный файл

# storage_config.yaml
storage:
  type: s3  # local, s3, gcs
  
  s3:
    bucket: my-app-bucket
    region: us-east-1
    prefix: uploads/
    
  paths:
    uploads: /uploads
    temp: /tmp/uploads
    cache: /var/cache
  
  limits:
    max_file_size: 104857600  # 100 MB
    max_total_size: 1099511627776  # 1 TB
  
  permissions:
    default_acl: private
    public_acl: public-read
  
  cache:
    enabled: true
    ttl: 86400
    cdn: cloudflare
  
  security:
    encryption: enabled
    kms_key_id: arn:aws:kms:...
    scan_viruses: true
  
  lifecycle:
    temp_retention_days: 7
    archive_days: 30
    delete_days: 365

Best Practices

  1. Никогда не храни credentials в коде — используй environment переменные
  2. Управляй доступом на уровне ACL — не делай всё public
  3. Используй временные URLs — для временного доступа
  4. Логируй всё доступы — для аудита
  5. Шифруй чувствительные данные — в transit и at rest
  6. Регулярно проверяй права — неспользуемые доступы отзывай
  7. Настрой TTL и versioning — для надежности
  8. Мониторь использование хранилища — избегай неожиданных счетов
Что можно конфигурировать и управлять (manage) при работе с хранилищем и доступом к файлам? | PrepBro