Как ускорить асинхронную работу перебора массива с 10000 ссылок?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как ускорить асинхронную работу перебора массива с 10000 ссылок?
Работа с большим количеством асинхронных операций (HTTP запросов) — классическая задача, где мелкие детали дают огромное ускорение. Покажу проверенные подходы.
Проблема: Наивный подход
Самая частая ошибка:
import asyncio
import aiohttp
async def fetch_url(url, session):
async with session.get(url, timeout=5) as response:
return await response.text()
async def main():
urls = [f"https://example.com/{i}" for i in range(10000)]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch_url(url, session) for url in urls]
)
return results
Этот код создаст 10000 одновременных соединений, что вызовет ошибку файловых дескрипторов!
Решение 1: Использование семафора (Semaphore)
Ограничиваем количество одновременных запросов:
import asyncio
import aiohttp
async def fetch_url(url, session, semaphore):
async with semaphore:
async with session.get(url, timeout=5) as response:
return await response.text()
async def main():
urls = [f"https://example.com/{i}" for i in range(10000)]
semaphore = asyncio.Semaphore(100) # Максимум 100 одновременных
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch_url(url, session, semaphore) for url in urls],
return_exceptions=True
)
return results
asyncio.run(main())
Рекомендуемое значение: 50-200 в зависимости от сервера и памяти.
Решение 2: asyncio.as_completed() для обработки по готовности
Обрабатываем результаты по мере их готовности:
async def main():
urls = [f"https://example.com/{i}" for i in range(10000)]
semaphore = asyncio.Semaphore(100)
results = []
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(url, session, semaphore) for url in urls]
for coro in asyncio.as_completed(tasks):
result = await coro
results.append(result)
print(f"Обработано: {len(results)}/10000")
return results
Плюс: видишь прогресс в реальном времени.
Решение 3: Батчинг (пакетная обработка)
Разбиваем на пакеты и обрабатываем последовательно:
async def process_batch(urls, session, batch_size=100):
results = []
for i in range(0, len(urls), batch_size):
batch = urls[i:i + batch_size]
batch_results = await asyncio.gather(
*[fetch_url(url, session) for url in batch],
return_exceptions=True
)
results.extend(batch_results)
print(f"Обработано батч {i//batch_size + 1}")
return results
async def main():
urls = [f"https://example.com/{i}" for i in range(10000)]
async with aiohttp.ClientSession() as session:
return await process_batch(urls, session, batch_size=200)
Решение 4: Комбинированный подход (РЕКОМЕНДУЕТСЯ)
Объединяем лучшие практики:
import asyncio
import aiohttp
from typing import List
async def fetch_url(url, session, semaphore):
async with semaphore:
try:
async with session.get(url, timeout=5) as response:
return {"url": url, "status": response.status, "data": await response.text()}
except asyncio.TimeoutError:
return {"url": url, "error": "timeout"}
except Exception as e:
return {"url": url, "error": str(e)}
async def main(urls: List[str]):
semaphore = asyncio.Semaphore(150)
connector = aiohttp.TCPConnector(limit=150, limit_per_host=30)
timeout = aiohttp.ClientTimeout(total=60, connect=10)
async with aiohttp.ClientSession(
connector=connector,
timeout=timeout
) as session:
tasks = [fetch_url(url, session, semaphore) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
urls = [f"https://example.com/{i}" for i in range(10000)]
results = asyncio.run(main(urls))
Оптимизация параметров
- limit (коннектор): 150-300 (общий предел соединений)
- limit_per_host: 20-50 (на один хост)
- semaphore: 100-200 (асинхронных задач)
- timeout: 5-10 сек на запрос, total=60 сек
- batch_size: если батчинг, то 100-500
Дополнительные советы
1. Мониторинг памяти:
import tracemalloc
tracemalloc.start()
# ... код ...
current, peak = tracemalloc.get_traced_memory()
print(f"Текущая: {current / 1024 / 1024:.1f} MB, Пик: {peak / 1024 / 1024:.1f} MB")
2. Retry логика:
from aiohttp_retry import RetryClient
retry_client = RetryClient()
await retry_client.get(url)
3. Проксирование для распределения нагрузки.
Итоговые рекомендации
Для 10000 ссылок используй комбинированный подход: семафор (100-200) + правильные параметры коннектора + обработка ошибок. Это даст 3-5х ускорение против наивного подхода и не сломает целевой сервер.