← Назад к вопросам

Как решишь проблему задач, требующих много времени?

1.3 Junior🔥 141 комментариев
#Soft Skills#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Решение задач, требующих много времени

Долгоживущие операции — это обычная проблема в backend разработке. API запрос ждёт 30 секунд? Длительная обработка может заблокировать других пользователей. Я использую несколько проверенных подходов.

1. Асинхронная обработка с Celery

Проблема: Длительная операция блокирует обработку запроса Решение: Запустить в фоне, вернуть клиенту ID задачи

from celery import Celery
from celery.result import AsyncResult
import time
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task(bind=True)
def process_large_file(self, file_path: str):
    """Долгая обработка файла"""
    try:
        total_steps = 100
        for step in range(total_steps):
            # Обновляем прогресс
            self.update_state(
                state="PROGRESS",
                meta={"current": step, "total": total_steps}
            )
            time.sleep(0.5)  # Имитируем работу
        
        return {"status": "done", "result": "Processing complete"}
    except Exception as exc:
        self.update_state(
            state="FAILURE",
            meta={"error": str(exc)}
        )
        raise

@app.post("/api/v1/process")
async def start_processing(file_path: str):
    """Запустить долгую обработку"""
    # Запускаем задачу в фоне
    task = process_large_file.delay(file_path)
    
    return {
        "task_id": task.id,
        "status": "processing",
        "check_url": f"/api/v1/process/{task.id}"
    }

@app.get("/api/v1/process/{task_id}")
async def get_processing_status(task_id: str):
    """Проверить статус задачи"""
    result = AsyncResult(task_id, app=celery_app)
    
    if result.state == "PROGRESS":
        return {
            "task_id": task_id,
            "status": "processing",
            "progress": result.info
        }
    elif result.state == "SUCCESS":
        return {
            "task_id": task_id,
            "status": "complete",
            "result": result.result
        }
    elif result.state == "FAILURE":
        return {
            "task_id": task_id,
            "status": "failed",
            "error": str(result.info)
        }
    else:
        return {
            "task_id": task_id,
            "status": result.state
        }

2. WebSocket для реал-тайм обновлений

Вместо долгого ожидания, отправляем обновления по мере выполнения

from fastapi import WebSocket
import asyncio
import json

@app.websocket("/ws/process")
async def websocket_process(websocket: WebSocket):
    await websocket.accept()
    
    try:
        # Получаем параметры от клиента
        data = await websocket.receive_text()
        params = json.loads(data)
        
        # Запускаем долгую операцию
        await process_with_progress(websocket, params)
    except Exception as e:
        await websocket.send_json({"error": str(e)})
    finally:
        await websocket.close()

async def process_with_progress(websocket: WebSocket, params: dict):
    """Обработка с отправкой прогресса через WebSocket"""
    total_steps = 100
    
    for step in range(total_steps):
        # Отправляем прогресс в реал-тайм
        await websocket.send_json({
            "status": "processing",
            "current": step,
            "total": total_steps,
            "percentage": (step / total_steps) * 100
        })
        
        # Имитируем работу
        await asyncio.sleep(0.5)
    
    # Отправляем финальный результат
    await websocket.send_json({
        "status": "complete",
        "result": "Processing finished"
    })

3. Streaming Response для больших данных

Вместо загрузки всего в память, отправляем потоком

from fastapi.responses import StreamingResponse
import io

async def generate_large_file():
    """Генератор для потоковой передачи"""
    buffer_size = 1024 * 1024  # 1 MB в буфере
    
    for i in range(1000):  # 1000 чанков
        # Генерируем часть данных
        chunk = f"Line {i}: " + "x" * 1000 + "\n"
        yield chunk.encode()
        
        # Даём возможность другим обработаться
        await asyncio.sleep(0.01)

@app.get("/api/v1/download")
async def download_large_file():
    """Потоковая передача больших файлов"""
    return StreamingResponse(
        generate_large_file(),
        media_type="text/plain",
        headers={
            "Content-Disposition": "attachment; filename=large_file.txt"
        }
    )

4. Job Queue с приоритетами

Для критичных задач используем приоритет в очереди

from enum import Enum

class Priority(str, Enum):
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    CRITICAL = "critical"

@app.post("/api/v1/heavy-task")
async def submit_heavy_task(priority: Priority = Priority.NORMAL):
    """Отправить задачу с приоритетом"""
    task = process_large_file.apply_async(
        args=("file.txt",),
        priority=10 if priority == Priority.CRITICAL else 5,
        expires=3600  # Задача истекает через час
    )
    
    return {"task_id": task.id, "priority": priority}

5. Пакетная обработка (Batch Processing)

Обработка больших наборов данных по частям

from typing import List, Generator

def batch_processor(items: List[any], batch_size: int = 100) -> Generator:
    """Разбираем на батчи для обработки"""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

@celery_app.task
def process_bulk_data(items_ids: List[int]):
    """Обработать список ID по батчам"""
    results = []
    
    for batch in batch_processor(items_ids, batch_size=1000):
        # Обрабатываем батч
        batch_results = process_batch(batch)
        results.extend(batch_results)
        
        # Даём БД время на восстановление
        time.sleep(1)
    
    return results

@app.post("/api/v1/bulk-process")
async def bulk_process(items_ids: List[int]):
    """Запустить пакетную обработку"""
    task = process_bulk_data.delay(items_ids)
    return {"task_id": task.id}

6. Кэширование результатов

Не повторяем дорогие операции

from functools import lru_cache
import hashlib
from datetime import datetime, timedelta

results_cache = {}

def cache_key(params: dict) -> str:
    """Генерируем ключ кэша из параметров"""
    return hashlib.md5(str(params).encode()).hexdigest()

async def expensive_calculation(params: dict):
    """Дорогая операция с кэшированием"""
    key = cache_key(params)
    
    # Проверяем кэш
    if key in results_cache:
        cached, timestamp = results_cache[key]
        if datetime.now() - timestamp < timedelta(hours=1):
            return cached
    
    # Выполняем операцию
    result = await perform_heavy_calculation(params)
    
    # Сохраняем в кэш
    results_cache[key] = (result, datetime.now())
    return result

@app.post("/api/v1/calculate")
async def calculate(params: dict):
    """Расчёт с кэшированием результатов"""
    result = await expensive_calculation(params)
    return {"result": result, "cached": cache_key(params) in results_cache}

7. Timeout и graceful degradation

Ставим лимиты времени выполнения

import asyncio
from fastapi import HTTPException

async def timeout_handler(coro, timeout_seconds: int):
    """Обработка с таймаутом"""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Возвращаем частичный результат
        return {"status": "partial", "message": "Operation timed out"}

@app.post("/api/v1/fast-process")
async def fast_process(data: dict):
    """Обработка с таймаутом 30 секунд"""
    try:
        result = await timeout_handler(
            process_data(data),
            timeout_seconds=30
        )
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

8. Мониторинг длительных операций

import time
from datetime import datetime

class OperationMonitor:
    def __init__(self, operation_name: str, max_duration: int = 300):
        self.operation_name = operation_name
        self.max_duration = max_duration
        self.start_time = None
    
    async def __aenter__(self):
        self.start_time = time.time()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        duration = time.time() - self.start_time
        
        if duration > self.max_duration:
            logger.warning(
                f"Operation {self.operation_name} took {duration}s (max: {self.max_duration}s)"
            )
        
        logger.info(f"Operation {self.operation_name} completed in {duration}s")

# Использование
async with OperationMonitor("file_processing", max_duration=60):
    await process_large_file()

Стратегии по сложности

ПроблемаРешениеКогда использовать
Запрос занимает >5 секCelery + background taskАсинхронные операции
Нужны обновления в реал-таймWebSocketПотоковые данные
Большой объём данныхStreaming ResponseDownload файлов
Батч операцииQueue с батчамиПакетная обработка
Повторяющиеся расчётыКэшированиеДорогие операции
Критичное времяTimeout + fallbackSLA гарантии

Вывод

Для долгоживущих операций:

  1. Запустить в фоне (Celery)
  2. Вернуть ID для отслеживания
  3. Предоставить способ проверки статуса (WebSocket или polling)
  4. Кэшировать результаты когда возможно
  5. Мониторить время выполнения и устанавливать таймауты
  6. Использовать потоковую передачу для больших данных

Это обеспечивает отзывчивый API и хороший UX для пользователей.

Как решишь проблему задач, требующих много времени? | PrepBro