← Назад к вопросам

Как решить проблему при отправке тысячи асинхронный HTTP запросов в Python?

3.0 Senior🔥 131 комментариев
#REST API и HTTP#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как решить проблему при отправке тысячи асинхронных HTTP запросов в Python

Отправка тысяч асинхронных запросов требует аккуратности при управлении ресурсами, обработке ошибок, и оптимизации производительности. Рассмотрим лучшие практики и основные проблемы.

1. Основная проблема: слишком много одновременных соединений

# ❌ ПЛОХО: Это упадёт
import asyncio
import aiohttp

async def bad_approach():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in range(1000):
            # Создаём 1000 задач одновременно
            task = session.get(f'https://api.example.com/data/{url}')
            tasks.append(task)
        
        # Это попытается открыть 1000 соединений одновременно
        results = await asyncio.gather(*tasks)

asyncio.run(bad_approach())

Проблемы:

  • Слишком много open file descriptors (лимит ОС)
  • Исчерпание памяти
  • Server отклоняет запросы
  • Network congestion

2. Решение: Semaphore для ограничения одновременных запросов

import asyncio
import aiohttp

async def fetch_with_semaphore(session, url, semaphore):
    """Fetch с ограничением одновременных запросов"""
    async with semaphore:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                return await response.json()
        except asyncio.TimeoutError:
            print(f"Timeout: {url}")
            return None
        except aiohttp.ClientError as e:
            print(f"Client error: {e}")
            return None

async def fetch_many_urls(urls, max_concurrent=100):
    """Fetch множество URL с ограничением параллелизма"""
    # Максимум 100 одновременных соединений
    semaphore = asyncio.Semaphore(max_concurrent)
    
    # Настройка connector для переиспользования соединений
    connector = aiohttp.TCPConnector(
        limit=max_concurrent,  # Общий лимит соединений
        limit_per_host=5,      # Лимит per хост
        ttl_dns_cache=300      # TTL кеша DNS
    )
    
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            fetch_with_semaphore(session, url, semaphore)
            for url in urls
        ]
        
        # Используем as_completed для обработки результатов по мере их появления
        results = []
        for task in asyncio.as_completed(tasks):
            result = await task
            results.append(result)
        
        return results

# Использование
urls = [f'https://api.example.com/data/{i}' for i in range(1000)]
results = asyncio.run(fetch_many_urls(urls, max_concurrent=100))

3. Batching: обработка в батчах

import asyncio
import aiohttp
from typing import List

async def fetch_batch(session, urls, semaphore):
    """Fetch batch URLs"""
    async with semaphore:
        tasks = [
            session.get(url, timeout=aiohttp.ClientTimeout(total=30))
            for url in urls
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def fetch_with_batching(all_urls, batch_size=50, max_concurrent=3):
    """Разбиваем на батчи и обрабатываем последовательно"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    connector = aiohttp.TCPConnector(
        limit=batch_size * 2,
        limit_per_host=5
    )
    
    async with aiohttp.ClientSession(connector=connector) as session:
        results = []
        
        # Разбиваем на батчи
        for i in range(0, len(all_urls), batch_size):
            batch = all_urls[i:i + batch_size]
            print(f"Processing batch {i//batch_size + 1}...")
            
            try:
                batch_results = await fetch_batch(session, batch, semaphore)
                results.extend(batch_results)
            except Exception as e:
                print(f"Batch error: {e}")
                continue
            
            # Pause между батчами для stabilization
            await asyncio.sleep(0.1)
        
        return results

# Использование
urls = [f'https://api.example.com/data/{i}' for i in range(1000)]
results = asyncio.run(fetch_with_batching(urls, batch_size=50, max_concurrent=3))

4. Retry механизм с exponential backoff

import asyncio
import aiohttp
import random

async def fetch_with_retry(
    session, url, 
    max_retries=3, 
    initial_delay=1, 
    backoff_factor=2
):
    """Fetch с автоматическим retry"""
    delay = initial_delay
    
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status == 200:
                    return await response.json()
                elif response.status in [429, 503]:  # Rate limit, Service unavailable
                    # Exponential backoff с jitter
                    jitter = random.uniform(0, 1)
                    wait_time = delay * (backoff_factor ** attempt) + jitter
                    print(f"Retry {url} in {wait_time:.2f}s (attempt {attempt + 1})")
                    await asyncio.sleep(wait_time)
                else:
                    print(f"Error {response.status}: {url}")
                    return None
        except asyncio.TimeoutError:
            if attempt < max_retries - 1:
                jitter = random.uniform(0, 1)
                wait_time = delay * (backoff_factor ** attempt) + jitter
                await asyncio.sleep(wait_time)
            else:
                print(f"Timeout after {max_retries} attempts: {url}")
                return None
        except aiohttp.ClientError as e:
            print(f"Connection error: {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(delay * (backoff_factor ** attempt))
            else:
                return None
    
    return None

async def fetch_many_with_retry(urls, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    connector = aiohttp.TCPConnector(limit=max_concurrent, limit_per_host=5)
    
    async def fetch_with_sem(session, url):
        async with semaphore:
            return await fetch_with_retry(session, url)
    
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_with_sem(session, url) for url in urls]
        return await asyncio.gather(*tasks)

5. Использование asyncio.Queue для управления нагрузкой

import asyncio
import aiohttp
from collections import defaultdict

class URLWorkerPool:
    def __init__(self, worker_count=10, connector_limit=100):
        self.queue = asyncio.Queue()
        self.worker_count = worker_count
        self.connector_limit = connector_limit
        self.results = []
        self.errors = []
    
    async def worker(self, session):
        """Worker который обрабатывает URL из очереди"""
        while True:
            try:
                url = self.queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    data = await response.json()
                    self.results.append({"url": url, "data": data})
            except Exception as e:
                self.errors.append({"url": url, "error": str(e)})
            
            self.queue.task_done()
    
    async def process_urls(self, urls):
        """Обработать список URL"""
        # Добавляем URL в очередь
        for url in urls:
            await self.queue.put(url)
        
        # Настройка соединения
        connector = aiohttp.TCPConnector(
            limit=self.connector_limit,
            limit_per_host=5
        )
        
        async with aiohttp.ClientSession(connector=connector) as session:
            # Создаём worker задачи
            workers = [
                asyncio.create_task(self.worker(session))
                for _ in range(self.worker_count)
            ]
            
            # Ждём пока все URL обработаны
            await self.queue.join()
            
            # Отменяем worker задачи
            for worker in workers:
                worker.cancel()
        
        return self.results, self.errors

# Использование
async def main():
    urls = [f'https://api.example.com/data/{i}' for i in range(1000)]
    pool = URLWorkerPool(worker_count=20, connector_limit=100)
    results, errors = await pool.process_urls(urls)
    
    print(f"Successful: {len(results)}")
    print(f"Errors: {len(errors)}")
    
    if errors:
        for error in errors[:5]:  # Первые 5 ошибок
            print(f"  {error['url']}: {error['error']}")

asyncio.run(main())

6. Optimizations

# Keep-alive и connection pooling
connector = aiohttp.TCPConnector(
    limit=100,                    # Total connections
    limit_per_host=5,             # Per-host limit
    ttl_dns_cache=300,            # DNS cache TTL
    ssl=False,                    # Отключить SSL verification (если необходимо)
    force_close=False,            # Переиспользовать соединения
)

session = aiohttp.ClientSession(
    connector=connector,
    headers={'User-Agent': 'MyBot/1.0'},  # Custom headers
    json_serialize=json.dumps      # Custom JSON encoder
)

# Timeout settings
timeout = aiohttp.ClientTimeout(
    total=30,        # Всего time на весь запрос
    connect=10,      # На подключение
    sock_read=10,    # На чтение данных
    sock_connect=10  # На socket connect
)

7. Мониторинг и logging

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

async def fetch_with_logging(session, url):
    logger.info(f"Starting request: {url}")
    try:
        async with session.get(url) as response:
            logger.info(f"Success {response.status}: {url}")
            return await response.json()
    except Exception as e:
        logger.error(f"Failed {url}: {e}")
        return None

Чеклист для отправки тысяч запросов

[ ] Используется Semaphore для ограничения параллелизма (100-200 соединений)
[ ] TCPConnector настроен с limit и limit_per_host
[ ] Implement retry механизм с exponential backoff
[ ] Timeout установлены (30 сек общий, 10 сек per операция)
[ ] Обработка rate limiting (429 статус)
[ ] Error handling и logging
[ ] Memory management (не храним все результаты в памяти)
[ ] Graceful shutdown при прерывании
[ ] DNS caching включён
[ ] Connection pooling переиспользует соединения

Заключение

Для отправки тысяч асинхронных запросов ключ — это управление ресурсами: ограничить одновременные соединения через Semaphore, настроить TCPConnector правильно, реализовать retry с exponential backoff, и обработать ошибки gracefully. Используй worker pool или queue pattern для масштабируемого решения.