Какие есть подходы к отказоустойчивости в системах?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Подходы к отказоустойчивости в системах
Отказоустойчивость (fault tolerance) — это способность системы продолжать работу при сбое компонентов. Это критично для production систем, где downtime стоит денег.
1. Redundancy (Избыточность)
Идея: Дублируем критичные компоненты, чтобы при отказе одного другой взял на себя нагрузку.
Пример 1: Database Replication
from sqlalchemy import create_engine
from contextlib import contextmanager
class DatabaseManager:
def __init__(self, primary_url, replica_url):
self.primary = create_engine(primary_url)
self.replica = create_engine(replica_url)
def get_connection(self, write=False):
engine = self.primary if write else self.replica
return engine.connect()
db_manager = DatabaseManager(
primary_url='postgresql://primary-host/db',
replica_url='postgresql://replica-host/db'
)
2. Graceful Degradation (Плавное ухудшение)
Идея: При отказе части системы она продолжает работу с ограниченной функциональностью.
class CacheService:
def __init__(self, cache, database):
self.cache = cache
self.database = database
def get_user(self, user_id: int):
try:
user = self.cache.get(f'user:{user_id}')
if user:
return user
except Exception as e:
print(f'Cache error: {e}, falling back to database')
user = self.database.get_user(user_id)
try:
self.cache.set(f'user:{user_id}', user, timeout=3600)
except:
pass
return user
3. Circuit Breaker Pattern
Идея: Если сервис постоянно падает, перестань его вызывать и вернись позже.
from enum import Enum
import time
class CircuitState(Enum):
CLOSED = 'closed'
OPEN = 'open'
HALF_OPEN = 'half_open'
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time = None
def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
self.failure_count = 0
else:
raise Exception('Circuit is OPEN')
try:
result = func(*args, **kwargs)
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
4. Retry Strategy (Повторные попытки)
Идея: Повтори операцию несколько раз с задержкой.
import time
import random
from typing import Callable, TypeVar
T = TypeVar('T')
class RetryStrategy:
def __init__(self, max_attempts=3, initial_delay=1, max_delay=60):
self.max_attempts = max_attempts
self.initial_delay = initial_delay
self.max_delay = max_delay
def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
attempt = 0
delay = self.initial_delay
while attempt < self.max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
attempt += 1
if attempt >= self.max_attempts:
raise
wait_time = min(delay, self.max_delay)
print(f'Attempt {attempt} failed, retrying in {wait_time:.1f}s')
time.sleep(wait_time)
delay *= 2
5. Timeout & Bulkhead (Изоляция)
Идея: Установи timeout и изолируй критичные ресурсы.
import asyncio
from concurrent.futures import ThreadPoolExecutor
class BulkheadService:
def __init__(self):
self.critical_executor = ThreadPoolExecutor(max_workers=10)
self.background_executor = ThreadPoolExecutor(max_workers=5)
async def process_with_timeout(self, func, timeout=5):
try:
result = await asyncio.wait_for(
asyncio.to_thread(func),
timeout=timeout
)
return result
except asyncio.TimeoutError:
print(f'Operation timed out after {timeout}s')
raise TimeoutError(f'Operation exceeded {timeout}s')
6. Health Checks & Liveness Probes
Идея: Регулярно проверяй здоровье сервиса.
from fastapi import FastAPI, HTTPException
import asyncio
app = FastAPI()
class HealthChecker:
def __init__(self):
self.is_healthy = True
self.error_message = None
async def check_database(self):
try:
await database.execute('SELECT 1')
except Exception as e:
self.is_healthy = False
self.error_message = f'Database error: {e}'
async def run_health_checks(self):
while True:
self.is_healthy = True
self.error_message = None
await self.check_database()
await asyncio.sleep(10)
health_checker = HealthChecker()
@app.get('/health')
async def health_check():
if not health_checker.is_healthy:
raise HTTPException(status_code=503, detail=health_checker.error_message)
return {'status': 'healthy'}
7. Data Replication & Backup
Идея: Реплицируй данные географически.
class ReplicationManager:
def __init__(self, primary_db, secondary_db):
self.primary = primary_db
self.secondary = secondary_db
def write_with_replication(self, query, params):
self.primary.execute(query, params)
async def replicate():
try:
self.secondary.execute(query, params)
except Exception as e:
logger.error(f'Replication failed: {e}')
asyncio.create_task(replicate())
8. Chaos Engineering
Идея: Тестируй отказоустойчивость, специально создавая сбои.
import random
from functools import wraps
def inject_failure(failure_rate=0.1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
if random.random() < failure_rate:
raise Exception(f'Chaos: {func.__name__} failed')
return func(*args, **kwargs)
return wrapper
return decorator
@inject_failure(failure_rate=0.1)
def external_api_call():
pass
9. Monitoring & Alerting
Идея: Обнаружь проблемы рано.
import time
class MonitoringService:
def __init__(self):
self.error_count = 0
self.last_error_time = None
def track_error(self, error_type: str, severity: str = 'warning'):
self.error_count += 1
self.last_error_time = time.time()
metrics.increment(f'errors.{error_type}')
if self.error_count > 10:
alert.send(f'High error rate: {self.error_count} errors')
Практические рекомендации
Tier 1 (обязательно):
- Database replication (основная + резервная)
- Health checks на каждый критичный сервис
- Graceful shutdown и connection pooling
- Basic retry логика
Tier 2 (рекомендуется):
- Circuit breaker для external APIs
- Cache с fallback
- Timeouts на все операции
- Bulkhead isolation
Tier 3 (продвинуто):
- Multi-region replication
- Chaos engineering тесты
- Advanced monitoring
- Auto-scaling и load balancing
Отказоустойчивость — это не одна фишка, а комбинация многих стратегий для надёжности system reliability!