← Назад к вопросам
Как решить проблему при отправке тысячи асинхронный HTTP запросов в Python?
3.0 Senior🔥 131 комментариев
#REST API и HTTP#Асинхронность и многопоточность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как решить проблему при отправке тысячи асинхронных HTTP запросов в Python
Отправка тысяч асинхронных запросов требует аккуратности при управлении ресурсами, обработке ошибок, и оптимизации производительности. Рассмотрим лучшие практики и основные проблемы.
1. Основная проблема: слишком много одновременных соединений
# ❌ ПЛОХО: Это упадёт
import asyncio
import aiohttp
async def bad_approach():
async with aiohttp.ClientSession() as session:
tasks = []
for url in range(1000):
# Создаём 1000 задач одновременно
task = session.get(f'https://api.example.com/data/{url}')
tasks.append(task)
# Это попытается открыть 1000 соединений одновременно
results = await asyncio.gather(*tasks)
asyncio.run(bad_approach())
Проблемы:
- Слишком много open file descriptors (лимит ОС)
- Исчерпание памяти
- Server отклоняет запросы
- Network congestion
2. Решение: Semaphore для ограничения одновременных запросов
import asyncio
import aiohttp
async def fetch_with_semaphore(session, url, semaphore):
"""Fetch с ограничением одновременных запросов"""
async with semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
return await response.json()
except asyncio.TimeoutError:
print(f"Timeout: {url}")
return None
except aiohttp.ClientError as e:
print(f"Client error: {e}")
return None
async def fetch_many_urls(urls, max_concurrent=100):
"""Fetch множество URL с ограничением параллелизма"""
# Максимум 100 одновременных соединений
semaphore = asyncio.Semaphore(max_concurrent)
# Настройка connector для переиспользования соединений
connector = aiohttp.TCPConnector(
limit=max_concurrent, # Общий лимит соединений
limit_per_host=5, # Лимит per хост
ttl_dns_cache=300 # TTL кеша DNS
)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [
fetch_with_semaphore(session, url, semaphore)
for url in urls
]
# Используем as_completed для обработки результатов по мере их появления
results = []
for task in asyncio.as_completed(tasks):
result = await task
results.append(result)
return results
# Использование
urls = [f'https://api.example.com/data/{i}' for i in range(1000)]
results = asyncio.run(fetch_many_urls(urls, max_concurrent=100))
3. Batching: обработка в батчах
import asyncio
import aiohttp
from typing import List
async def fetch_batch(session, urls, semaphore):
"""Fetch batch URLs"""
async with semaphore:
tasks = [
session.get(url, timeout=aiohttp.ClientTimeout(total=30))
for url in urls
]
return await asyncio.gather(*tasks, return_exceptions=True)
async def fetch_with_batching(all_urls, batch_size=50, max_concurrent=3):
"""Разбиваем на батчи и обрабатываем последовательно"""
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(
limit=batch_size * 2,
limit_per_host=5
)
async with aiohttp.ClientSession(connector=connector) as session:
results = []
# Разбиваем на батчи
for i in range(0, len(all_urls), batch_size):
batch = all_urls[i:i + batch_size]
print(f"Processing batch {i//batch_size + 1}...")
try:
batch_results = await fetch_batch(session, batch, semaphore)
results.extend(batch_results)
except Exception as e:
print(f"Batch error: {e}")
continue
# Pause между батчами для stabilization
await asyncio.sleep(0.1)
return results
# Использование
urls = [f'https://api.example.com/data/{i}' for i in range(1000)]
results = asyncio.run(fetch_with_batching(urls, batch_size=50, max_concurrent=3))
4. Retry механизм с exponential backoff
import asyncio
import aiohttp
import random
async def fetch_with_retry(
session, url,
max_retries=3,
initial_delay=1,
backoff_factor=2
):
"""Fetch с автоматическим retry"""
delay = initial_delay
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status == 200:
return await response.json()
elif response.status in [429, 503]: # Rate limit, Service unavailable
# Exponential backoff с jitter
jitter = random.uniform(0, 1)
wait_time = delay * (backoff_factor ** attempt) + jitter
print(f"Retry {url} in {wait_time:.2f}s (attempt {attempt + 1})")
await asyncio.sleep(wait_time)
else:
print(f"Error {response.status}: {url}")
return None
except asyncio.TimeoutError:
if attempt < max_retries - 1:
jitter = random.uniform(0, 1)
wait_time = delay * (backoff_factor ** attempt) + jitter
await asyncio.sleep(wait_time)
else:
print(f"Timeout after {max_retries} attempts: {url}")
return None
except aiohttp.ClientError as e:
print(f"Connection error: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(delay * (backoff_factor ** attempt))
else:
return None
return None
async def fetch_many_with_retry(urls, max_concurrent=100):
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(limit=max_concurrent, limit_per_host=5)
async def fetch_with_sem(session, url):
async with semaphore:
return await fetch_with_retry(session, url)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_with_sem(session, url) for url in urls]
return await asyncio.gather(*tasks)
5. Использование asyncio.Queue для управления нагрузкой
import asyncio
import aiohttp
from collections import defaultdict
class URLWorkerPool:
def __init__(self, worker_count=10, connector_limit=100):
self.queue = asyncio.Queue()
self.worker_count = worker_count
self.connector_limit = connector_limit
self.results = []
self.errors = []
async def worker(self, session):
"""Worker который обрабатывает URL из очереди"""
while True:
try:
url = self.queue.get_nowait()
except asyncio.QueueEmpty:
break
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
data = await response.json()
self.results.append({"url": url, "data": data})
except Exception as e:
self.errors.append({"url": url, "error": str(e)})
self.queue.task_done()
async def process_urls(self, urls):
"""Обработать список URL"""
# Добавляем URL в очередь
for url in urls:
await self.queue.put(url)
# Настройка соединения
connector = aiohttp.TCPConnector(
limit=self.connector_limit,
limit_per_host=5
)
async with aiohttp.ClientSession(connector=connector) as session:
# Создаём worker задачи
workers = [
asyncio.create_task(self.worker(session))
for _ in range(self.worker_count)
]
# Ждём пока все URL обработаны
await self.queue.join()
# Отменяем worker задачи
for worker in workers:
worker.cancel()
return self.results, self.errors
# Использование
async def main():
urls = [f'https://api.example.com/data/{i}' for i in range(1000)]
pool = URLWorkerPool(worker_count=20, connector_limit=100)
results, errors = await pool.process_urls(urls)
print(f"Successful: {len(results)}")
print(f"Errors: {len(errors)}")
if errors:
for error in errors[:5]: # Первые 5 ошибок
print(f" {error['url']}: {error['error']}")
asyncio.run(main())
6. Optimizations
# Keep-alive и connection pooling
connector = aiohttp.TCPConnector(
limit=100, # Total connections
limit_per_host=5, # Per-host limit
ttl_dns_cache=300, # DNS cache TTL
ssl=False, # Отключить SSL verification (если необходимо)
force_close=False, # Переиспользовать соединения
)
session = aiohttp.ClientSession(
connector=connector,
headers={'User-Agent': 'MyBot/1.0'}, # Custom headers
json_serialize=json.dumps # Custom JSON encoder
)
# Timeout settings
timeout = aiohttp.ClientTimeout(
total=30, # Всего time на весь запрос
connect=10, # На подключение
sock_read=10, # На чтение данных
sock_connect=10 # На socket connect
)
7. Мониторинг и logging
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def fetch_with_logging(session, url):
logger.info(f"Starting request: {url}")
try:
async with session.get(url) as response:
logger.info(f"Success {response.status}: {url}")
return await response.json()
except Exception as e:
logger.error(f"Failed {url}: {e}")
return None
Чеклист для отправки тысяч запросов
[ ] Используется Semaphore для ограничения параллелизма (100-200 соединений)
[ ] TCPConnector настроен с limit и limit_per_host
[ ] Implement retry механизм с exponential backoff
[ ] Timeout установлены (30 сек общий, 10 сек per операция)
[ ] Обработка rate limiting (429 статус)
[ ] Error handling и logging
[ ] Memory management (не храним все результаты в памяти)
[ ] Graceful shutdown при прерывании
[ ] DNS caching включён
[ ] Connection pooling переиспользует соединения
Заключение
Для отправки тысяч асинхронных запросов ключ — это управление ресурсами: ограничить одновременные соединения через Semaphore, настроить TCPConnector правильно, реализовать retry с exponential backoff, и обработать ошибки gracefully. Используй worker pool или queue pattern для масштабируемого решения.