← Назад к вопросам

В каком случае лучше использовать AIOHTTP

2.0 Middle🔥 121 комментариев
#FastAPI и Flask#Асинхронность и многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Когда использовать AIOHTTP

Введение

AIOHTTP — это асинхронная библиотека для HTTP запросов в Python, основанная на asyncio. Она предназначена для работы с высоконагруженными асинхронными приложениями.

AIOHTTP идеален для

1. Множественные параллельные HTTP запросы

Это главный случай использования AIOHTTP. Когда нужно делать много HTTP запросов одновременно:

import aiohttp
import asyncio

async def fetch_many_urls():
    urls = [
        "https://api.example.com/user/1",
        "https://api.example.com/user/2",
        "https://api.example.com/user/3",
        # ... 100 URL'ов
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    return results

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

# Все 100 запросов выполняются параллельно в одном потоке
asyncio.run(fetch_many_urls())

2. WebSocket соединения

AIOHTTP отлично подходит для работы с WebSocket'ами:

import aiohttp
import asyncio

async def websocket_client():
    async with aiohttp.ClientSession() as session:
        # Клиент
        async with session.ws_connect('wss://echo.websocket.org') as ws:
            await ws.send_str("Hello!")
            msg = await ws.receive()
            print(f"Получено: {msg.data}")

# WebSocket сервер
from aiohttp import web

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    
    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            await ws.send_str(f"Echo: {msg.data}")
    
    return ws

app = web.Application()
app.router.add_get('/ws', websocket_handler)

3. Server-Sent Events (SSE)

Для потоковой доставки данных:

async def fetch_sse():
    """Получать события из SSE потока"""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/events') as resp:
            async for line in resp.content:
                event_data = line.decode('utf-8')
                print(f"Событие: {event_data}")

4. Высоконагруженные микросервисы

Когда backend сервис должен делать много HTTP вызовов к другим сервисам:

from aiohttp import web
import aiohttp

class UserService:
    def __init__(self, session: aiohttp.ClientSession):
        self.session = session
    
    async def get_user_data(self, user_id: int):
        """Получить данные пользователя из нескольких сервисов"""
        # Параллельные запросы к 3 микросервисам
        profile, posts, recommendations = await asyncio.gather(
            self._fetch_profile(user_id),
            self._fetch_posts(user_id),
            self._fetch_recommendations(user_id),
        )
        return {
            "profile": profile,
            "posts": posts,
            "recommendations": recommendations,
        }
    
    async def _fetch_profile(self, user_id: int):
        async with self.session.get(f"http://profile-service/users/{user_id}") as resp:
            return await resp.json()
    
    async def _fetch_posts(self, user_id: int):
        async with self.session.get(f"http://post-service/users/{user_id}/posts") as resp:
            return await resp.json()
    
    async def _fetch_recommendations(self, user_id: int):
        async with self.session.get(f"http://recommendation-service/users/{user_id}/recs") as resp:
            return await resp.json()

# Использование
async def handle_request(request):
    user_id = int(request.match_info['user_id'])
    service = UserService(request.app['http_session'])
    data = await service.get_user_data(user_id)
    return web.json_response(data)

AIOHTTP vs другие библиотеки

AIOHTTP vs Requests

# Requests (блокирующий)
import requests

def fetch_urls_sync():
    """Медленно: каждый запрос блокирует следующий"""
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    results = []
    
    for url in urls:
        response = requests.get(url)  # Ждём каждый запрос!
        results.append(response.json())
    
    return results
    # Время: ~100 * 100ms = 10 сек

# AIOHTTP (асинхронный)
import aiohttp
import asyncio

async def fetch_urls_async():
    """Быстро: все запросы выполняются параллельно"""
    urls = [f"https://api.example.com/item/{i}" for i in range(100)]
    
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        results = [await r.json() for r in responses]
    
    return results
    # Время: ~100ms (самый долгий запрос)

AIOHTTP vs httpx

# AIOHTTP (низкоуровневый, быстрый)
import aiohttp

async def with_aiohttp():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
    # Больше контроля, выше производительность

# HTTPX (высокоуровневый, удобнее)
import httpx

async def with_httpx():
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.json()
    # Удобнее API, но медленнее

AIOHTTP vs asyncio.create_connection

# Прямой socket (низкоуровневый, сложный)
import asyncio

async def raw_socket():
    reader, writer = await asyncio.open_connection('example.com', 443)
    # Нужно самому формировать HTTP запрос
    # Нужно самому обрабатывать SSL/TLS
    # Нужно самому парсить ответ

# AIOHTTP (высокоуровневый, удобный)
import aiohttp

async def with_aiohttp():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com') as resp:
            return await resp.json()
    # AIOHTTP всё делает за вас

Практические примеры

Пример 1: Web scraper

import aiohttp
import asyncio
from bs4 import BeautifulSoup

class Scraper:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_page(self, session, url):
        """Fetch одной страницы с ограничением параллелизма"""
        async with self.semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    return await resp.text()
            except Exception as e:
                print(f"Ошибка при получении {url}: {e}")
                return None
    
    async def scrape_pages(self, urls):
        """Scrape много страниц параллельно"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            pages = await asyncio.gather(*tasks)
        
        results = []
        for page in pages:
            if page:
                soup = BeautifulSoup(page, 'html.parser')
                title = soup.find('h1').text if soup.find('h1') else 'N/A'
                results.append(title)
        
        return results

# Использование
scraper = Scraper(max_concurrent=5)
urls = [f"https://example.com/page/{i}" for i in range(1, 101)]
results = asyncio.run(scraper.scrape_pages(urls))
print(f"Scraped {len(results)} pages")

Пример 2: API Gateway

from aiohttp import web
import aiohttp

class APIGateway:
    def __init__(self):
        self.session = None
    
    async def start(self):
        """Создать сессию при старте"""
        self.session = aiohttp.ClientSession()
    
    async def stop(self):
        """Закрыть сессию при остановке"""
        await self.session.close()
    
    async def aggregate_user_data(self, request):
        """Агрегировать данные из нескольких микросервисов"""
        user_id = request.match_info['user_id']
        
        # Параллельные запросы
        results = await asyncio.gather(
            self._call_service('auth', f'/users/{user_id}'),
            self._call_service('profile', f'/profiles/{user_id}'),
            self._call_service('inventory', f'/users/{user_id}/items'),
        )
        
        return web.json_response({
            'auth': results[0],
            'profile': results[1],
            'inventory': results[2],
        })
    
    async def _call_service(self, service, endpoint):
        """Вызвать микросервис"""
        url = f"http://{service}-service:8000{endpoint}"
        try:
            async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                if resp.status == 200:
                    return await resp.json()
                else:
                    return {"error": f"Service returned {resp.status}"}
        except asyncio.TimeoutError:
            return {"error": "Service timeout"}
        except Exception as e:
            return {"error": str(e)}

# Создание приложения
gateway = APIGateway()
app = web.Application()
app.router.add_get('/users/{user_id}', gateway.aggregate_user_data)
app.on_startup.append(lambda app: gateway.start())
app.on_cleanup.append(lambda app: gateway.stop())

if __name__ == '__main__':
    web.run_app(app, port=8000)

Когда НЕ использовать AIOHTTP

1. Одиночные HTTP запросы

# Оверкомпликация:
import aiohttp
import asyncio

async def single_request():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

asyncio.run(single_request())

# Лучше просто:
import requests
response = requests.get(url)
data = response.json()

2. Синхронный код

# AIOHTTP требует asyncio
# Если весь код синхронный, лучше использовать Requests

import requests

def get_data():
    response = requests.get(url)
    return response.json()

3. CPU-bound операции

# AIOHTTP для I/O, но не для CPU
# Для CPU используй multiprocessing

import multiprocessing

def cpu_heavy():
    result = sum(i**2 for i in range(10000000))
    return result

with multiprocessing.Pool() as pool:
    results = pool.map(cpu_heavy, range(10))

Лучшие практики

1. Переиспользуй ClientSession

# ❌ Плохо: создаёшь сессию для каждого запроса
async def bad():
    for url in urls:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                print(await resp.json())

# ✅ Хорошо: одна сессия для многих запросов
async def good():
    async with aiohttp.ClientSession() as session:
        for url in urls:
            async with session.get(url) as resp:
                print(await resp.json())

2. Используй Semaphore для ограничения параллелизма

async def limited_concurrency(urls):
    semaphore = asyncio.Semaphore(10)  # Макс 10 одновременных запросов
    
    async def bounded_get(session, url):
        async with semaphore:
            async with session.get(url) as resp:
                return await resp.json()
    
    async with aiohttp.ClientSession() as session:
        tasks = [bounded_get(session, url) for url in urls]
        return await asyncio.gather(*tasks)

3. Правильная обработка timeout'ов

async def with_timeout():
    timeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
            return await resp.json()

Выводы

✅ Используй AIOHTTP для параллельных HTTP запросов ✅ Идеален для микросервисной архитектуры ✅ Отлично подходит для WebSocket соединений ✅ Для одиночных запросов используй Requests ✅ Помни про ограничение параллелизма (Semaphore) ✅ Переиспользуй ClientSession ✅ Правильно настраивай timeout'ы