← Назад к вопросам
Как обеспечить стабильное время отклика при высокой нагрузке?
2.8 Senior🔥 161 комментариев
#DevOps и инфраструктура#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как обеспечить стабильное время отклика при высокой нагрузке
Это одна из самых частых проблем в production. Я расскажу про многоуровневый подход к стабильности, основанный на 10+ годах опыта.
1. Профилирование: поймём узкие места
Инструмент cProfile:
import cProfile
import pstats
def slow_function():
total = 0
for i in range(1000000):
total += i ** 2
return total
# Профилируем
profiler = cProfile.Profile()
profiler.enable()
result = slow_function()
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # Топ 10 функций
Инструмент py-spy (production ready):
# Снимаем flame graph боевого приложения
py-spy record -o profile.svg -- python app.py
# Смотрим результат в браузере
open profile.svg
2. Кэширование на разных уровнях
Level 1: In-Memory кэш (Redis)
from redis import Redis
from json import dumps, loads
from functools import wraps
import time
redis_client = Redis(host='localhost', port=6379, db=0)
def cache_result(ttl_seconds=3600):
"""Декоратор для кэширования в Redis."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Генерируем ключ
cache_key = f"{func.__name__}:{args}:{kwargs}"
# Проверяем кэш
cached = redis_client.get(cache_key)
if cached:
return loads(cached)
# Вычисляем результат
result = func(*args, **kwargs)
# Кэшируем
redis_client.setex(
cache_key,
ttl_seconds,
dumps(result)
)
return result
return wrapper
return decorator
@cache_result(ttl_seconds=300)
def get_user_profile(user_id: int) -> dict:
"""Получает профиль пользователя из БД."""
# Дорогой запрос к БД
return {"id": user_id, "name": "User"}
Level 2: Query-level кэш (SQLAlchemy)
from sqlalchemy.orm import Query
from sqlalchemy import event
from cachetools import TTLCache
query_cache = TTLCache(maxsize=1000, ttl=300)
@event.listens_for(Query, "before_bulk_update")
def receive_before_update(update_context):
"""Инвалидируем кэш при обновлении."""
query_cache.clear()
def cached_query(query: Query, cache_key: str = None):
"""Оборачивает Query с кэшированием."""
if cache_key is None:
cache_key = str(query)
if cache_key in query_cache:
return query_cache[cache_key]
result = query.all()
query_cache[cache_key] = result
return result
# Использование
users = cached_query(session.query(User).filter(User.active == True))
3. Database оптимизация
Индексы на часто-запрашиваемых полях:
from sqlalchemy import Column, String, Index
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(255), unique=True, index=True)
status = Column(String(50), index=True)
created_at = Column(DateTime, index=True)
# Составной индекс для частых фильтров
__table_args__ = (
Index('idx_status_created', 'status', 'created_at'),
)
N+1 problem (join fetch):
# ❌ Плохо: много запросов
users = session.query(User).all()
for user in users:
print(user.posts) # Отдельный запрос для каждого пользователя!
# ✅ Хорошо: один запрос с join
from sqlalchemy.orm import joinedload
users = session.query(User).options(
joinedload(User.posts)
).all()
Connection pooling:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:password@localhost/db',
poolclass=QueuePool,
pool_size=20,
max_overflow=40,
pool_recycle=3600,
pool_pre_ping=True # Проверяет живо ли соединение
)
4. Асинхронная обработка
AsyncIO для I/O операций:
import asyncio
from aiohttp import ClientSession
from typing import List
async def fetch_urls(urls: List[str]) -> List[str]:
"""Параллельно получает данные с нескольких URL."""
async with ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [await r.text() for r in responses]
# Вместо последовательного (медленно)
for url in urls:
response = requests.get(url) # 3 запроса = 3 секунды
# Используем async (быстро)
results = asyncio.run(fetch_urls(urls)) # 3 запроса = 1 секунда
Celery для long-running tasks:
from celery import Celery
import time
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def process_large_report(self, report_id: int):
"""Обрабатывает отчёт асинхронно."""
try:
report = Report.query.get(report_id)
# Дорогая обработка
time.sleep(30)
report.status = 'completed'
db.session.commit()
except Exception as exc:
# Retry с экспоненциальной backoff
self.retry(exc=exc, countdown=2 ** self.request.retries)
# В API просто отправляем в очередь
@app.route('/reports')
def create_report():
report = Report()
db.session.add(report)
db.session.commit()
# Обработка произойдёт в фоне
process_large_report.delay(report.id)
return {"report_id": report.id, "status": "processing"}
5. Rate limiting и backpressure
Token bucket алгоритм:
from ratelimit import limits, sleep_and_retry
import time
@sleep_and_retry
@limits(calls=100, period=60) # 100 запросов в минуту
def api_endpoint():
return {"data": "result"}
# Или с Redis для распределённых систем
from redis_rate_limit import RedisRateLimiter
limiter = RedisRateLimiter(
redis_client,
key_prefix='api:endpoint:',
rate=100, # запросов
per=60, # в течение 60 секунд
)
from fastapi import FastAPI
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get("/api/data")
async def get_data(user_id: int):
if not limiter.is_allowed(f"user:{user_id}"):
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded"}
)
return {"data": "result"}
6. Circuit breaker паттерн
from enum import Enum
import time
class CircuitState(Enum):
CLOSED = "closed" # Запросы проходят
OPEN = "open" # Запросы блокируются
HALF_OPEN = "half_open" # Пробуем восстановление
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time = None
self.state = CircuitState.CLOSED
def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
# Использование
breaker = CircuitBreaker()
try:
result = breaker.call(external_api.fetch_data)
except Exception:
# Используем fallback данные
result = get_cached_fallback()
7. Мониторинг и алерты
Prometheus метрики:
from prometheus_client import Counter, Histogram, Gauge
# Счётчик запросов
request_count = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
# Гистограмма времени отклика
request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
buckets=(0.1, 0.5, 1.0, 2.0, 5.0)
)
# Gauge текущих соединений
active_connections = Gauge(
'active_db_connections',
'Number of active database connections'
)
# Использование в FastAPI
from fastapi import FastAPI
from prometheus_client import make_asgi_app
import time
app = FastAPI()
@app.middleware("http")
async def track_metrics(request, call_next):
start = time.time()
response = await call_next(request)
duration = time.time() - start
request_count.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
request_duration.observe(duration)
return response
# Expose метрики на /metrics
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
8. Рекомендуемый стек для стабильности
- Профилирование → py-spy
- Кэширование → Redis
- Database → Connection pooling + индексы
- Async → asyncio + aiohttp
- Long tasks → Celery
- Rate limiting → Redis-based
- Resilience → Circuit breaker
- Monitoring → Prometheus + Grafana
Вывод
Стабильность под нагрузкой достигается комбинацией:
- Правильной архитектуры (async, queue workers)
- Эффективного использования ресурсов (кэш, индексы)
- Graceful degradation (circuit breaker, fallbacks)
- Постоянного мониторинга (метрики, алерты)
Нет универсального решения — нужно профилировать именно ваше приложение.