← Назад к вопросам

Как напишешь system design для веб-краулера?

3.0 Senior🔥 271 комментариев
#Машинное обучение

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

System Design для веб-краулера

Веб-краулер — сложная распределённая система для сбора данных в масштабе. Рассмотрим архитектуру для обработки миллионов страниц эффективно.

1. Высокоуровневая архитектура

Система состоит из нескольких ключевых компонентов:

  • URL Frontier: очередь для обхода, приоритизация, дедупликация
  • Crawlers: N параллельных воркеров для загрузки страниц
  • Parser: извлечение данных и ссылок из HTML
  • Storage: сохранение контента в БД
  • Content Index: индексирование для быстрого поиска

2. URL Frontier (управление очередью)

import redis
from urllib.parse import urlparse
import hashlib

class URLFrontier:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.seen_urls = set()
    
    def add_url(self, url: str, priority: int = 0) -> bool:
        """Добавить URL если его ещё не видели"""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        
        if url_hash in self.seen_urls:
            return False
        
        self.seen_urls.add(url_hash)
        queue_key = self.get_queue_key(url)
        self.redis.lpush(queue_key, url)
        self.redis.zadd("url_by_priority", {url: -priority})
        
        return True
    
    def get_next_url(self) -> str:
        """Получить URL с наибольшим приоритетом"""
        urls = self.redis.zrange("url_by_priority", 0, 0)
        if not urls:
            return None
        
        url = urls[0]
        self.redis.zrem("url_by_priority", url)
        return url
    
    def get_queue_key(self, url: str) -> str:
        """Группировать по домену для политики вежливости"""
        domain = urlparse(url).netloc
        return f"queue:{domain}"

3. Crawler Worker

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import logging
from time import sleep

class CrawlerWorker:
    def __init__(self, url_frontier, content_db, politeness_delay=1.0):
        self.frontier = url_frontier
        self.content_db = content_db
        self.politeness_delay = politeness_delay
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "MyBot/1.0 (+http://mysite.com/bot)"
        })
    
    def fetch_url(self, url: str) -> str:
        """Загрузить содержимое с обработкой ошибок"""
        try:
            response = self.session.get(
                url, 
                timeout=10,
                allow_redirects=True
            )
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching {url}: {e}")
            return None
    
    def extract_links(self, html: str, base_url: str) -> list:
        """Извлечь ссылки из HTML"""
        soup = BeautifulSoup(html, "html.parser")
        links = []
        
        for link in soup.find_all("a", href=True):
            absolute_url = urljoin(base_url, link["href"])
            if absolute_url.startswith("http"):
                links.append(absolute_url)
        
        return links
    
    def crawl(self):
        """Основной цикл краулера"""
        while True:
            url = self.frontier.get_next_url()
            
            if not url:
                sleep(1)
                continue
            
            html = self.fetch_url(url)
            if not html:
                continue
            
            self.content_db.store(url, html)
            
            links = self.extract_links(html, url)
            for link in links:
                self.frontier.add_url(link)
            
            sleep(self.politeness_delay)

4. Content Storage (MongoDB)

import pymongo
from datetime import datetime

class ContentStorage:
    def __init__(self, mongodb_uri="mongodb://localhost:27017"):
        self.client = pymongo.MongoClient(mongodb_uri)
        self.db = self.client["crawler_db"]
        self.collection = self.db["pages"]
        
        self.collection.create_index("url", unique=True)
        self.collection.create_index("domain")
        self.collection.create_index("crawl_time")
    
    def store(self, url: str, html: str, metadata: dict = None):
        """Сохранить страницу"""
        doc = {
            "url": url,
            "html": html,
            "crawl_time": datetime.utcnow(),
            "metadata": metadata or {}
        }
        
        try:
            self.collection.insert_one(doc)
        except pymongo.errors.DuplicateKeyError:
            self.collection.update_one({"url": url}, {"\$set": doc})

5. Асинхронная обработка для масштабирования

import asyncio
from aiohttp import ClientSession

class AsyncCrawler:
    def __init__(self, url_frontier, content_db, max_concurrent=100):
        self.frontier = url_frontier
        self.content_db = content_db
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_url_async(self, url: str) -> tuple:
        """Асинхронная загрузка"""
        async with self.semaphore:
            try:
                async with ClientSession() as session:
                    async with session.get(url, timeout=10) as response:
                        if response.status == 200:
                            return url, await response.text()
            except Exception as e:
                logging.error(f"Error: {e}")
        
        return url, None
    
    async def crawl_batch(self, urls: list):
        """Обработать батч параллельно"""
        tasks = [self.fetch_url_async(url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for url, html in results:
            if html:
                self.content_db.store(url, html)

6. Дедупликация контента

import mmh3

class DuplicateDetector:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def get_fingerprint(self, content: str) -> int:
        """Создать отпечаток контента"""
        return mmh3.hash(content)
    
    def is_duplicate(self, content: str) -> bool:
        """Проверить дублирование"""
        fingerprint = self.get_fingerprint(content)
        key = f"fp:{fingerprint}"
        
        if self.redis.exists(key):
            return True
        
        self.redis.setex(key, 7*24*3600, 1)
        return False

7. Ключевые рекомендации

Политика вежливости:

  • Соблюдай robots.txt
  • Задержка между запросами (1-2 сек)
  • Ограничение на количество страниц с домена в секунду

Надёжность:

  • Retry logic с exponential backoff
  • Monitoring (Prometheus + Grafana)
  • Checkpointing для восстановления

Производительность:

  • Распределённая очередь (Redis, RabbitMQ)
  • Асинхронная загрузка (asyncio, aiohttp)
  • Кэширование часто посещаемых страниц
  • Сжатие контента (gzip)

Масштабируемость:

  • Горизонтальное масштабирование воркеров
  • Использование распределённого кэша
  • Разделение домашних очередей по машинам
Как напишешь system design для веб-краулера? | PrepBro