← Назад к вопросам
Как напишешь system design для веб-краулера?
3.0 Senior🔥 271 комментариев
#Машинное обучение
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
System Design для веб-краулера
Веб-краулер — сложная распределённая система для сбора данных в масштабе. Рассмотрим архитектуру для обработки миллионов страниц эффективно.
1. Высокоуровневая архитектура
Система состоит из нескольких ключевых компонентов:
- URL Frontier: очередь для обхода, приоритизация, дедупликация
- Crawlers: N параллельных воркеров для загрузки страниц
- Parser: извлечение данных и ссылок из HTML
- Storage: сохранение контента в БД
- Content Index: индексирование для быстрого поиска
2. URL Frontier (управление очередью)
import redis
from urllib.parse import urlparse
import hashlib
class URLFrontier:
def __init__(self, redis_client):
self.redis = redis_client
self.seen_urls = set()
def add_url(self, url: str, priority: int = 0) -> bool:
"""Добавить URL если его ещё не видели"""
url_hash = hashlib.md5(url.encode()).hexdigest()
if url_hash in self.seen_urls:
return False
self.seen_urls.add(url_hash)
queue_key = self.get_queue_key(url)
self.redis.lpush(queue_key, url)
self.redis.zadd("url_by_priority", {url: -priority})
return True
def get_next_url(self) -> str:
"""Получить URL с наибольшим приоритетом"""
urls = self.redis.zrange("url_by_priority", 0, 0)
if not urls:
return None
url = urls[0]
self.redis.zrem("url_by_priority", url)
return url
def get_queue_key(self, url: str) -> str:
"""Группировать по домену для политики вежливости"""
domain = urlparse(url).netloc
return f"queue:{domain}"
3. Crawler Worker
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import logging
from time import sleep
class CrawlerWorker:
def __init__(self, url_frontier, content_db, politeness_delay=1.0):
self.frontier = url_frontier
self.content_db = content_db
self.politeness_delay = politeness_delay
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "MyBot/1.0 (+http://mysite.com/bot)"
})
def fetch_url(self, url: str) -> str:
"""Загрузить содержимое с обработкой ошибок"""
try:
response = self.session.get(
url,
timeout=10,
allow_redirects=True
)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching {url}: {e}")
return None
def extract_links(self, html: str, base_url: str) -> list:
"""Извлечь ссылки из HTML"""
soup = BeautifulSoup(html, "html.parser")
links = []
for link in soup.find_all("a", href=True):
absolute_url = urljoin(base_url, link["href"])
if absolute_url.startswith("http"):
links.append(absolute_url)
return links
def crawl(self):
"""Основной цикл краулера"""
while True:
url = self.frontier.get_next_url()
if not url:
sleep(1)
continue
html = self.fetch_url(url)
if not html:
continue
self.content_db.store(url, html)
links = self.extract_links(html, url)
for link in links:
self.frontier.add_url(link)
sleep(self.politeness_delay)
4. Content Storage (MongoDB)
import pymongo
from datetime import datetime
class ContentStorage:
def __init__(self, mongodb_uri="mongodb://localhost:27017"):
self.client = pymongo.MongoClient(mongodb_uri)
self.db = self.client["crawler_db"]
self.collection = self.db["pages"]
self.collection.create_index("url", unique=True)
self.collection.create_index("domain")
self.collection.create_index("crawl_time")
def store(self, url: str, html: str, metadata: dict = None):
"""Сохранить страницу"""
doc = {
"url": url,
"html": html,
"crawl_time": datetime.utcnow(),
"metadata": metadata or {}
}
try:
self.collection.insert_one(doc)
except pymongo.errors.DuplicateKeyError:
self.collection.update_one({"url": url}, {"\$set": doc})
5. Асинхронная обработка для масштабирования
import asyncio
from aiohttp import ClientSession
class AsyncCrawler:
def __init__(self, url_frontier, content_db, max_concurrent=100):
self.frontier = url_frontier
self.content_db = content_db
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_url_async(self, url: str) -> tuple:
"""Асинхронная загрузка"""
async with self.semaphore:
try:
async with ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status == 200:
return url, await response.text()
except Exception as e:
logging.error(f"Error: {e}")
return url, None
async def crawl_batch(self, urls: list):
"""Обработать батч параллельно"""
tasks = [self.fetch_url_async(url) for url in urls]
results = await asyncio.gather(*tasks)
for url, html in results:
if html:
self.content_db.store(url, html)
6. Дедупликация контента
import mmh3
class DuplicateDetector:
def __init__(self, redis_client):
self.redis = redis_client
def get_fingerprint(self, content: str) -> int:
"""Создать отпечаток контента"""
return mmh3.hash(content)
def is_duplicate(self, content: str) -> bool:
"""Проверить дублирование"""
fingerprint = self.get_fingerprint(content)
key = f"fp:{fingerprint}"
if self.redis.exists(key):
return True
self.redis.setex(key, 7*24*3600, 1)
return False
7. Ключевые рекомендации
Политика вежливости:
- Соблюдай robots.txt
- Задержка между запросами (1-2 сек)
- Ограничение на количество страниц с домена в секунду
Надёжность:
- Retry logic с exponential backoff
- Monitoring (Prometheus + Grafana)
- Checkpointing для восстановления
Производительность:
- Распределённая очередь (Redis, RabbitMQ)
- Асинхронная загрузка (asyncio, aiohttp)
- Кэширование часто посещаемых страниц
- Сжатие контента (gzip)
Масштабируемость:
- Горизонтальное масштабирование воркеров
- Использование распределённого кэша
- Разделение домашних очередей по машинам