← Назад к вопросам
Как решишь проблему задач, требующих много времени?
1.3 Junior🔥 141 комментариев
#Soft Skills#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Решение задач, требующих много времени
Долгоживущие операции — это обычная проблема в backend разработке. API запрос ждёт 30 секунд? Длительная обработка может заблокировать других пользователей. Я использую несколько проверенных подходов.
1. Асинхронная обработка с Celery
Проблема: Длительная операция блокирует обработку запроса Решение: Запустить в фоне, вернуть клиенту ID задачи
from celery import Celery
from celery.result import AsyncResult
import time
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@celery_app.task(bind=True)
def process_large_file(self, file_path: str):
"""Долгая обработка файла"""
try:
total_steps = 100
for step in range(total_steps):
# Обновляем прогресс
self.update_state(
state="PROGRESS",
meta={"current": step, "total": total_steps}
)
time.sleep(0.5) # Имитируем работу
return {"status": "done", "result": "Processing complete"}
except Exception as exc:
self.update_state(
state="FAILURE",
meta={"error": str(exc)}
)
raise
@app.post("/api/v1/process")
async def start_processing(file_path: str):
"""Запустить долгую обработку"""
# Запускаем задачу в фоне
task = process_large_file.delay(file_path)
return {
"task_id": task.id,
"status": "processing",
"check_url": f"/api/v1/process/{task.id}"
}
@app.get("/api/v1/process/{task_id}")
async def get_processing_status(task_id: str):
"""Проверить статус задачи"""
result = AsyncResult(task_id, app=celery_app)
if result.state == "PROGRESS":
return {
"task_id": task_id,
"status": "processing",
"progress": result.info
}
elif result.state == "SUCCESS":
return {
"task_id": task_id,
"status": "complete",
"result": result.result
}
elif result.state == "FAILURE":
return {
"task_id": task_id,
"status": "failed",
"error": str(result.info)
}
else:
return {
"task_id": task_id,
"status": result.state
}
2. WebSocket для реал-тайм обновлений
Вместо долгого ожидания, отправляем обновления по мере выполнения
from fastapi import WebSocket
import asyncio
import json
@app.websocket("/ws/process")
async def websocket_process(websocket: WebSocket):
await websocket.accept()
try:
# Получаем параметры от клиента
data = await websocket.receive_text()
params = json.loads(data)
# Запускаем долгую операцию
await process_with_progress(websocket, params)
except Exception as e:
await websocket.send_json({"error": str(e)})
finally:
await websocket.close()
async def process_with_progress(websocket: WebSocket, params: dict):
"""Обработка с отправкой прогресса через WebSocket"""
total_steps = 100
for step in range(total_steps):
# Отправляем прогресс в реал-тайм
await websocket.send_json({
"status": "processing",
"current": step,
"total": total_steps,
"percentage": (step / total_steps) * 100
})
# Имитируем работу
await asyncio.sleep(0.5)
# Отправляем финальный результат
await websocket.send_json({
"status": "complete",
"result": "Processing finished"
})
3. Streaming Response для больших данных
Вместо загрузки всего в память, отправляем потоком
from fastapi.responses import StreamingResponse
import io
async def generate_large_file():
"""Генератор для потоковой передачи"""
buffer_size = 1024 * 1024 # 1 MB в буфере
for i in range(1000): # 1000 чанков
# Генерируем часть данных
chunk = f"Line {i}: " + "x" * 1000 + "\n"
yield chunk.encode()
# Даём возможность другим обработаться
await asyncio.sleep(0.01)
@app.get("/api/v1/download")
async def download_large_file():
"""Потоковая передача больших файлов"""
return StreamingResponse(
generate_large_file(),
media_type="text/plain",
headers={
"Content-Disposition": "attachment; filename=large_file.txt"
}
)
4. Job Queue с приоритетами
Для критичных задач используем приоритет в очереди
from enum import Enum
class Priority(str, Enum):
LOW = "low"
NORMAL = "normal"
HIGH = "high"
CRITICAL = "critical"
@app.post("/api/v1/heavy-task")
async def submit_heavy_task(priority: Priority = Priority.NORMAL):
"""Отправить задачу с приоритетом"""
task = process_large_file.apply_async(
args=("file.txt",),
priority=10 if priority == Priority.CRITICAL else 5,
expires=3600 # Задача истекает через час
)
return {"task_id": task.id, "priority": priority}
5. Пакетная обработка (Batch Processing)
Обработка больших наборов данных по частям
from typing import List, Generator
def batch_processor(items: List[any], batch_size: int = 100) -> Generator:
"""Разбираем на батчи для обработки"""
for i in range(0, len(items), batch_size):
yield items[i:i + batch_size]
@celery_app.task
def process_bulk_data(items_ids: List[int]):
"""Обработать список ID по батчам"""
results = []
for batch in batch_processor(items_ids, batch_size=1000):
# Обрабатываем батч
batch_results = process_batch(batch)
results.extend(batch_results)
# Даём БД время на восстановление
time.sleep(1)
return results
@app.post("/api/v1/bulk-process")
async def bulk_process(items_ids: List[int]):
"""Запустить пакетную обработку"""
task = process_bulk_data.delay(items_ids)
return {"task_id": task.id}
6. Кэширование результатов
Не повторяем дорогие операции
from functools import lru_cache
import hashlib
from datetime import datetime, timedelta
results_cache = {}
def cache_key(params: dict) -> str:
"""Генерируем ключ кэша из параметров"""
return hashlib.md5(str(params).encode()).hexdigest()
async def expensive_calculation(params: dict):
"""Дорогая операция с кэшированием"""
key = cache_key(params)
# Проверяем кэш
if key in results_cache:
cached, timestamp = results_cache[key]
if datetime.now() - timestamp < timedelta(hours=1):
return cached
# Выполняем операцию
result = await perform_heavy_calculation(params)
# Сохраняем в кэш
results_cache[key] = (result, datetime.now())
return result
@app.post("/api/v1/calculate")
async def calculate(params: dict):
"""Расчёт с кэшированием результатов"""
result = await expensive_calculation(params)
return {"result": result, "cached": cache_key(params) in results_cache}
7. Timeout и graceful degradation
Ставим лимиты времени выполнения
import asyncio
from fastapi import HTTPException
async def timeout_handler(coro, timeout_seconds: int):
"""Обработка с таймаутом"""
try:
return await asyncio.wait_for(coro, timeout=timeout_seconds)
except asyncio.TimeoutError:
# Возвращаем частичный результат
return {"status": "partial", "message": "Operation timed out"}
@app.post("/api/v1/fast-process")
async def fast_process(data: dict):
"""Обработка с таймаутом 30 секунд"""
try:
result = await timeout_handler(
process_data(data),
timeout_seconds=30
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
8. Мониторинг длительных операций
import time
from datetime import datetime
class OperationMonitor:
def __init__(self, operation_name: str, max_duration: int = 300):
self.operation_name = operation_name
self.max_duration = max_duration
self.start_time = None
async def __aenter__(self):
self.start_time = time.time()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
duration = time.time() - self.start_time
if duration > self.max_duration:
logger.warning(
f"Operation {self.operation_name} took {duration}s (max: {self.max_duration}s)"
)
logger.info(f"Operation {self.operation_name} completed in {duration}s")
# Использование
async with OperationMonitor("file_processing", max_duration=60):
await process_large_file()
Стратегии по сложности
| Проблема | Решение | Когда использовать |
|---|---|---|
| Запрос занимает >5 сек | Celery + background task | Асинхронные операции |
| Нужны обновления в реал-тайм | WebSocket | Потоковые данные |
| Большой объём данных | Streaming Response | Download файлов |
| Батч операции | Queue с батчами | Пакетная обработка |
| Повторяющиеся расчёты | Кэширование | Дорогие операции |
| Критичное время | Timeout + fallback | SLA гарантии |
Вывод
Для долгоживущих операций:
- Запустить в фоне (Celery)
- Вернуть ID для отслеживания
- Предоставить способ проверки статуса (WebSocket или polling)
- Кэшировать результаты когда возможно
- Мониторить время выполнения и устанавливать таймауты
- Использовать потоковую передачу для больших данных
Это обеспечивает отзывчивый API и хороший UX для пользователей.