← Назад к вопросам

Какие методы используются для защиты от DoS-атак?

2.7 Senior🔥 121 комментариев
#Безопасность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Защита от DoS-атак

DoS (Denial of Service) — атака, которая делает сервис недоступным. Рассмотрю методы защиты.

1. Rate Limiting

Ограничение количества запросов за период времени.

from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get('/api/data')
@limiter.limit('100/minute')  # максимум 100 запросов в минуту
async def get_data(request: Request):
    return {'data': 'value'}

@app.post('/login')
@limiter.limit('5/minute')  # строже для login
async def login(request: Request):
    return {'token': 'abc123'}

Также можно использовать Redis для распределённого rate limiting:

import redis
import time

class RedisRateLimiter:
    def __init__(self, redis_client, max_requests: int = 100, window: int = 60):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window = window
    
    def is_allowed(self, client_id: str) -> bool:
        key = f'rate_limit:{client_id}'
        current = self.redis.incr(key)
        
        if current == 1:
            self.redis.expire(key, self.window)  # TTL
        
        return current <= self.max_requests

# Использование
redis_client = redis.Redis(host='localhost', port=6379)
limiter = RedisRateLimiter(redis_client, max_requests=100, window=60)

if not limiter.is_allowed(client_ip):
    raise HTTPException(status_code=429, detail='Too Many Requests')

2. CAPTCHA

Проверка что запрос от реального человека.

import httpx
from fastapi import Form, HTTPException

class GoogleRecaptchaValidator:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.verify_url = 'https://www.google.com/recaptcha/api/siteverify'
    
    async def verify(self, token: str) -> bool:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.verify_url,
                data={
                    'secret': self.secret_key,
                    'response': token
                }
            )
            result = response.json()
            return result.get('success', False) and result.get('score', 0) > 0.5

@app.post('/register')
async def register(
    email: str = Form(...),
    password: str = Form(...),
    recaptcha_token: str = Form(...)
):
    validator = GoogleRecaptchaValidator(RECAPTCHA_SECRET)
    if not await validator.verify(recaptcha_token):
        raise HTTPException(status_code=400, detail='reCAPTCHA verification failed')
    
    # Создать пользователя
    return {'status': 'registered'}

3. WAF (Web Application Firewall)

Фильтрация опасных запросов на уровне приложения.

from fastapi import FastAPI, Request
import re

class SimpleWAF:
    def __init__(self):
        # Паттерны для обнаружения атак
        self.sql_injection_patterns = [
            r"('|(\-\-)|(;)|(\*))"  # SQL символы
        ]
        self.xss_patterns = [
            r'<script[^>]*>',
            r'javascript:',
            r'on\w+\s*='
        ]
        self.path_traversal_patterns = [
            r'\.\./|\.\\\\'
        ]
    
    def is_safe(self, request_data: str) -> bool:
        all_patterns = (
            self.sql_injection_patterns +
            self.xss_patterns +
            self.path_traversal_patterns
        )
        
        for pattern in all_patterns:
            if re.search(pattern, request_data, re.IGNORECASE):
                return False
        return True

waf = SimpleWAF()

@app.middleware('http')
async def waf_middleware(request: Request, call_next):
    # Проверить query параметры
    for value in request.query_params.values():
        if not waf.is_safe(value):
            raise HTTPException(status_code=400, detail='Malicious request')
    
    return await call_next(request)

4. DDoS Protection на инфраструктуре

Использовать облачные сервисы для защиты:

# Используются Cloudflare, AWS Shield, Akamai и т.д.
# На уровне nginx

# nginx.conf
http {
    # Ограничение скорости подключения
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=login:10m rate=5r/s;
    
    # Ограничение размера буфера
    client_body_buffer_size 128k;
    client_max_body_size 10m;
    
    # Timeout
    client_body_timeout 10s;
    client_header_timeout 10s;
    keepalive_timeout 5s 5s;
    
    server {
        listen 80;
        
        location /api/ {
            limit_req zone=api burst=20 nodelay;
            proxy_pass http://backend;
        }
        
        location /login {
            limit_req zone=login burst=5 nodelay;
            proxy_pass http://backend;
        }
    }
}

5. Connection Limits

Ограничение числа одновременных соединений.

from asyncio import Semaphore
from fastapi import FastAPI

class ConnectionLimiter:
    def __init__(self, max_connections: int = 1000):
        self.semaphore = Semaphore(max_connections)
    
    async def acquire(self):
        return await self.semaphore.acquire()
    
    def release(self):
        self.semaphore.release()

limiter = ConnectionLimiter(max_connections=1000)

@app.middleware('http')
async def connection_limiter_middleware(request: Request, call_next):
    if not await limiter.acquire():
        raise HTTPException(status_code=503, detail='Service Unavailable')
    
    try:
        response = await call_next(request)
        return response
    finally:
        limiter.release()

6. Кеширование

Redis для уменьшения нагрузки на основной сервис.

import redis
from fastapi import FastAPI
import json

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

@app.get('/api/users/{user_id}')
async def get_user(user_id: int):
    # Проверить кеш
    cache_key = f'user:{user_id}'
    cached = redis_client.get(cache_key)
    
    if cached:
        return json.loads(cached)
    
    # Получить из БД
    user = await db.get_user(user_id)
    
    # Закешировать
    redis_client.setex(
        cache_key,
        3600,  # TTL 1 час
        json.dumps(user)
    )
    
    return user

7. IP Whitelisting/Blacklisting

Управление доступом по IP адресам.

from fastapi import Request, HTTPException

class IPFilter:
    def __init__(self, whitelist: list[str] = None, blacklist: list[str] = None):
        self.whitelist = whitelist or []
        self.blacklist = blacklist or []
    
    def is_allowed(self, client_ip: str) -> bool:
        if self.whitelist and client_ip not in self.whitelist:
            return False
        if self.blacklist and client_ip in self.blacklist:
            return False
        return True

ip_filter = IPFilter(
    whitelist=['192.168.1.0/24'],
    blacklist=['10.0.0.5']
)

@app.middleware('http')
async def ip_filter_middleware(request: Request, call_next):
    client_ip = request.client.host
    if not ip_filter.is_allowed(client_ip):
        raise HTTPException(status_code=403, detail='Access denied')
    return await call_next(request)

8. Мониторинг и Алерты

Отслеживание аномалий в трафике.

import asyncio
from datetime import datetime, timedelta

class TrafficMonitor:
    def __init__(self, alert_threshold: int = 1000):
        self.request_count = 0
        self.last_check = datetime.now()
        self.alert_threshold = alert_threshold
    
    async def check_traffic(self):
        while True:
            await asyncio.sleep(60)  # проверка каждую минуту
            
            now = datetime.now()
            elapsed = (now - self.last_check).total_seconds()
            
            if self.request_count / elapsed > self.alert_threshold:
                # отправить алерт
                print(f'DoS Attack detected! Requests: {self.request_count/elapsed:.0f}/sec')
                # отправить уведомление в Slack, email и т.д.
            
            self.request_count = 0
            self.last_check = now
    
    def record_request(self):
        self.request_count += 1

Комплексная защита

  • Rate limiting на приложении + инфраструктуре
  • CAPTCHA для чувствительных эндпоинтов
  • WAF для фильтрации
  • DDoS Protection (Cloudflare, AWS Shield)
  • Connection limits
  • Кеширование
  • IP filtering
  • Мониторинг 24/7

Нет одного решения — комбинация методов обеспечивает надёжную защиту.

Какие методы используются для защиты от DoS-атак? | PrepBro