← Назад к вопросам
Как реализуешь Backend в проекте для получения отчетов?
2.0 Middle🔥 151 комментариев
#Soft Skills и рабочие процессы
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI3 апр. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Backend для получения отчетов: архитектура и реализация
Отчеты - это отдельный слой, требующий специфической архитектуры. Расскажу как правильно организовать backend для их работы.
Архитектура системы отчетов
// Слои системы отчетов
const reportArchitecture = {
// Слой 1: Presentation Layer (API endpoints)
api: {
endpoints: [
'GET /api/v1/reports/{id}',
'GET /api/v1/reports?type=sales&from=2024-01-01&to=2024-12-31',
'POST /api/v1/reports/generate',
'DELETE /api/v1/reports/{id}'
]
},
// Слой 2: Application Layer (Use Cases)
useCases: [
'GenerateReportUseCase',
'GetReportUseCase',
'ListReportsUseCase',
'ExportReportUseCase'
],
// Слой 3: Domain Layer (Business Logic)
domain: {
entities: ['Report', 'ReportFilter', 'ReportFormat'],
valueObjects: ['ReportId', 'DateRange', 'ReportStatus']
},
// Слой 4: Infrastructure Layer (Database, External Services)
infrastructure: {
database: 'PostgreSQL, Redis (for caching)',
fileStorage: 'S3 or local storage',
externalServices: 'Payment API, Analytics API'
}
};
Domain Layer: Entity Report
# Python пример с SQLAlchemy
from datetime import datetime
from uuid import uuid4
from typing import Optional
from enum import Enum
class ReportStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
EXPIRED = "expired"
class ReportFormat(str, Enum):
PDF = "pdf"
XLSX = "xlsx"
CSV = "csv"
JSON = "json"
class Report:
"""Доменная сущность отчета"""
def __init__(
self,
id: str,
user_id: str,
report_type: str,
status: ReportStatus,
format: ReportFormat,
filters: dict,
created_at: datetime,
expires_at: Optional[datetime] = None
):
self.id = id
self.user_id = user_id
self.report_type = report_type
self.status = status
self.format = format
self.filters = filters
self.created_at = created_at
self.expires_at = expires_at
self.file_path: Optional[str] = None
self.file_size: int = 0
def is_expired(self) -> bool:
"""Проверить, истёк ли срок отчета"""
if not self.expires_at:
return False
return datetime.now() > self.expires_at
def mark_processing(self) -> None:
"""Отметить что отчет в обработке"""
if self.status != ReportStatus.PENDING:
raise ValueError(f"Cannot process report in {self.status} status")
self.status = ReportStatus.PROCESSING
def mark_completed(self, file_path: str, file_size: int) -> None:
"""Отметить что отчет готов"""
if self.status != ReportStatus.PROCESSING:
raise ValueError(f"Cannot complete report in {self.status} status")
self.status = ReportStatus.COMPLETED
self.file_path = file_path
self.file_size = file_size
def mark_failed(self, error: str) -> None:
"""Отметить ошибку обработки"""
self.status = ReportStatus.FAILED
self.error_message = error
Database Layer: ORM Model
from sqlalchemy import Column, String, Enum, DateTime, Integer, JSON, Text, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
class ReportORM:
"""ORM модель для хранения в БД"""
__tablename__ = "reports"
id = Column(String(36), primary_key=True, default=lambda: str(uuid4()))
user_id = Column(String(36), ForeignKey("users.id"), nullable=False, index=True)
report_type = Column(String(50), nullable=False, index=True) # 'sales', 'revenue', 'users'
status = Column(Enum(ReportStatus), nullable=False, default=ReportStatus.PENDING, index=True)
format = Column(Enum(ReportFormat), nullable=False, default=ReportFormat.PDF)
filters = Column(JSON, nullable=False) # {"from": "2024-01-01", "to": "2024-12-31"}
file_path = Column(String(500), nullable=True) # Путь к файлу
file_size = Column(Integer, nullable=True) # Размер в байтах
error_message = Column(Text, nullable=True) # Сообщение об ошибке
created_at = Column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
completed_at = Column(DateTime(timezone=True), nullable=True)
expires_at = Column(DateTime(timezone=True), nullable=True) # Когда удалить файл
# Индекс для быстрого поиска
__table_args__ = (
Index('ix_reports_user_created', 'user_id', 'created_at'),
)
Repository Pattern
from abc import ABC, abstractmethod
from typing import List, Optional
from sqlalchemy.orm import Session
class ReportRepository(ABC):
"""Interface для работы с отчетами"""
@abstractmethod
async def create(self, report: Report) -> Report:
"""Создать новый отчет"""
pass
@abstractmethod
async def get_by_id(self, report_id: str, user_id: str) -> Optional[Report]:
"""Получить отчет по ID (с проверкой прав)"""
pass
@abstractmethod
async def list_by_user(
self,
user_id: str,
skip: int = 0,
limit: int = 10,
report_type: Optional[str] = None,
status: Optional[ReportStatus] = None
) -> tuple[List[Report], int]:
"""Получить список отчетов пользователя (с пагинацией)"""
pass
@abstractmethod
async def update_status(self, report_id: str, status: ReportStatus) -> Report:
"""Обновить статус отчета"""
pass
@abstractmethod
async def delete(self, report_id: str, user_id: str) -> None:
"""Удалить отчет (проверить права)"""
pass
class ReportRepositorySQL(ReportRepository):
"""Реализация с использованием SQL"""
def __init__(self, session: Session):
self.session = session
async def create(self, report: Report) -> Report:
orm_obj = ReportORM(
id=report.id,
user_id=report.user_id,
report_type=report.report_type,
status=report.status,
format=report.format,
filters=report.filters
)
self.session.add(orm_obj)
await self.session.commit()
return report
async def get_by_id(self, report_id: str, user_id: str) -> Optional[Report]:
orm_obj = self.session.query(ReportORM).filter(
ReportORM.id == report_id,
ReportORM.user_id == user_id
).first()
if not orm_obj:
return None
return self._to_domain(orm_obj)
async def list_by_user(
self,
user_id: str,
skip: int = 0,
limit: int = 10,
report_type: Optional[str] = None,
status: Optional[ReportStatus] = None
) -> tuple[List[Report], int]:
query = self.session.query(ReportORM).filter(
ReportORM.user_id == user_id
)
if report_type:
query = query.filter(ReportORM.report_type == report_type)
if status:
query = query.filter(ReportORM.status == status)
# Сначала посчитать всего
total = query.count()
# Потом применить пагинацию
reports = query.order_by(
ReportORM.created_at.desc()
).offset(skip).limit(limit).all()
return (
[self._to_domain(orm) for orm in reports],
total
)
def _to_domain(self, orm: ReportORM) -> Report:
return Report(
id=orm.id,
user_id=orm.user_id,
report_type=orm.report_type,
status=orm.status,
format=orm.format,
filters=orm.filters,
created_at=orm.created_at,
expires_at=orm.expires_at
)
Use Cases (Бизнес-логика)
from typing import Optional
import asyncio
from uuid import uuid4
class GenerateReportUseCase:
"""Use case для генерации отчета"""
def __init__(
self,
report_repo: ReportRepository,
report_generator: 'ReportGenerator', # Service для создания файла
file_storage: 'FileStorage' # Service для сохранения файла
):
self.report_repo = report_repo
self.report_generator = report_generator
self.file_storage = file_storage
async def execute(
self,
user_id: str,
report_type: str,
filters: dict,
format: ReportFormat = ReportFormat.PDF
) -> Report:
"""Создать новый отчет и начать его обработку"""
# Валидация входных данных
if not self._is_valid_report_type(report_type):
raise ValueError(f"Invalid report type: {report_type}")
# Создать сущность отчета
report = Report(
id=str(uuid4()),
user_id=user_id,
report_type=report_type,
status=ReportStatus.PENDING,
format=format,
filters=filters,
created_at=datetime.now(timezone.utc),
expires_at=datetime.now(timezone.utc) + timedelta(days=7)
)
# Сохранить в БД
await self.report_repo.create(report)
# Запустить асинхронную обработку (не блокировать)
asyncio.create_task(self._process_report(report))
return report
async def _process_report(self, report: Report) -> None:
"""Обработать отчет в фоне"""
try:
# Отметить что начинается обработка
report.mark_processing()
await self.report_repo.update_status(report.id, ReportStatus.PROCESSING)
# Получить данные и сгенерировать отчет
data = await self._fetch_data_for_report(report)
file_content = await self.report_generator.generate(
report_type=report.report_type,
format=report.format,
data=data
)
# Сохранить файл
file_path = await self.file_storage.save(
bucket="reports",
key=f"{report.user_id}/{report.id}.{report.format.value}",
content=file_content
)
# Отметить что отчет готов
report.mark_completed(
file_path=file_path,
file_size=len(file_content)
)
await self.report_repo.update_status(report.id, ReportStatus.COMPLETED)
except Exception as error:
report.mark_failed(str(error))
await self.report_repo.update_status(report.id, ReportStatus.FAILED)
async def _fetch_data_for_report(self, report: Report) -> dict:
"""Получить данные для отчета из разных источников"""
if report.report_type == "sales":
return await self._get_sales_data(report.filters)
elif report.report_type == "revenue":
return await self._get_revenue_data(report.filters)
else:
raise ValueError(f"Unknown report type: {report.report_type}")
def _is_valid_report_type(self, report_type: str) -> bool:
return report_type in ["sales", "revenue", "users", "orders"]
API Endpoints (FastAPI)
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from fastapi.responses import FileResponse
from datetime import datetime, timezone
router = APIRouter(prefix="/api/v1/reports", tags=["reports"])
# Dependency для получения current user
async def get_current_user(token: str) -> str:
# Проверить токен и вернуть user_id
pass
@router.get("")
async def list_reports(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
report_type: Optional[str] = None,
status: Optional[str] = None,
current_user: str = Depends(get_current_user)
):
"""Получить список отчетов пользователя"""
try:
# Получить use case из контейнера зависимостей
list_use_case = get_list_reports_use_case()
reports, total = await list_use_case.execute(
user_id=current_user,
skip=skip,
limit=limit,
report_type=report_type,
status=status
)
return {
"data": [serialize_report(r) for r in reports],
"total": total,
"skip": skip,
"limit": limit
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("")
async def generate_report(
request: GenerateReportRequest,
background_tasks: BackgroundTasks,
current_user: str = Depends(get_current_user)
):
"""Создать новый отчет"""
try:
generate_use_case = get_generate_report_use_case()
report = await generate_use_case.execute(
user_id=current_user,
report_type=request.report_type,
filters=request.filters.dict(),
format=ReportFormat(request.format)
)
return {
"id": report.id,
"status": report.status.value,
"created_at": report.created_at.isoformat(),
"message": "Report generation started"
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/{report_id}")
async def get_report(
report_id: str,
current_user: str = Depends(get_current_user)
):
"""Получить информацию об отчете"""
get_use_case = get_get_report_use_case()
report = await get_use_case.execute(
report_id=report_id,
user_id=current_user
)
if not report:
raise HTTPException(status_code=404, detail="Report not found")
return serialize_report(report)
@router.get("/{report_id}/download")
async def download_report(
report_id: str,
current_user: str = Depends(get_current_user)
):
"""Скачать файл отчета"""
get_use_case = get_get_report_use_case()
report = await get_use_case.execute(
report_id=report_id,
user_id=current_user
)
if not report:
raise HTTPException(status_code=404, detail="Report not found")
if report.status != ReportStatus.COMPLETED:
raise HTTPException(
status_code=400,
detail=f"Report is {report.status.value}, not ready for download"
)
if not report.file_path:
raise HTTPException(status_code=500, detail="File not found")
return FileResponse(
path=report.file_path,
filename=f"report_{report.id}.{report.format.value}"
)
@router.delete("/{report_id}")
async def delete_report(
report_id: str,
current_user: str = Depends(get_current_user)
):
"""Удалить отчет"""
delete_use_case = get_delete_report_use_case()
await delete_use_case.execute(
report_id=report_id,
user_id=current_user
)
return {"message": "Report deleted"}
Кэширование и оптимизация
import redis
from functools import wraps
class ReportCache:
"""Кэширование результатов отчетов"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def get_cached_report(self, cache_key: str) -> Optional[dict]:
data = self.redis.get(cache_key)
if not data:
return None
return json.loads(data)
async def cache_report(self, cache_key: str, data: dict, ttl: int = 3600):
"""Кэшировать результаты на 1 час"""
self.redis.setex(
cache_key,
ttl,
json.dumps(data)
)
def get_cache_key(self, user_id: str, report_type: str, filters: dict) -> str:
# Создать стабильный ключ
filters_str = json.dumps(filters, sort_keys=True)
return f"report:{user_id}:{report_type}:{hash(filters_str)}"
Выводы
- DDD подход: отделите domain от infrastructure
- Repository pattern: абстрагируйте БД
- Use cases: вся бизнес-логика в отдельных классах
- Асинхронность: используйте background tasks для генерации
- Кэширование: кэшируйте результаты отчетов
- Безопасность: проверяйте права доступа
- Файлы: сохраняйте в S3 или на диск, не в БД
- Статусы: отслеживайте статус обработки
- Экспирация: удаляйте старые файлы
- Пагинация: поддерживайте limit/offset для списков