← Назад к вопросам

Как реализуешь Backend в проекте для получения отчетов?

2.0 Middle🔥 151 комментариев
#Soft Skills и рабочие процессы

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI3 апр. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Backend для получения отчетов: архитектура и реализация

Отчеты - это отдельный слой, требующий специфической архитектуры. Расскажу как правильно организовать backend для их работы.

Архитектура системы отчетов

// Слои системы отчетов
const reportArchitecture = {
  // Слой 1: Presentation Layer (API endpoints)
  api: {
    endpoints: [
      'GET /api/v1/reports/{id}',
      'GET /api/v1/reports?type=sales&from=2024-01-01&to=2024-12-31',
      'POST /api/v1/reports/generate',
      'DELETE /api/v1/reports/{id}'
    ]
  },
  
  // Слой 2: Application Layer (Use Cases)
  useCases: [
    'GenerateReportUseCase',
    'GetReportUseCase',
    'ListReportsUseCase',
    'ExportReportUseCase'
  ],
  
  // Слой 3: Domain Layer (Business Logic)
  domain: {
    entities: ['Report', 'ReportFilter', 'ReportFormat'],
    valueObjects: ['ReportId', 'DateRange', 'ReportStatus']
  },
  
  // Слой 4: Infrastructure Layer (Database, External Services)
  infrastructure: {
    database: 'PostgreSQL, Redis (for caching)',
    fileStorage: 'S3 or local storage',
    externalServices: 'Payment API, Analytics API'
  }
};

Domain Layer: Entity Report

# Python пример с SQLAlchemy
from datetime import datetime
from uuid import uuid4
from typing import Optional
from enum import Enum

class ReportStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    EXPIRED = "expired"

class ReportFormat(str, Enum):
    PDF = "pdf"
    XLSX = "xlsx"
    CSV = "csv"
    JSON = "json"

class Report:
    """Доменная сущность отчета"""
    
    def __init__(
        self,
        id: str,
        user_id: str,
        report_type: str,
        status: ReportStatus,
        format: ReportFormat,
        filters: dict,
        created_at: datetime,
        expires_at: Optional[datetime] = None
    ):
        self.id = id
        self.user_id = user_id
        self.report_type = report_type
        self.status = status
        self.format = format
        self.filters = filters
        self.created_at = created_at
        self.expires_at = expires_at
        self.file_path: Optional[str] = None
        self.file_size: int = 0
    
    def is_expired(self) -> bool:
        """Проверить, истёк ли срок отчета"""
        if not self.expires_at:
            return False
        return datetime.now() > self.expires_at
    
    def mark_processing(self) -> None:
        """Отметить что отчет в обработке"""
        if self.status != ReportStatus.PENDING:
            raise ValueError(f"Cannot process report in {self.status} status")
        self.status = ReportStatus.PROCESSING
    
    def mark_completed(self, file_path: str, file_size: int) -> None:
        """Отметить что отчет готов"""
        if self.status != ReportStatus.PROCESSING:
            raise ValueError(f"Cannot complete report in {self.status} status")
        self.status = ReportStatus.COMPLETED
        self.file_path = file_path
        self.file_size = file_size
    
    def mark_failed(self, error: str) -> None:
        """Отметить ошибку обработки"""
        self.status = ReportStatus.FAILED
        self.error_message = error

Database Layer: ORM Model

from sqlalchemy import Column, String, Enum, DateTime, Integer, JSON, Text, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime, timezone

class ReportORM:
    """ORM модель для хранения в БД"""
    __tablename__ = "reports"
    
    id = Column(String(36), primary_key=True, default=lambda: str(uuid4()))
    user_id = Column(String(36), ForeignKey("users.id"), nullable=False, index=True)
    report_type = Column(String(50), nullable=False, index=True) # 'sales', 'revenue', 'users'
    status = Column(Enum(ReportStatus), nullable=False, default=ReportStatus.PENDING, index=True)
    format = Column(Enum(ReportFormat), nullable=False, default=ReportFormat.PDF)
    filters = Column(JSON, nullable=False) # {"from": "2024-01-01", "to": "2024-12-31"}
    file_path = Column(String(500), nullable=True) # Путь к файлу
    file_size = Column(Integer, nullable=True) # Размер в байтах
    error_message = Column(Text, nullable=True) # Сообщение об ошибке
    created_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
    completed_at = Column(DateTime(timezone=True), nullable=True)
    expires_at = Column(DateTime(timezone=True), nullable=True) # Когда удалить файл
    
    # Индекс для быстрого поиска
    __table_args__ = (
        Index('ix_reports_user_created', 'user_id', 'created_at'),
    )

Repository Pattern

from abc import ABC, abstractmethod
from typing import List, Optional
from sqlalchemy.orm import Session

class ReportRepository(ABC):
    """Interface для работы с отчетами"""
    
    @abstractmethod
    async def create(self, report: Report) -> Report:
        """Создать новый отчет"""
        pass
    
    @abstractmethod
    async def get_by_id(self, report_id: str, user_id: str) -> Optional[Report]:
        """Получить отчет по ID (с проверкой прав)"""
        pass
    
    @abstractmethod
    async def list_by_user(
        self, 
        user_id: str, 
        skip: int = 0, 
        limit: int = 10,
        report_type: Optional[str] = None,
        status: Optional[ReportStatus] = None
    ) -> tuple[List[Report], int]:
        """Получить список отчетов пользователя (с пагинацией)"""
        pass
    
    @abstractmethod
    async def update_status(self, report_id: str, status: ReportStatus) -> Report:
        """Обновить статус отчета"""
        pass
    
    @abstractmethod
    async def delete(self, report_id: str, user_id: str) -> None:
        """Удалить отчет (проверить права)"""
        pass

class ReportRepositorySQL(ReportRepository):
    """Реализация с использованием SQL"""
    
    def __init__(self, session: Session):
        self.session = session
    
    async def create(self, report: Report) -> Report:
        orm_obj = ReportORM(
            id=report.id,
            user_id=report.user_id,
            report_type=report.report_type,
            status=report.status,
            format=report.format,
            filters=report.filters
        )
        self.session.add(orm_obj)
        await self.session.commit()
        return report
    
    async def get_by_id(self, report_id: str, user_id: str) -> Optional[Report]:
        orm_obj = self.session.query(ReportORM).filter(
            ReportORM.id == report_id,
            ReportORM.user_id == user_id
        ).first()
        
        if not orm_obj:
            return None
        
        return self._to_domain(orm_obj)
    
    async def list_by_user(
        self,
        user_id: str,
        skip: int = 0,
        limit: int = 10,
        report_type: Optional[str] = None,
        status: Optional[ReportStatus] = None
    ) -> tuple[List[Report], int]:
        query = self.session.query(ReportORM).filter(
            ReportORM.user_id == user_id
        )
        
        if report_type:
            query = query.filter(ReportORM.report_type == report_type)
        if status:
            query = query.filter(ReportORM.status == status)
        
        # Сначала посчитать всего
        total = query.count()
        
        # Потом применить пагинацию
        reports = query.order_by(
            ReportORM.created_at.desc()
        ).offset(skip).limit(limit).all()
        
        return (
            [self._to_domain(orm) for orm in reports],
            total
        )
    
    def _to_domain(self, orm: ReportORM) -> Report:
        return Report(
            id=orm.id,
            user_id=orm.user_id,
            report_type=orm.report_type,
            status=orm.status,
            format=orm.format,
            filters=orm.filters,
            created_at=orm.created_at,
            expires_at=orm.expires_at
        )

Use Cases (Бизнес-логика)

from typing import Optional
import asyncio
from uuid import uuid4

class GenerateReportUseCase:
    """Use case для генерации отчета"""
    
    def __init__(
        self,
        report_repo: ReportRepository,
        report_generator: 'ReportGenerator', # Service для создания файла
        file_storage: 'FileStorage' # Service для сохранения файла
    ):
        self.report_repo = report_repo
        self.report_generator = report_generator
        self.file_storage = file_storage
    
    async def execute(
        self,
        user_id: str,
        report_type: str,
        filters: dict,
        format: ReportFormat = ReportFormat.PDF
    ) -> Report:
        """Создать новый отчет и начать его обработку"""
        
        # Валидация входных данных
        if not self._is_valid_report_type(report_type):
            raise ValueError(f"Invalid report type: {report_type}")
        
        # Создать сущность отчета
        report = Report(
            id=str(uuid4()),
            user_id=user_id,
            report_type=report_type,
            status=ReportStatus.PENDING,
            format=format,
            filters=filters,
            created_at=datetime.now(timezone.utc),
            expires_at=datetime.now(timezone.utc) + timedelta(days=7)
        )
        
        # Сохранить в БД
        await self.report_repo.create(report)
        
        # Запустить асинхронную обработку (не блокировать)
        asyncio.create_task(self._process_report(report))
        
        return report
    
    async def _process_report(self, report: Report) -> None:
        """Обработать отчет в фоне"""
        try:
            # Отметить что начинается обработка
            report.mark_processing()
            await self.report_repo.update_status(report.id, ReportStatus.PROCESSING)
            
            # Получить данные и сгенерировать отчет
            data = await self._fetch_data_for_report(report)
            file_content = await self.report_generator.generate(
                report_type=report.report_type,
                format=report.format,
                data=data
            )
            
            # Сохранить файл
            file_path = await self.file_storage.save(
                bucket="reports",
                key=f"{report.user_id}/{report.id}.{report.format.value}",
                content=file_content
            )
            
            # Отметить что отчет готов
            report.mark_completed(
                file_path=file_path,
                file_size=len(file_content)
            )
            await self.report_repo.update_status(report.id, ReportStatus.COMPLETED)
            
        except Exception as error:
            report.mark_failed(str(error))
            await self.report_repo.update_status(report.id, ReportStatus.FAILED)
    
    async def _fetch_data_for_report(self, report: Report) -> dict:
        """Получить данные для отчета из разных источников"""
        if report.report_type == "sales":
            return await self._get_sales_data(report.filters)
        elif report.report_type == "revenue":
            return await self._get_revenue_data(report.filters)
        else:
            raise ValueError(f"Unknown report type: {report.report_type}")
    
    def _is_valid_report_type(self, report_type: str) -> bool:
        return report_type in ["sales", "revenue", "users", "orders"]

API Endpoints (FastAPI)

from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from fastapi.responses import FileResponse
from datetime import datetime, timezone

router = APIRouter(prefix="/api/v1/reports", tags=["reports"])

# Dependency для получения current user
async def get_current_user(token: str) -> str:
    # Проверить токен и вернуть user_id
    pass

@router.get("")
async def list_reports(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    report_type: Optional[str] = None,
    status: Optional[str] = None,
    current_user: str = Depends(get_current_user)
):
    """Получить список отчетов пользователя"""
    try:
        # Получить use case из контейнера зависимостей
        list_use_case = get_list_reports_use_case()
        
        reports, total = await list_use_case.execute(
            user_id=current_user,
            skip=skip,
            limit=limit,
            report_type=report_type,
            status=status
        )
        
        return {
            "data": [serialize_report(r) for r in reports],
            "total": total,
            "skip": skip,
            "limit": limit
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("")
async def generate_report(
    request: GenerateReportRequest,
    background_tasks: BackgroundTasks,
    current_user: str = Depends(get_current_user)
):
    """Создать новый отчет"""
    try:
        generate_use_case = get_generate_report_use_case()
        
        report = await generate_use_case.execute(
            user_id=current_user,
            report_type=request.report_type,
            filters=request.filters.dict(),
            format=ReportFormat(request.format)
        )
        
        return {
            "id": report.id,
            "status": report.status.value,
            "created_at": report.created_at.isoformat(),
            "message": "Report generation started"
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.get("/{report_id}")
async def get_report(
    report_id: str,
    current_user: str = Depends(get_current_user)
):
    """Получить информацию об отчете"""
    get_use_case = get_get_report_use_case()
    
    report = await get_use_case.execute(
        report_id=report_id,
        user_id=current_user
    )
    
    if not report:
        raise HTTPException(status_code=404, detail="Report not found")
    
    return serialize_report(report)

@router.get("/{report_id}/download")
async def download_report(
    report_id: str,
    current_user: str = Depends(get_current_user)
):
    """Скачать файл отчета"""
    get_use_case = get_get_report_use_case()
    
    report = await get_use_case.execute(
        report_id=report_id,
        user_id=current_user
    )
    
    if not report:
        raise HTTPException(status_code=404, detail="Report not found")
    
    if report.status != ReportStatus.COMPLETED:
        raise HTTPException(
            status_code=400,
            detail=f"Report is {report.status.value}, not ready for download"
        )
    
    if not report.file_path:
        raise HTTPException(status_code=500, detail="File not found")
    
    return FileResponse(
        path=report.file_path,
        filename=f"report_{report.id}.{report.format.value}"
    )

@router.delete("/{report_id}")
async def delete_report(
    report_id: str,
    current_user: str = Depends(get_current_user)
):
    """Удалить отчет"""
    delete_use_case = get_delete_report_use_case()
    
    await delete_use_case.execute(
        report_id=report_id,
        user_id=current_user
    )
    
    return {"message": "Report deleted"}

Кэширование и оптимизация

import redis
from functools import wraps

class ReportCache:
    """Кэширование результатов отчетов"""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    async def get_cached_report(self, cache_key: str) -> Optional[dict]:
        data = self.redis.get(cache_key)
        if not data:
            return None
        return json.loads(data)
    
    async def cache_report(self, cache_key: str, data: dict, ttl: int = 3600):
        """Кэшировать результаты на 1 час"""
        self.redis.setex(
            cache_key,
            ttl,
            json.dumps(data)
        )
    
    def get_cache_key(self, user_id: str, report_type: str, filters: dict) -> str:
        # Создать стабильный ключ
        filters_str = json.dumps(filters, sort_keys=True)
        return f"report:{user_id}:{report_type}:{hash(filters_str)}"

Выводы

  1. DDD подход: отделите domain от infrastructure
  2. Repository pattern: абстрагируйте БД
  3. Use cases: вся бизнес-логика в отдельных классах
  4. Асинхронность: используйте background tasks для генерации
  5. Кэширование: кэшируйте результаты отчетов
  6. Безопасность: проверяйте права доступа
  7. Файлы: сохраняйте в S3 или на диск, не в БД
  8. Статусы: отслеживайте статус обработки
  9. Экспирация: удаляйте старые файлы
  10. Пагинация: поддерживайте limit/offset для списков
Как реализуешь Backend в проекте для получения отчетов? | PrepBro