← Назад к вопросам

Что делать, если уперся в потолок по производительности?

1.0 Junior🔥 261 комментариев
#DevOps и инфраструктура

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Оптимизация при достижении потолка производительности

Это частая проблема в production системах. Есть систематический подход к её решению.

Шаг 1: Профилирование (это обязательно)

Прежде всего, найди узкое место. Без данных ты просто гадаешь:

import cProfile
import pstats
from io import StringIO

# Способ 1: Встроенный профайлер
profile = cProfile.Profile()
profile.enable()

# Твой код здесь
result = expensive_function()

profile.disable()
stats = pstats.Stats(profile)
stats.sort_stats('cumulative')
stats.print_stats(10)  # Топ 10 функций
# Способ 2: line_profiler (по строкам кода)
from line_profiler import LineProfiler

def expensive_function():
    data = []
    for i in range(1000000):
        data.append(i * 2)  # Медленная строка
    return sum(data)

lpr = LineProfiler()
lpr_wrapper = lpr(expensive_function)
result = lpr_wrapper()
lpr.print_stats()
# Способ 3: memory_profiler (использование памяти)
from memory_profiler import profile

@profile
def expensive_function():
    data = list(range(1000000))
    return sum(data)

expensive_function()

Шаг 2: Анализ результатов профилирования

Обычно узкие места в одном из этих мест:

┌─ CPU bound (код выполняется долго)
│  ├─ Алгоритмическая сложность O(n²) вместо O(n)
│  ├─ Много вычислений в Python loop
│  └─ Отсутствие кэширования
│
├─ I/O bound (ждём ответа от сети/диска)
│  ├─ Медленная БД (неправильные запросы)
│  ├─ Медленные API вызовы
│  └─ Блокирующие операции ввода-вывода
│
└─ Memory (много памяти)
   ├─ Утечки памяти
   ├─ Слишком большие структуры в памяти
   └─ Неэффективные алгоритмы

Решение 1: Оптимизация алгоритма

Это часто самое эффективное:

# ❌ Медленно: O(n²) сложность
def slow_duplicates(arr):
    for i in range(len(arr)):
        for j in range(i + 1, len(arr)):
            if arr[i] == arr[j]:
                return True
    return False

# ✅ Быстро: O(n) сложность
def fast_duplicates(arr):
    seen = set()
    for num in arr:
        if num in seen:
            return True
        seen.add(num)
    return False

# Бенчмарк
import timeit
data = list(range(10000)) + [9999]

print(timeit.timeit(lambda: slow_duplicates(data), number=1))
# 0.8s
print(timeit.timeit(lambda: fast_duplicates(data), number=1))
# 0.001s

Решение 2: Кэширование

Используй кэш для часто запрашиваемых данных:

from functools import lru_cache
import time

# Без кэша
def fibonacci(n):
    if n <= 1:
        return n
    time.sleep(0.001)  # имитация вычисления
    return fibonacci(n-1) + fibonacci(n-2)

# С кэшем
@lru_cache(maxsize=128)
def fibonacci_cached(n):
    if n <= 1:
        return n
    time.sleep(0.001)
    return fibonacci_cached(n-1) + fibonacci_cached(n-2)

# Бенчмарк
import timeit
print("Without cache:", timeit.timeit(lambda: fibonacci(20), number=1))
# 10+ seconds
print("With cache:", timeit.timeit(lambda: fibonacci_cached(20), number=1))
# 0.04 seconds

Решение 3: Оптимизация БД запросов

Самая частая причина медленности:

# ❌ N+1 проблема (100 запросов вместо 1)
orders = Order.query.all()
for order in orders:
    print(order.customer.name)  # SELECT для каждого заказа

# ✅ Eager loading (1 запрос)
orders = Order.query.options(
    joinedload(Order.customer)
).all()
for order in orders:
    print(order.customer.name)  # нет дополнительных запросов
# ❌ Медленный запрос
result = session.query(User).filter(
    User.email.like('%example%')
).all()

# ✅ С индексом
# CREATE INDEX idx_email ON users (email);
# Поиск в 1000x быстрее

Решение 4: Асинхронность для I/O

Для сетевых операций:

# ❌ Синхронно (10 запросов = 10 секунд)
import requests

def fetch_urls(urls):
    results = []
    for url in urls:
        response = requests.get(url)  # 1 сек каждый
        results.append(response.json())
    return results

# ✅ Асинхронно (10 запросов = 1 сек)
import asyncio
import aiohttp

async def fetch_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [await r.json() for r in responses]

# Использование
results = asyncio.run(fetch_urls(urls))

Решение 5: Кэширование на уровне приложения

Redis для распределённого кэша:

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379)

def get_user_data(user_id: int):
    # Проверяем кэш
    cached = redis_client.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    
    # Если нет в кэше, получаем из БД
    user = User.query.get(user_id)
    data = user.to_dict()
    
    # Сохраняем в кэш на 1 час
    redis_client.setex(
        f"user:{user_id}",
        3600,
        json.dumps(data)
    )
    return data

Решение 6: Масштабирование горизонтальное

Добавь больше серверов:

# Балансировка нагрузки (Nginx)
upstream backend {
    server app1:8000;
    server app2:8000;
    server app3:8000;
}

server {
    listen 80;
    location / {
        proxy_pass http://backend;
    }
}

Решение 7: Асинхронный worker

Жёлтые задачи в фоне:

from celery import Celery

app = Celery('tasks')

# Долгая задача
@app.task
def process_data(data):
    # Обработка в фоне
    result = heavy_computation(data)
    save_to_db(result)

# Использование
from fastapi import FastAPI

api = FastAPI()

@api.post("/process")
async def start_process(data: dict):
    # Запускаем в фоне
    task = process_data.delay(data)
    return {"task_id": task.id}

@api.get("/status/{task_id}")
async def check_status(task_id: str):
    from celery.result import AsyncResult
    task = AsyncResult(task_id)
    return {"status": task.status, "result": task.result}

Решение 8: Код на C/C++

Для критичных функций используй расширения:

# fibonacci.pyx (Cython)
cpdef int fibonacci(int n):
    if n <= 1:
        return n
    cdef int a = 0, b = 1
    for _ in range(n - 1):
        a, b = b, a + b
    return b

Решение 9: CDN и edge computing

Для статического контента:

# Используй CloudFlare, AWS CloudFront для кэширования
# Распределяй вычисления на edge (Cloudflare Workers)

Полный чеклист оптимизации

  1. Профилируй (cProfile, line_profiler)
  2. Улучши алгоритм (O(n) вместо O(n²))
  3. Добавь индексы БД (самый быстрый способ)
  4. Оптимизируй запросы (N+1, eager loading)
  5. Добавь кэширование (lru_cache, Redis)
  6. Используй асинхронность (asyncio для I/O)
  7. Масштабируй горизонтально (больше серверов)
  8. Оптимизируй фронт (CDN, код сжатие)
  9. Мониторь (собирай метрики, логи)

Реальный пример: оптимизация эндпоинта

from fastapi import FastAPI
from functools import lru_cache
import time

app = FastAPI()

# БЫЛО: 10 сек на запрос
@app.get("/slow")
def slow_endpoint(user_id: int):
    # N+1: берем юзера
    user = User.query.get(user_id)
    # потом его заказы
    orders = Order.query.filter_by(user_id=user_id).all()
    # потом каждого заказа товары
    for order in orders:
        items = Item.query.filter_by(order_id=order.id).all()
    return {"user": user, "orders": orders}

# СТАЛО: 0.1 сек на запрос (100x улучшение)
@app.get("/fast")
@lru_cache(maxsize=100)
def fast_endpoint(user_id: int):
    # Один запрос с eager loading
    user = User.query.options(
        joinedload(User.orders).joinedload(Order.items)
    ).get(user_id)
    return user.to_dict()

Главное: не гадай, профилируй!