← Назад к вопросам

Как обеспечить стабильное время отклика при высокой нагрузке?

2.8 Senior🔥 161 комментариев
#DevOps и инфраструктура#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как обеспечить стабильное время отклика при высокой нагрузке

Это одна из самых частых проблем в production. Я расскажу про многоуровневый подход к стабильности, основанный на 10+ годах опыта.

1. Профилирование: поймём узкие места

Инструмент cProfile:

import cProfile
import pstats

def slow_function():
    total = 0
    for i in range(1000000):
        total += i ** 2
    return total

# Профилируем
profiler = cProfile.Profile()
profiler.enable()

result = slow_function()

profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10)  # Топ 10 функций

Инструмент py-spy (production ready):

# Снимаем flame graph боевого приложения
py-spy record -o profile.svg -- python app.py

# Смотрим результат в браузере
open profile.svg

2. Кэширование на разных уровнях

Level 1: In-Memory кэш (Redis)

from redis import Redis
from json import dumps, loads
from functools import wraps
import time

redis_client = Redis(host='localhost', port=6379, db=0)

def cache_result(ttl_seconds=3600):
    """Декоратор для кэширования в Redis."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Генерируем ключ
            cache_key = f"{func.__name__}:{args}:{kwargs}"
            
            # Проверяем кэш
            cached = redis_client.get(cache_key)
            if cached:
                return loads(cached)
            
            # Вычисляем результат
            result = func(*args, **kwargs)
            
            # Кэшируем
            redis_client.setex(
                cache_key,
                ttl_seconds,
                dumps(result)
            )
            
            return result
        return wrapper
    return decorator

@cache_result(ttl_seconds=300)
def get_user_profile(user_id: int) -> dict:
    """Получает профиль пользователя из БД."""
    # Дорогой запрос к БД
    return {"id": user_id, "name": "User"}

Level 2: Query-level кэш (SQLAlchemy)

from sqlalchemy.orm import Query
from sqlalchemy import event
from cachetools import TTLCache

query_cache = TTLCache(maxsize=1000, ttl=300)

@event.listens_for(Query, "before_bulk_update")
def receive_before_update(update_context):
    """Инвалидируем кэш при обновлении."""
    query_cache.clear()

def cached_query(query: Query, cache_key: str = None):
    """Оборачивает Query с кэшированием."""
    if cache_key is None:
        cache_key = str(query)
    
    if cache_key in query_cache:
        return query_cache[cache_key]
    
    result = query.all()
    query_cache[cache_key] = result
    return result

# Использование
users = cached_query(session.query(User).filter(User.active == True))

3. Database оптимизация

Индексы на часто-запрашиваемых полях:

from sqlalchemy import Column, String, Index
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    email = Column(String(255), unique=True, index=True)
    status = Column(String(50), index=True)
    created_at = Column(DateTime, index=True)
    
    # Составной индекс для частых фильтров
    __table_args__ = (
        Index('idx_status_created', 'status', 'created_at'),
    )

N+1 problem (join fetch):

# ❌ Плохо: много запросов
users = session.query(User).all()
for user in users:
    print(user.posts)  # Отдельный запрос для каждого пользователя!

# ✅ Хорошо: один запрос с join
from sqlalchemy.orm import joinedload

users = session.query(User).options(
    joinedload(User.posts)
).all()

Connection pooling:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=20,
    max_overflow=40,
    pool_recycle=3600,
    pool_pre_ping=True  # Проверяет живо ли соединение
)

4. Асинхронная обработка

AsyncIO для I/O операций:

import asyncio
from aiohttp import ClientSession
from typing import List

async def fetch_urls(urls: List[str]) -> List[str]:
    """Параллельно получает данные с нескольких URL."""
    async with ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [await r.text() for r in responses]

# Вместо последовательного (медленно)
for url in urls:
    response = requests.get(url)  # 3 запроса = 3 секунды

# Используем async (быстро)
results = asyncio.run(fetch_urls(urls))  # 3 запроса = 1 секунда

Celery для long-running tasks:

from celery import Celery
import time

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_large_report(self, report_id: int):
    """Обрабатывает отчёт асинхронно."""
    try:
        report = Report.query.get(report_id)
        # Дорогая обработка
        time.sleep(30)
        report.status = 'completed'
        db.session.commit()
    except Exception as exc:
        # Retry с экспоненциальной backoff
        self.retry(exc=exc, countdown=2 ** self.request.retries)

# В API просто отправляем в очередь
@app.route('/reports')
def create_report():
    report = Report()
    db.session.add(report)
    db.session.commit()
    
    # Обработка произойдёт в фоне
    process_large_report.delay(report.id)
    
    return {"report_id": report.id, "status": "processing"}

5. Rate limiting и backpressure

Token bucket алгоритм:

from ratelimit import limits, sleep_and_retry
import time

@sleep_and_retry
@limits(calls=100, period=60)  # 100 запросов в минуту
def api_endpoint():
    return {"data": "result"}

# Или с Redis для распределённых систем
from redis_rate_limit import RedisRateLimiter

limiter = RedisRateLimiter(
    redis_client,
    key_prefix='api:endpoint:',
    rate=100,  # запросов
    per=60,  # в течение 60 секунд
)

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/api/data")
async def get_data(user_id: int):
    if not limiter.is_allowed(f"user:{user_id}"):
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"}
        )
    return {"data": "result"}

6. Circuit breaker паттерн

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"  # Запросы проходят
    OPEN = "open"  # Запросы блокируются
    HALF_OPEN = "half_open"  # Пробуем восстановление

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise
    
    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
    
    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

# Использование
breaker = CircuitBreaker()

try:
    result = breaker.call(external_api.fetch_data)
except Exception:
    # Используем fallback данные
    result = get_cached_fallback()

7. Мониторинг и алерты

Prometheus метрики:

from prometheus_client import Counter, Histogram, Gauge

# Счётчик запросов
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Гистограмма времени отклика
request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    buckets=(0.1, 0.5, 1.0, 2.0, 5.0)
)

# Gauge текущих соединений
active_connections = Gauge(
    'active_db_connections',
    'Number of active database connections'
)

# Использование в FastAPI
from fastapi import FastAPI
from prometheus_client import make_asgi_app
import time

app = FastAPI()

@app.middleware("http")
async def track_metrics(request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = time.time() - start
    
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    request_duration.observe(duration)
    
    return response

# Expose метрики на /metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

8. Рекомендуемый стек для стабильности

  1. Профилирование → py-spy
  2. Кэширование → Redis
  3. Database → Connection pooling + индексы
  4. Async → asyncio + aiohttp
  5. Long tasks → Celery
  6. Rate limiting → Redis-based
  7. Resilience → Circuit breaker
  8. Monitoring → Prometheus + Grafana

Вывод

Стабильность под нагрузкой достигается комбинацией:

  • Правильной архитектуры (async, queue workers)
  • Эффективного использования ресурсов (кэш, индексы)
  • Graceful degradation (circuit breaker, fallbacks)
  • Постоянного мониторинга (метрики, алерты)

Нет универсального решения — нужно профилировать именно ваше приложение.

Как обеспечить стабильное время отклика при высокой нагрузке? | PrepBro