← Назад к вопросам

Как реализуется нагрузочное тестирование на практике?

2.0 Middle🔥 91 комментариев
#Тестирование

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как реализуется нагрузочное тестирование на практике

Нагрузочное тестирование (load testing) — это процесс проверки способности системы обрабатывать ожидаемую нагрузку. Это критически важно для production систем, чтобы предотвратить падение при пиковых нагрузках.

Цели нагрузочного тестирования

  • Определить максимальную пропускную способность (throughput)
  • Найти узкие места (bottlenecks)
  • Оценить время отклика при различных нагрузках
  • Убедиться в стабильности системы
  • Выявить утечки памяти и ресурсов

Инструменты для нагрузочного тестирования

1. Apache JMeter — популярный инструмент для Java приложений, но поддерживает HTTP:

# Простой тест 100 пользователей, 1000 запросов
jmeter -n -t test_plan.jmx -l results.jtl

2. Locust — на Python, идеален для наших целей:

pip install locust

3. Apache Bench (ab) — простой инструмент для HTTP:

# 1000 запросов, 100 одновременных подключений
ab -n 1000 -c 100 http://example.com/

4. wrk — современный инструмент на Lua, очень быстрый

Практический пример с Locust

Создание тестового скрипта:

from locust import HttpUser, task, between
import random

class APIUser(HttpUser):
    wait_time = between(1, 5)  # Ждать 1-5 сек между запросами
    
    @task(3)  # Этот task выполняется в 3 раза чаще
    def get_users(self):
        # GET /api/v1/users
        self.client.get("/api/v1/users")
    
    @task(1)  # Этот task выполняется реже
    def create_user(self):
        # POST /api/v1/users
        payload = {
            "name": f"User_{random.randint(1000, 9999)}",
            "email": f"user_{random.randint(1000, 9999)}@example.com"
        }
        self.client.post("/api/v1/users", json=payload)
    
    @task(2)
    def get_user_detail(self):
        # GET /api/v1/users/{id}
        user_id = random.randint(1, 100)
        self.client.get(f"/api/v1/users/{user_id}")
    
    def on_start(self):
        """Выполняется при старте пользователя"""
        # Можно выполнить auth или setup
        pass

Запуск нагрузочного теста:

# Командная строка
locust -f locustfile.py -u 100 -r 10 --run-time 1m -H http://localhost:8000

# -u 100 — 100 виртуальных пользователей
# -r 10 — добавлять 10 пользователей в секунду
# --run-time 1m — запустить на 1 минуту
# -H http://localhost:8000 — URL целевого сервера

# Web UI
locust -f locustfile.py -H http://localhost:8000
# Откроет http://localhost:8089

Запуск с прогрессирующей нагрузкой

from locust import HttpUser, task, between
from locust.contrib.fasthttp import FastHttpUser

class ProgressiveLoadTest(FastHttpUser):
    wait_time = between(0, 1)
    
    def on_start(self):
        self.request_count = 0
    
    @task
    def make_request(self):
        response = self.client.get("/api/v1/data")
        if response.status_code == 200:
            self.request_count += 1
        else:
            # Логирование ошибок
            print(f"Error: {response.status_code}")

Нагрузочное тестирование с pytest-benchmark

Для тестирования отдельных функций:

import pytest
from myapp.users.service import UserService

def test_get_user_performance(benchmark):
    user_service = UserService()
    
    # benchmark будет выполнить функцию много раз
    result = benchmark(user_service.get_user, user_id=1)
    
    assert result is not None

# Запуск
pytest test_performance.py --benchmark-only

Тестирование с помощью asyncio

Для асинхронных приложений:

import asyncio
import aiohttp
import time

async def test_api():
    async with aiohttp.ClientSession() as session:
        tasks = []
        start = time.time()
        
        # 1000 одновременных запросов
        for i in range(1000):
            task = session.get('http://localhost:8000/api/v1/data')
            tasks.append(task)
        
        responses = await asyncio.gather(*tasks)
        elapsed = time.time() - start
        
        success = sum(1 for r in responses if r.status == 200)
        print(f"Success: {success}/1000")
        print(f"Time: {elapsed:.2f}s")
        print(f"RPS: {1000/elapsed:.0f}")

asyncio.run(test_api())

Анализ результатов

import csv
from statistics import mean, stdev, median

def analyze_results(results_file):
    response_times = []
    status_codes = {}
    
    with open(results_file, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            response_times.append(float(row['response_time']))
            status = row['status']
            status_codes[status] = status_codes.get(status, 0) + 1
    
    print(f"Total requests: {len(response_times)}")
    print(f"Average response time: {mean(response_times):.2f}ms")
    print(f"Median response time: {median(response_times):.2f}ms")
    print(f"Min response time: {min(response_times):.2f}ms")
    print(f"Max response time: {max(response_times):.2f}ms")
    print(f"Std deviation: {stdev(response_times):.2f}ms")
    
    print("\nStatus codes:")
    for status, count in sorted(status_codes.items()):
        print(f"  {status}: {count}")

Использование Prometheus для мониторинга

from prometheus_client import Counter, Histogram, start_http_server
import time

# Метрики
request_count = Counter(
    'requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)

response_time = Histogram(
    'request_duration_seconds',
    'Request duration',
    ['method', 'endpoint']
)

def middleware(request):
    start = time.time()
    try:
        # Обработка запроса
        response = process(request)
        status = response.status_code
    except Exception as e:
        status = 500
    finally:
        duration = time.time() - start
        
        # Сбор метрик
        request_count.labels(
            method=request.method,
            endpoint=request.path,
            status=status
        ).inc()
        
        response_time.labels(
            method=request.method,
            endpoint=request.path
        ).observe(duration)
    
    return response

Типовой сценарий нагрузочного тестирования

# 1. Ramp-up: увеличение нагрузки
Нагрузка: 0 -> 10 -> 50 -> 100 пользователей
Время: 0s -> 1m -> 2m -> 3m

# 2. Peak: пиковая нагрузка
Нагрузка: 100 пользователей
Время: 3m -> 15m

# 3. Spike: скачок нагрузки (тест стабильности)
Нагрузка: 100 -> 500 пользователей за 30 секунд

# 4. Ramp-down: снижение нагрузки
Нагрузка: 500 -> 0 пользователей
Время: 5m

Критерии успеха

class LoadTestCriteria:
    # Время отклика P95 < 500ms
    P95_RESPONSE_TIME = 500  # ms
    
    # Успешность >= 99.5%
    SUCCESS_RATE = 0.995
    
    # Не более 0.1% ошибок 5xx
    ERROR_RATE_5XX = 0.001
    
    # Нет утечек памяти
    MEMORY_GROWTH = 0.05  # 5% за тест

def check_criteria(metrics):
    assert metrics.p95_response_time < LoadTestCriteria.P95_RESPONSE_TIME
    assert metrics.success_rate > LoadTestCriteria.SUCCESS_RATE
    assert metrics.error_rate_5xx < LoadTestCriteria.ERROR_RATE_5XX

Типичные ошибки

# ❌ Неправильно: блокирующие операции
for i in range(1000):
    response = requests.get('http://example.com')  # Медленно!

# ✅ Правильно: асинхронные операции
async with aiohttp.ClientSession() as session:
    tasks = [session.get('http://example.com') for i in range(1000)]
    await asyncio.gather(*tasks)

# ❌ Неправильно: тестирование на localhost
# Network latency не учитывается
ab -n 1000 http://localhost:8000/

# ✅ Правильно: тестирование на реальном хосте
ab -n 1000 http://production.example.com/

Заключение

Нагрузочное тестирование на практике:

  • Цели: найти пропускную способность, узкие места, стабильность
  • Инструменты: Locust, JMeter, ab, wrk
  • Сценарии: ramp-up, peak, spike, ramp-down
  • Метрики: response time (P50/P95/P99), throughput, error rate
  • Мониторинг: Prometheus, графики, логирование
  • Критерии: определить минимально приемлемые показатели
  • Итерация: тестировать -> улучшать -> тестировать
Как реализуется нагрузочное тестирование на практике? | PrepBro