← Назад к вопросам
Как ускорить генерацию отчета на сайте?
2.0 Middle🔥 121 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как ускорить генерацию отчета на сайте?
Оптимизация генерации отчетов — одна из ключевых задач, которая напрямую влияет на пользовательский опыт. Рассмотрим комплексный подход, который включает несколько слоев оптимизации.
1. Асинхронная обработка (Async + Celery)
Первое и самое важное — вынести тяжелые операции за границы HTTP-запроса. Используй Celery с Redis как broker:
from celery import shared_task
import time
@shared_task(bind=True, max_retries=3)
def generate_report_task(self, report_id, filters):
try:
# Долгая операция (10+ секунд)
report_data = fetch_data_from_db(filters)
processed_data = process_and_aggregate(report_data)
save_report(report_id, processed_data)
except Exception as exc:
# Retry с экспоненциальной задержкой
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
Endpoint вернет результат сразу:
@app.post("/api/v1/reports")
async def create_report(filters: ReportFilters):
report_id = generate_uuid()
# Запускаем задачу в фоне
generate_report_task.delay(report_id, filters.dict())
# Клиент получает ID и может опросить статус
return {"id": report_id, "status": "processing"}
Клиент периодически опрашивает статус:
@app.get("/api/v1/reports/{report_id}")
async def get_report(report_id: str):
# Клиент получает результат, когда готов
return fetch_report_from_cache(report_id)
2. Кэширование (Redis + умные ключи)
Не генерируй отчет дважды для одинаковых параметров:
import hashlib
from redis import Redis
import json
redis_client = Redis(host="localhost", port=6379, decode_responses=True)
def get_cached_report(filters: dict, cache_ttl=3600):
# Создаем детерминированный ключ из параметров
cache_key = f"report:{hashlib.md5(json.dumps(filters, sort_keys=True).encode()).hexdigest()}"
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Генерируем новый отчет
report_data = generate_report(filters)
redis_client.setex(cache_key, cache_ttl, json.dumps(report_data))
return report_data
3. Оптимизация базы данных
- Индексы: убедись, что все поля в WHERE/JOIN индексированы
- Batch запросы: вместо 1000 single-row запросов сделай 1 bulk-query
# ❌ Медленно: N+1 problem
for order_id in order_ids:
order = db.query(Order).filter(Order.id == order_id).first()
# ✅ Быстро: одна выборка
orders = db.query(Order).filter(Order.id.in_(order_ids)).all()
- Пагинация: не загружай всё сразу
- Projection: выбирай только нужные колонки, не SELECT *
# ✅ Выбираем только нужные поля
report_rows = db.query(Order.id, Order.date, Order.amount)
.filter(Order.created_at > start_date)
.all()
4. Генерация отчета на лету (Streaming)
Для больших отчетов не ждоть полной генерации:
from fastapi.responses import StreamingResponse
import csv
import io
@app.get("/api/v1/reports/export")
async def export_report_streaming(filters: ReportFilters):
def generate():
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["ID", "Name", "Amount"]) # Заголовок
# Берем данные батчами
batch_size = 1000
offset = 0
while True:
rows = fetch_report_rows(filters, offset=offset, limit=batch_size)
if not rows:
break
for row in rows:
writer.writerow([row.id, row.name, row.amount])
output.seek(0)
yield output.read() + "\n"
output.truncate(0)
output.seek(0)
offset += batch_size
return StreamingResponse(generate(), media_type="text/csv")
5. Параллельная обработка (ProcessPool)
Если обработка CPU-bound (не I/O), используй многопроцессность:
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def process_chunk(chunk):
# Тяжелая обработка
return expensive_calculation(chunk)
def generate_report_parallel(data):
chunks = split_into_chunks(data, num_chunks=multiprocessing.cpu_count())
with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
results = list(executor.map(process_chunk, chunks))
return merge_results(results)
6. Компрессия и минификация
Для больших файлов используй gzip:
from fastapi.middleware.gzip import GZIPMiddleware
app.add_middleware(GZIPMiddleware, minimum_size=1000)
7. Кэширование уровня приложения
Используй в памяти для часто запрашиваемых данных:
from functools import lru_cache
@lru_cache(maxsize=128)
def get_category_mapping():
# Кэшируется в памяти процесса
return fetch_from_db_once()
Чеклист оптимизации
- ✅ Профилируй запросы (pip install django-silk или django-debug-toolbar)
- ✅ Выноси долгие операции в async tasks
- ✅ Кэшируй часто запрашиваемые данные
- ✅ Оптимизируй SQL (индексы, batch, projection)
- ✅ Используй streaming для больших файлов
- ✅ Настрой database connection pooling (PgBouncer)
- ✅ Мониторь метрики (New Relic, DataDog)