← Назад к вопросам

Какие есть подходы к отказоустойчивости в системах?

2.7 Senior🔥 201 комментариев
#DevOps и инфраструктура#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Подходы к отказоустойчивости в системах

Отказоустойчивость (fault tolerance) — это способность системы продолжать работу при сбое компонентов. Это критично для production систем, где downtime стоит денег.

1. Redundancy (Избыточность)

Идея: Дублируем критичные компоненты, чтобы при отказе одного другой взял на себя нагрузку.

Пример 1: Database Replication

from sqlalchemy import create_engine
from contextlib import contextmanager

class DatabaseManager:
    def __init__(self, primary_url, replica_url):
        self.primary = create_engine(primary_url)
        self.replica = create_engine(replica_url)
    
    def get_connection(self, write=False):
        engine = self.primary if write else self.replica
        return engine.connect()

db_manager = DatabaseManager(
    primary_url='postgresql://primary-host/db',
    replica_url='postgresql://replica-host/db'
)

2. Graceful Degradation (Плавное ухудшение)

Идея: При отказе части системы она продолжает работу с ограниченной функциональностью.

class CacheService:
    def __init__(self, cache, database):
        self.cache = cache
        self.database = database
    
    def get_user(self, user_id: int):
        try:
            user = self.cache.get(f'user:{user_id}')
            if user:
                return user
        except Exception as e:
            print(f'Cache error: {e}, falling back to database')
        
        user = self.database.get_user(user_id)
        try:
            self.cache.set(f'user:{user_id}', user, timeout=3600)
        except:
            pass
        
        return user

3. Circuit Breaker Pattern

Идея: Если сервис постоянно падает, перестань его вызывать и вернись позже.

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = 'closed'
    OPEN = 'open'
    HALF_OPEN = 'half_open'

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
    
    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                self.failure_count = 0
            else:
                raise Exception('Circuit is OPEN')
        
        try:
            result = func(*args, **kwargs)
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise

4. Retry Strategy (Повторные попытки)

Идея: Повтори операцию несколько раз с задержкой.

import time
import random
from typing import Callable, TypeVar

T = TypeVar('T')

class RetryStrategy:
    def __init__(self, max_attempts=3, initial_delay=1, max_delay=60):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
    
    def execute(self, func: Callable[..., T], *args, **kwargs) -> T:
        attempt = 0
        delay = self.initial_delay
        
        while attempt < self.max_attempts:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                attempt += 1
                if attempt >= self.max_attempts:
                    raise
                wait_time = min(delay, self.max_delay)
                print(f'Attempt {attempt} failed, retrying in {wait_time:.1f}s')
                time.sleep(wait_time)
                delay *= 2

5. Timeout & Bulkhead (Изоляция)

Идея: Установи timeout и изолируй критичные ресурсы.

import asyncio
from concurrent.futures import ThreadPoolExecutor

class BulkheadService:
    def __init__(self):
        self.critical_executor = ThreadPoolExecutor(max_workers=10)
        self.background_executor = ThreadPoolExecutor(max_workers=5)
    
    async def process_with_timeout(self, func, timeout=5):
        try:
            result = await asyncio.wait_for(
                asyncio.to_thread(func),
                timeout=timeout
            )
            return result
        except asyncio.TimeoutError:
            print(f'Operation timed out after {timeout}s')
            raise TimeoutError(f'Operation exceeded {timeout}s')

6. Health Checks & Liveness Probes

Идея: Регулярно проверяй здоровье сервиса.

from fastapi import FastAPI, HTTPException
import asyncio

app = FastAPI()

class HealthChecker:
    def __init__(self):
        self.is_healthy = True
        self.error_message = None
    
    async def check_database(self):
        try:
            await database.execute('SELECT 1')
        except Exception as e:
            self.is_healthy = False
            self.error_message = f'Database error: {e}'
    
    async def run_health_checks(self):
        while True:
            self.is_healthy = True
            self.error_message = None
            await self.check_database()
            await asyncio.sleep(10)

health_checker = HealthChecker()

@app.get('/health')
async def health_check():
    if not health_checker.is_healthy:
        raise HTTPException(status_code=503, detail=health_checker.error_message)
    return {'status': 'healthy'}

7. Data Replication & Backup

Идея: Реплицируй данные географически.

class ReplicationManager:
    def __init__(self, primary_db, secondary_db):
        self.primary = primary_db
        self.secondary = secondary_db
    
    def write_with_replication(self, query, params):
        self.primary.execute(query, params)
        async def replicate():
            try:
                self.secondary.execute(query, params)
            except Exception as e:
                logger.error(f'Replication failed: {e}')
        asyncio.create_task(replicate())

8. Chaos Engineering

Идея: Тестируй отказоустойчивость, специально создавая сбои.

import random
from functools import wraps

def inject_failure(failure_rate=0.1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if random.random() < failure_rate:
                raise Exception(f'Chaos: {func.__name__} failed')
            return func(*args, **kwargs)
        return wrapper
    return decorator

@inject_failure(failure_rate=0.1)
def external_api_call():
    pass

9. Monitoring & Alerting

Идея: Обнаружь проблемы рано.

import time

class MonitoringService:
    def __init__(self):
        self.error_count = 0
        self.last_error_time = None
    
    def track_error(self, error_type: str, severity: str = 'warning'):
        self.error_count += 1
        self.last_error_time = time.time()
        metrics.increment(f'errors.{error_type}')
        if self.error_count > 10:
            alert.send(f'High error rate: {self.error_count} errors')

Практические рекомендации

Tier 1 (обязательно):

  • Database replication (основная + резервная)
  • Health checks на каждый критичный сервис
  • Graceful shutdown и connection pooling
  • Basic retry логика

Tier 2 (рекомендуется):

  • Circuit breaker для external APIs
  • Cache с fallback
  • Timeouts на все операции
  • Bulkhead isolation

Tier 3 (продвинуто):

  • Multi-region replication
  • Chaos engineering тесты
  • Advanced monitoring
  • Auto-scaling и load balancing

Отказоустойчивость — это не одна фишка, а комбинация многих стратегий для надёжности system reliability!

Какие есть подходы к отказоустойчивости в системах? | PrepBro