← Назад к вопросам
Что делать, если уперся в потолок по производительности?
1.0 Junior🔥 261 комментариев
#DevOps и инфраструктура
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI23 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Оптимизация при достижении потолка производительности
Это частая проблема в production системах. Есть систематический подход к её решению.
Шаг 1: Профилирование (это обязательно)
Прежде всего, найди узкое место. Без данных ты просто гадаешь:
import cProfile
import pstats
from io import StringIO
# Способ 1: Встроенный профайлер
profile = cProfile.Profile()
profile.enable()
# Твой код здесь
result = expensive_function()
profile.disable()
stats = pstats.Stats(profile)
stats.sort_stats('cumulative')
stats.print_stats(10) # Топ 10 функций
# Способ 2: line_profiler (по строкам кода)
from line_profiler import LineProfiler
def expensive_function():
data = []
for i in range(1000000):
data.append(i * 2) # Медленная строка
return sum(data)
lpr = LineProfiler()
lpr_wrapper = lpr(expensive_function)
result = lpr_wrapper()
lpr.print_stats()
# Способ 3: memory_profiler (использование памяти)
from memory_profiler import profile
@profile
def expensive_function():
data = list(range(1000000))
return sum(data)
expensive_function()
Шаг 2: Анализ результатов профилирования
Обычно узкие места в одном из этих мест:
┌─ CPU bound (код выполняется долго)
│ ├─ Алгоритмическая сложность O(n²) вместо O(n)
│ ├─ Много вычислений в Python loop
│ └─ Отсутствие кэширования
│
├─ I/O bound (ждём ответа от сети/диска)
│ ├─ Медленная БД (неправильные запросы)
│ ├─ Медленные API вызовы
│ └─ Блокирующие операции ввода-вывода
│
└─ Memory (много памяти)
├─ Утечки памяти
├─ Слишком большие структуры в памяти
└─ Неэффективные алгоритмы
Решение 1: Оптимизация алгоритма
Это часто самое эффективное:
# ❌ Медленно: O(n²) сложность
def slow_duplicates(arr):
for i in range(len(arr)):
for j in range(i + 1, len(arr)):
if arr[i] == arr[j]:
return True
return False
# ✅ Быстро: O(n) сложность
def fast_duplicates(arr):
seen = set()
for num in arr:
if num in seen:
return True
seen.add(num)
return False
# Бенчмарк
import timeit
data = list(range(10000)) + [9999]
print(timeit.timeit(lambda: slow_duplicates(data), number=1))
# 0.8s
print(timeit.timeit(lambda: fast_duplicates(data), number=1))
# 0.001s
Решение 2: Кэширование
Используй кэш для часто запрашиваемых данных:
from functools import lru_cache
import time
# Без кэша
def fibonacci(n):
if n <= 1:
return n
time.sleep(0.001) # имитация вычисления
return fibonacci(n-1) + fibonacci(n-2)
# С кэшем
@lru_cache(maxsize=128)
def fibonacci_cached(n):
if n <= 1:
return n
time.sleep(0.001)
return fibonacci_cached(n-1) + fibonacci_cached(n-2)
# Бенчмарк
import timeit
print("Without cache:", timeit.timeit(lambda: fibonacci(20), number=1))
# 10+ seconds
print("With cache:", timeit.timeit(lambda: fibonacci_cached(20), number=1))
# 0.04 seconds
Решение 3: Оптимизация БД запросов
Самая частая причина медленности:
# ❌ N+1 проблема (100 запросов вместо 1)
orders = Order.query.all()
for order in orders:
print(order.customer.name) # SELECT для каждого заказа
# ✅ Eager loading (1 запрос)
orders = Order.query.options(
joinedload(Order.customer)
).all()
for order in orders:
print(order.customer.name) # нет дополнительных запросов
# ❌ Медленный запрос
result = session.query(User).filter(
User.email.like('%example%')
).all()
# ✅ С индексом
# CREATE INDEX idx_email ON users (email);
# Поиск в 1000x быстрее
Решение 4: Асинхронность для I/O
Для сетевых операций:
# ❌ Синхронно (10 запросов = 10 секунд)
import requests
def fetch_urls(urls):
results = []
for url in urls:
response = requests.get(url) # 1 сек каждый
results.append(response.json())
return results
# ✅ Асинхронно (10 запросов = 1 сек)
import asyncio
import aiohttp
async def fetch_urls(urls):
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [await r.json() for r in responses]
# Использование
results = asyncio.run(fetch_urls(urls))
Решение 5: Кэширование на уровне приложения
Redis для распределённого кэша:
import redis
import json
redis_client = redis.Redis(host='localhost', port=6379)
def get_user_data(user_id: int):
# Проверяем кэш
cached = redis_client.get(f"user:{user_id}")
if cached:
return json.loads(cached)
# Если нет в кэше, получаем из БД
user = User.query.get(user_id)
data = user.to_dict()
# Сохраняем в кэш на 1 час
redis_client.setex(
f"user:{user_id}",
3600,
json.dumps(data)
)
return data
Решение 6: Масштабирование горизонтальное
Добавь больше серверов:
# Балансировка нагрузки (Nginx)
upstream backend {
server app1:8000;
server app2:8000;
server app3:8000;
}
server {
listen 80;
location / {
proxy_pass http://backend;
}
}
Решение 7: Асинхронный worker
Жёлтые задачи в фоне:
from celery import Celery
app = Celery('tasks')
# Долгая задача
@app.task
def process_data(data):
# Обработка в фоне
result = heavy_computation(data)
save_to_db(result)
# Использование
from fastapi import FastAPI
api = FastAPI()
@api.post("/process")
async def start_process(data: dict):
# Запускаем в фоне
task = process_data.delay(data)
return {"task_id": task.id}
@api.get("/status/{task_id}")
async def check_status(task_id: str):
from celery.result import AsyncResult
task = AsyncResult(task_id)
return {"status": task.status, "result": task.result}
Решение 8: Код на C/C++
Для критичных функций используй расширения:
# fibonacci.pyx (Cython)
cpdef int fibonacci(int n):
if n <= 1:
return n
cdef int a = 0, b = 1
for _ in range(n - 1):
a, b = b, a + b
return b
Решение 9: CDN и edge computing
Для статического контента:
# Используй CloudFlare, AWS CloudFront для кэширования
# Распределяй вычисления на edge (Cloudflare Workers)
Полный чеклист оптимизации
- Профилируй (cProfile, line_profiler)
- Улучши алгоритм (O(n) вместо O(n²))
- Добавь индексы БД (самый быстрый способ)
- Оптимизируй запросы (N+1, eager loading)
- Добавь кэширование (lru_cache, Redis)
- Используй асинхронность (asyncio для I/O)
- Масштабируй горизонтально (больше серверов)
- Оптимизируй фронт (CDN, код сжатие)
- Мониторь (собирай метрики, логи)
Реальный пример: оптимизация эндпоинта
from fastapi import FastAPI
from functools import lru_cache
import time
app = FastAPI()
# БЫЛО: 10 сек на запрос
@app.get("/slow")
def slow_endpoint(user_id: int):
# N+1: берем юзера
user = User.query.get(user_id)
# потом его заказы
orders = Order.query.filter_by(user_id=user_id).all()
# потом каждого заказа товары
for order in orders:
items = Item.query.filter_by(order_id=order.id).all()
return {"user": user, "orders": orders}
# СТАЛО: 0.1 сек на запрос (100x улучшение)
@app.get("/fast")
@lru_cache(maxsize=100)
def fast_endpoint(user_id: int):
# Один запрос с eager loading
user = User.query.options(
joinedload(User.orders).joinedload(Order.items)
).get(user_id)
return user.to_dict()
Главное: не гадай, профилируй!