← Назад к вопросам
Какие методы используются для защиты от DoS-атак?
2.7 Senior🔥 121 комментариев
#Безопасность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Защита от DoS-атак
DoS (Denial of Service) — атака, которая делает сервис недоступным. Рассмотрю методы защиты.
1. Rate Limiting
Ограничение количества запросов за период времени.
from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get('/api/data')
@limiter.limit('100/minute') # максимум 100 запросов в минуту
async def get_data(request: Request):
return {'data': 'value'}
@app.post('/login')
@limiter.limit('5/minute') # строже для login
async def login(request: Request):
return {'token': 'abc123'}
Также можно использовать Redis для распределённого rate limiting:
import redis
import time
class RedisRateLimiter:
def __init__(self, redis_client, max_requests: int = 100, window: int = 60):
self.redis = redis_client
self.max_requests = max_requests
self.window = window
def is_allowed(self, client_id: str) -> bool:
key = f'rate_limit:{client_id}'
current = self.redis.incr(key)
if current == 1:
self.redis.expire(key, self.window) # TTL
return current <= self.max_requests
# Использование
redis_client = redis.Redis(host='localhost', port=6379)
limiter = RedisRateLimiter(redis_client, max_requests=100, window=60)
if not limiter.is_allowed(client_ip):
raise HTTPException(status_code=429, detail='Too Many Requests')
2. CAPTCHA
Проверка что запрос от реального человека.
import httpx
from fastapi import Form, HTTPException
class GoogleRecaptchaValidator:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.verify_url = 'https://www.google.com/recaptcha/api/siteverify'
async def verify(self, token: str) -> bool:
async with httpx.AsyncClient() as client:
response = await client.post(
self.verify_url,
data={
'secret': self.secret_key,
'response': token
}
)
result = response.json()
return result.get('success', False) and result.get('score', 0) > 0.5
@app.post('/register')
async def register(
email: str = Form(...),
password: str = Form(...),
recaptcha_token: str = Form(...)
):
validator = GoogleRecaptchaValidator(RECAPTCHA_SECRET)
if not await validator.verify(recaptcha_token):
raise HTTPException(status_code=400, detail='reCAPTCHA verification failed')
# Создать пользователя
return {'status': 'registered'}
3. WAF (Web Application Firewall)
Фильтрация опасных запросов на уровне приложения.
from fastapi import FastAPI, Request
import re
class SimpleWAF:
def __init__(self):
# Паттерны для обнаружения атак
self.sql_injection_patterns = [
r"('|(\-\-)|(;)|(\*))" # SQL символы
]
self.xss_patterns = [
r'<script[^>]*>',
r'javascript:',
r'on\w+\s*='
]
self.path_traversal_patterns = [
r'\.\./|\.\\\\'
]
def is_safe(self, request_data: str) -> bool:
all_patterns = (
self.sql_injection_patterns +
self.xss_patterns +
self.path_traversal_patterns
)
for pattern in all_patterns:
if re.search(pattern, request_data, re.IGNORECASE):
return False
return True
waf = SimpleWAF()
@app.middleware('http')
async def waf_middleware(request: Request, call_next):
# Проверить query параметры
for value in request.query_params.values():
if not waf.is_safe(value):
raise HTTPException(status_code=400, detail='Malicious request')
return await call_next(request)
4. DDoS Protection на инфраструктуре
Использовать облачные сервисы для защиты:
# Используются Cloudflare, AWS Shield, Akamai и т.д.
# На уровне nginx
# nginx.conf
http {
# Ограничение скорости подключения
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
limit_req_zone $binary_remote_addr zone=login:10m rate=5r/s;
# Ограничение размера буфера
client_body_buffer_size 128k;
client_max_body_size 10m;
# Timeout
client_body_timeout 10s;
client_header_timeout 10s;
keepalive_timeout 5s 5s;
server {
listen 80;
location /api/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://backend;
}
location /login {
limit_req zone=login burst=5 nodelay;
proxy_pass http://backend;
}
}
}
5. Connection Limits
Ограничение числа одновременных соединений.
from asyncio import Semaphore
from fastapi import FastAPI
class ConnectionLimiter:
def __init__(self, max_connections: int = 1000):
self.semaphore = Semaphore(max_connections)
async def acquire(self):
return await self.semaphore.acquire()
def release(self):
self.semaphore.release()
limiter = ConnectionLimiter(max_connections=1000)
@app.middleware('http')
async def connection_limiter_middleware(request: Request, call_next):
if not await limiter.acquire():
raise HTTPException(status_code=503, detail='Service Unavailable')
try:
response = await call_next(request)
return response
finally:
limiter.release()
6. Кеширование
Redis для уменьшения нагрузки на основной сервис.
import redis
from fastapi import FastAPI
import json
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
@app.get('/api/users/{user_id}')
async def get_user(user_id: int):
# Проверить кеш
cache_key = f'user:{user_id}'
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Получить из БД
user = await db.get_user(user_id)
# Закешировать
redis_client.setex(
cache_key,
3600, # TTL 1 час
json.dumps(user)
)
return user
7. IP Whitelisting/Blacklisting
Управление доступом по IP адресам.
from fastapi import Request, HTTPException
class IPFilter:
def __init__(self, whitelist: list[str] = None, blacklist: list[str] = None):
self.whitelist = whitelist or []
self.blacklist = blacklist or []
def is_allowed(self, client_ip: str) -> bool:
if self.whitelist and client_ip not in self.whitelist:
return False
if self.blacklist and client_ip in self.blacklist:
return False
return True
ip_filter = IPFilter(
whitelist=['192.168.1.0/24'],
blacklist=['10.0.0.5']
)
@app.middleware('http')
async def ip_filter_middleware(request: Request, call_next):
client_ip = request.client.host
if not ip_filter.is_allowed(client_ip):
raise HTTPException(status_code=403, detail='Access denied')
return await call_next(request)
8. Мониторинг и Алерты
Отслеживание аномалий в трафике.
import asyncio
from datetime import datetime, timedelta
class TrafficMonitor:
def __init__(self, alert_threshold: int = 1000):
self.request_count = 0
self.last_check = datetime.now()
self.alert_threshold = alert_threshold
async def check_traffic(self):
while True:
await asyncio.sleep(60) # проверка каждую минуту
now = datetime.now()
elapsed = (now - self.last_check).total_seconds()
if self.request_count / elapsed > self.alert_threshold:
# отправить алерт
print(f'DoS Attack detected! Requests: {self.request_count/elapsed:.0f}/sec')
# отправить уведомление в Slack, email и т.д.
self.request_count = 0
self.last_check = now
def record_request(self):
self.request_count += 1
Комплексная защита
- Rate limiting на приложении + инфраструктуре
- CAPTCHA для чувствительных эндпоинтов
- WAF для фильтрации
- DDoS Protection (Cloudflare, AWS Shield)
- Connection limits
- Кеширование
- IP filtering
- Мониторинг 24/7
Нет одного решения — комбинация методов обеспечивает надёжную защиту.