← Назад к вопросам
Как улучшить производительность синхронных запросов?
2.3 Middle🔥 191 комментариев
#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Оптимизация производительности синхронных запросов
Производительность синхронных запросов — критичная задача при работе с IO-операциями. Вот основные стратегии оптимизации:
1. Кэширование результатов
Первый и самый простой способ — кэшировать результаты часто запрашиваемых данных:
from functools import lru_cache
import requests
@lru_cache(maxsize=128)
def get_user_data(user_id: int) -> dict:
response = requests.get(f"https://api.example.com/users/{user_id}")
return response.json()
Для более гибкого кэширования используй Redis или Memcached:
import redis
import json
redis_client = redis.Redis(host="localhost", port=6379)
def get_user_cached(user_id: int) -> dict:
cache_key = f"user:{user_id}"
# Проверяем кэш
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Если нет — делаем запрос
data = requests.get(f"https://api.example.com/users/{user_id}").json()
# Кэшируем на 1 час
redis_client.setex(cache_key, 3600, json.dumps(data))
return data
2. Connection Pooling
Создание нового соединения для каждого запроса — дорого. Используй пулы соединений:
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import requests
def create_session():
session = requests.Session()
# Настройка retry стратегии
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504]
)
# Настройка pooling
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=10
)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
session = create_session()
response = session.get("https://api.example.com/data")
3. Batch запросы вместо множественных
Если нужны данные для нескольких объектов, групируй их в один запрос:
# ❌ Плохо — N запросов
users = []
for user_id in [1, 2, 3, 4, 5]:
response = requests.get(f"https://api.example.com/users/{user_id}")
users.append(response.json())
# ✅ Хорошо — 1 запрос
response = requests.post(
"https://api.example.com/users/batch",
json={"ids": [1, 2, 3, 4, 5]}
)
users = response.json()
4. Асинхронные запросы (asyncio + aiohttp)
Для синхронного кода, который нужно оптимизировать, переходи на асинхронность:
import asyncio
import aiohttp
async def fetch_user(session, user_id: int):
async with session.get(f"https://api.example.com/users/{user_id}") as resp:
return await resp.json()
async def get_multiple_users(user_ids: list):
async with aiohttp.ClientSession() as session:
tasks = [fetch_user(session, uid) for uid in user_ids]
return await asyncio.gather(*tasks)
# Использование
users = asyncio.run(get_multiple_users([1, 2, 3, 4, 5]))
5. Оптимизация сетевых параметров
import requests
# Увеличь timeout для хороших сетей
response = requests.get(
"https://api.example.com/data",
timeout=30 # 30 секунд
)
# Используй streaming для больших файлов
response = requests.get(
"https://api.example.com/large-file",
stream=True
)
for chunk in response.iter_content(chunk_size=8192):
process_chunk(chunk)
6. Профилирование и мониторинг
Определи узкие места перед оптимизацией:
import time
from functools import wraps
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} took {elapsed:.3f}s")
return result
return wrapper
@timer
def fetch_data():
return requests.get("https://api.example.com/data").json()
Итоговая рекомендация
Для максимального прироста производительности:
- Используй асинхронность (asyncio + aiohttp) — самый большой выигрыш
- Кэшируй часто запрашиваемые данные
- Батчируй запросы на серверной стороне
- Настраивай connection pooling для переиспользования соединений
- Профилируй перед оптимизацией
Отказ от синхронных запросов в пользу асинхронных в 5-10 раз ускоряет обработку множественных IO-операций.