В каком случае лучше использовать AIOHTTP
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Когда использовать AIOHTTP
Введение
AIOHTTP — это асинхронная библиотека для HTTP запросов в Python, основанная на asyncio. Она предназначена для работы с высоконагруженными асинхронными приложениями.
AIOHTTP идеален для
1. Множественные параллельные HTTP запросы
Это главный случай использования AIOHTTP. Когда нужно делать много HTTP запросов одновременно:
import aiohttp
import asyncio
async def fetch_many_urls():
urls = [
"https://api.example.com/user/1",
"https://api.example.com/user/2",
"https://api.example.com/user/3",
# ... 100 URL'ов
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.json()
# Все 100 запросов выполняются параллельно в одном потоке
asyncio.run(fetch_many_urls())
2. WebSocket соединения
AIOHTTP отлично подходит для работы с WebSocket'ами:
import aiohttp
import asyncio
async def websocket_client():
async with aiohttp.ClientSession() as session:
# Клиент
async with session.ws_connect('wss://echo.websocket.org') as ws:
await ws.send_str("Hello!")
msg = await ws.receive()
print(f"Получено: {msg.data}")
# WebSocket сервер
from aiohttp import web
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await ws.send_str(f"Echo: {msg.data}")
return ws
app = web.Application()
app.router.add_get('/ws', websocket_handler)
3. Server-Sent Events (SSE)
Для потоковой доставки данных:
async def fetch_sse():
"""Получать события из SSE потока"""
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/events') as resp:
async for line in resp.content:
event_data = line.decode('utf-8')
print(f"Событие: {event_data}")
4. Высоконагруженные микросервисы
Когда backend сервис должен делать много HTTP вызовов к другим сервисам:
from aiohttp import web
import aiohttp
class UserService:
def __init__(self, session: aiohttp.ClientSession):
self.session = session
async def get_user_data(self, user_id: int):
"""Получить данные пользователя из нескольких сервисов"""
# Параллельные запросы к 3 микросервисам
profile, posts, recommendations = await asyncio.gather(
self._fetch_profile(user_id),
self._fetch_posts(user_id),
self._fetch_recommendations(user_id),
)
return {
"profile": profile,
"posts": posts,
"recommendations": recommendations,
}
async def _fetch_profile(self, user_id: int):
async with self.session.get(f"http://profile-service/users/{user_id}") as resp:
return await resp.json()
async def _fetch_posts(self, user_id: int):
async with self.session.get(f"http://post-service/users/{user_id}/posts") as resp:
return await resp.json()
async def _fetch_recommendations(self, user_id: int):
async with self.session.get(f"http://recommendation-service/users/{user_id}/recs") as resp:
return await resp.json()
# Использование
async def handle_request(request):
user_id = int(request.match_info['user_id'])
service = UserService(request.app['http_session'])
data = await service.get_user_data(user_id)
return web.json_response(data)
AIOHTTP vs другие библиотеки
AIOHTTP vs Requests
# Requests (блокирующий)
import requests
def fetch_urls_sync():
"""Медленно: каждый запрос блокирует следующий"""
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
results = []
for url in urls:
response = requests.get(url) # Ждём каждый запрос!
results.append(response.json())
return results
# Время: ~100 * 100ms = 10 сек
# AIOHTTP (асинхронный)
import aiohttp
import asyncio
async def fetch_urls_async():
"""Быстро: все запросы выполняются параллельно"""
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
results = [await r.json() for r in responses]
return results
# Время: ~100ms (самый долгий запрос)
AIOHTTP vs httpx
# AIOHTTP (низкоуровневый, быстрый)
import aiohttp
async def with_aiohttp():
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
# Больше контроля, выше производительность
# HTTPX (высокоуровневый, удобнее)
import httpx
async def with_httpx():
async with httpx.AsyncClient() as client:
resp = await client.get(url)
return resp.json()
# Удобнее API, но медленнее
AIOHTTP vs asyncio.create_connection
# Прямой socket (низкоуровневый, сложный)
import asyncio
async def raw_socket():
reader, writer = await asyncio.open_connection('example.com', 443)
# Нужно самому формировать HTTP запрос
# Нужно самому обрабатывать SSL/TLS
# Нужно самому парсить ответ
# AIOHTTP (высокоуровневый, удобный)
import aiohttp
async def with_aiohttp():
async with aiohttp.ClientSession() as session:
async with session.get('https://example.com') as resp:
return await resp.json()
# AIOHTTP всё делает за вас
Практические примеры
Пример 1: Web scraper
import aiohttp
import asyncio
from bs4 import BeautifulSoup
class Scraper:
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_page(self, session, url):
"""Fetch одной страницы с ограничением параллелизма"""
async with self.semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
return await resp.text()
except Exception as e:
print(f"Ошибка при получении {url}: {e}")
return None
async def scrape_pages(self, urls):
"""Scrape много страниц параллельно"""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_page(session, url) for url in urls]
pages = await asyncio.gather(*tasks)
results = []
for page in pages:
if page:
soup = BeautifulSoup(page, 'html.parser')
title = soup.find('h1').text if soup.find('h1') else 'N/A'
results.append(title)
return results
# Использование
scraper = Scraper(max_concurrent=5)
urls = [f"https://example.com/page/{i}" for i in range(1, 101)]
results = asyncio.run(scraper.scrape_pages(urls))
print(f"Scraped {len(results)} pages")
Пример 2: API Gateway
from aiohttp import web
import aiohttp
class APIGateway:
def __init__(self):
self.session = None
async def start(self):
"""Создать сессию при старте"""
self.session = aiohttp.ClientSession()
async def stop(self):
"""Закрыть сессию при остановке"""
await self.session.close()
async def aggregate_user_data(self, request):
"""Агрегировать данные из нескольких микросервисов"""
user_id = request.match_info['user_id']
# Параллельные запросы
results = await asyncio.gather(
self._call_service('auth', f'/users/{user_id}'),
self._call_service('profile', f'/profiles/{user_id}'),
self._call_service('inventory', f'/users/{user_id}/items'),
)
return web.json_response({
'auth': results[0],
'profile': results[1],
'inventory': results[2],
})
async def _call_service(self, service, endpoint):
"""Вызвать микросервис"""
url = f"http://{service}-service:8000{endpoint}"
try:
async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
if resp.status == 200:
return await resp.json()
else:
return {"error": f"Service returned {resp.status}"}
except asyncio.TimeoutError:
return {"error": "Service timeout"}
except Exception as e:
return {"error": str(e)}
# Создание приложения
gateway = APIGateway()
app = web.Application()
app.router.add_get('/users/{user_id}', gateway.aggregate_user_data)
app.on_startup.append(lambda app: gateway.start())
app.on_cleanup.append(lambda app: gateway.stop())
if __name__ == '__main__':
web.run_app(app, port=8000)
Когда НЕ использовать AIOHTTP
1. Одиночные HTTP запросы
# Оверкомпликация:
import aiohttp
import asyncio
async def single_request():
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
asyncio.run(single_request())
# Лучше просто:
import requests
response = requests.get(url)
data = response.json()
2. Синхронный код
# AIOHTTP требует asyncio
# Если весь код синхронный, лучше использовать Requests
import requests
def get_data():
response = requests.get(url)
return response.json()
3. CPU-bound операции
# AIOHTTP для I/O, но не для CPU
# Для CPU используй multiprocessing
import multiprocessing
def cpu_heavy():
result = sum(i**2 for i in range(10000000))
return result
with multiprocessing.Pool() as pool:
results = pool.map(cpu_heavy, range(10))
Лучшие практики
1. Переиспользуй ClientSession
# ❌ Плохо: создаёшь сессию для каждого запроса
async def bad():
for url in urls:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(await resp.json())
# ✅ Хорошо: одна сессия для многих запросов
async def good():
async with aiohttp.ClientSession() as session:
for url in urls:
async with session.get(url) as resp:
print(await resp.json())
2. Используй Semaphore для ограничения параллелизма
async def limited_concurrency(urls):
semaphore = asyncio.Semaphore(10) # Макс 10 одновременных запросов
async def bounded_get(session, url):
async with semaphore:
async with session.get(url) as resp:
return await resp.json()
async with aiohttp.ClientSession() as session:
tasks = [bounded_get(session, url) for url in urls]
return await asyncio.gather(*tasks)
3. Правильная обработка timeout'ов
async def with_timeout():
timeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
return await resp.json()
Выводы
✅ Используй AIOHTTP для параллельных HTTP запросов ✅ Идеален для микросервисной архитектуры ✅ Отлично подходит для WebSocket соединений ✅ Для одиночных запросов используй Requests ✅ Помни про ограничение параллелизма (Semaphore) ✅ Переиспользуй ClientSession ✅ Правильно настраивай timeout'ы