Какие инструменты используешь для работы с нагрузкой?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Инструменты для работы с нагрузкой
Для обработки высоких нагрузок нужны инструменты на разных уровнях: мониторинг, профилирование, нагрузочное тестирование и оптимизация.
Мониторинг производительности
Prometheus + Grafana
Prometheus собирает метрики, Grafana визуализирует их.
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import FastAPI, Response
app = FastAPI()
# Метрика: количество запросов
request_count = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"]
)
# Метрика: время обработки
request_duration = Histogram(
"http_request_duration_seconds",
"HTTP request duration",
["method", "endpoint"],
buckets=[0.1, 0.5, 1.0, 2.0]
)
@app.get("/users")
async def get_users():
with request_duration.labels(method="GET", endpoint="/users").time():
# Обработка запроса
users = [{"id": 1, "name": "John"}]
request_count.labels(
method="GET",
endpoint="/users",
status="200"
).inc()
return users
@app.get("/metrics")
async def metrics():
return Response(
generate_latest(),
media_type="text/plain"
)
APM (Application Performance Monitoring)
New Relic, DataDog, Elastic APM — отслеживают каждый запрос:
from newrelic.agent import initialize, wsgi_application
initialize(config_file="newrelic.ini")
app_with_apm = wsgi_application()(app)
Django Debug Toolbar (для разработки)
Показывает SQL запросы, время обработки, использование памяти:
if DEBUG:
import debug_toolbar
app.include_router(debug_toolbar)
Нагрузочное тестирование
Apache JMeter
Графический инструмент для создания сценариев нагрузочного тестирования.
jmeter -n -t test_plan.jmx -l results.jtl -j jmeter.log
Locust
Питоновский инструмент для нагрузочного тестирования:
from locust import HttpUser, task, between
class UserBehavior(HttpUser):
wait_time = between(1, 3) # Ждём 1-3 сек между запросами
@task(3)
def get_users(self):
self.client.get("/api/users")
@task(1)
def create_user(self):
self.client.post("/api/users", json={"name": "Test"})
Запуск:
locust -f locustfile.py --host=http://localhost:8000
Открывается веб-интерфейс на localhost:8089, где можно увеличивать нагрузку и смотреть результаты.
Apache Bench (ab)
Простой инструмент командной строки:
# 1000 запросов с 100 одновременными
ab -n 1000 -c 100 http://localhost:8000/api/users
wrk
Современный инструмент для нагрузочного тестирования:
# 4 потока, 100 соединений, 10 секунд
wrk -t4 -c100 -d10s http://localhost:8000/api/users
Профилирование кода
cProfile (встроенный)
Показывает время в каждой функции:
import cProfile
import pstats
from io import StringIO
def expensive_function():
result = sum(range(1000000))
return result
if __name__ == "__main__":
profiler = cProfile.Profile()
profiler.enable()
expensive_function()
profiler.disable()
stats = pstats.Stats(profiler, stream=StringIO())
stats.sort_stats("cumulative")
stats.print_stats()
line_profiler
Профилирует по строкам (очень детально):
@profile
def slow_function():
result = 0
for i in range(1000000):
result += i # Эта строка медленная
return result
if __name__ == "__main__":
slow_function()
Запуск:
kernprof -l -v my_script.py
memory_profiler
Трасирует использование памяти:
from memory_profiler import profile
@profile
def memory_intensive():
data = [i ** 2 for i in range(1000000)] # Выделяет много памяти
return sum(data)
if __name__ == "__main__":
memory_intensive()
Запуск:
python -m memory_profiler my_script.py
py-spy
Профилер на уровне ОС, не требует изменений кода:
# Профилирует процесс в течение 10 секунд
py-spy record -o profile.svg --pid 12345 -- python my_script.py
Оптимизация БД
EXPLAIN (анализ запросов)
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def analyze_query(session: AsyncSession):
# PostgreSQL
result = await session.execute(
text("""
EXPLAIN ANALYZE
SELECT u.*, p.* FROM users u
LEFT JOIN posts p ON u.id = p.user_id
WHERE u.status = active
""")
)
for row in result:
print(row[0]) # Показывает план выполнения и время
Индексы
from sqlalchemy import Index, Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, nullable=False)
status = Column(String, nullable=False)
# Индекс на часто используемые поля
__table_args__ = (
Index("idx_email", "email"),
Index("idx_status", "status"),
Index("idx_email_status", "email", "status"), # Составной
)
Кэширование
Redis
Высокоскоростное хранилище для кэша:
from redis import Redis
from fastapi import FastAPI
import json
app = FastAPI()
redis = Redis(host="localhost", port=6379, decode_responses=True)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Проверяем кэш
cached = redis.get(f"user:{user_id}")
if cached:
return json.loads(cached)
# Получаем из БД
user = {"id": user_id, "name": "John"}
# Кэшируем на 1 час
redis.setex(f"user:{user_id}", 3600, json.dumps(user))
return user
Memcached
Проще Redis, только для кэша:
from pymemcache.client.base import PooledClient
memcache = PooledClient("localhost", 11211)
user_data = memcache.get("user:123")
if not user_data:
# Получаем и кэшируем
memcache.set("user:123", user_json, expire=3600)
Load Balancing
Nginx
Распределяет нагрузку между несколькими воркерами:
upstream api {
server localhost:8001;
server localhost:8002;
server localhost:8003;
server localhost:8004;
}
server {
listen 80;
location / {
proxy_pass http://api; # Round-robin по умолчанию
}
}
HAProxy
Продвинутый load balancer с health checks:
backend api
balance roundrobin # или leastconn, source
server api1 localhost:8001 check
server api2 localhost:8002 check
server api3 localhost:8003 check
Асинхронные очереди
Celery + Redis
Для асинхронных задач, снимающих нагрузку с основного приложения:
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
@app.task
def send_email(email: str):
# Тяжёлая операция
smtp.send_message(email)
return f"Email sent to {email}"
# В основном приложении
from fastapi import FastAPI
web_app = FastAPI()
@web_app.post("/send-email")
async def send_email_async(email: str):
# Не ждём, просто отправляем в очередь
send_email.delay(email)
return {"status": "Email queued"}
RabbitMQ
Альтернатива Redis для очередей:
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.queue_declare(queue="emails", durable=True)
# Отправка
channel.basic_publish(
exchange="",
routing_key="emails",
body="john@example.com"
)
# Потребление
def callback(ch, method, properties, body):
send_email(body.decode())
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue="emails",
on_message_callback=callback
)
channel.start_consuming()
Общая архитектура для высоких нагрузок
Internet
↓
[Nginx Load Balancer]
↓ (распределяет)
┌──────┬──────┬──────┐
API-1 API-2 API-3 (FastAPI workers)
↓
[Connection Pool]
↓
[PostgreSQL]
↓
[Реплики для чтения]
[Redis Cache]
[Elasticsearch для логов]
[Prometheus + Grafana]
Стратегия оптимизации
- Мониторь — используй Prometheus, APM
- Найди узкое место — профилирование, EXPLAIN
- Оптимизируй — кэш, индексы, асинхронность
- Нагрузочно тести — Locust, JMeter
- Масштабируй — добавляй workers, репликируй БД
- Повторяй — монитори результаты оптимизации