← Назад к вопросам

Как обеспечиваешь отказоустойчивость в приложениях?

3.0 Senior🔥 131 комментариев
#DevOps и инфраструктура#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Обеспечение отказоустойчивости в приложениях

Отказоустойчивость (resilience) — это способность приложения продолжать работать при сбоях, восстанавливаться от ошибок и деградировать корректно. Это критически важно для production систем.

Основные паттерны

1. Retry логика с экспоненциальной задержкой

import asyncio
import random
from functools import wraps
from typing import Callable, Any

def retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """Декоратор для retry с экспоненциальной задержкой"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    
                    if attempt == max_retries - 1:
                        raise
                    
                    # Вычисляем задержку
                    delay = base_delay * (exponential_base ** attempt)
                    delay = min(delay, max_delay)
                    
                    # Добавляем jitter для предотвращения thundering herd
                    if jitter:
                        delay += random.uniform(0, delay * 0.1)
                    
                    print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
                    asyncio.run(asyncio.sleep(delay))
            
            raise last_exception
        
        return wrapper
    return decorator

@retry(max_retries=3, base_delay=1.0)
def fetch_data(url: str):
    """Функция с автоматическим retry"""
    # Может выбросить исключение
    pass

2. Circuit Breaker паттерн

from enum import Enum
from datetime import datetime, timedelta
import threading

class CircuitState(Enum):
    CLOSED = "closed"      # Нормальное состояние
    OPEN = "open"          # Отклоняем все запросы
    HALF_OPEN = "half_open"  # Проверяем восстановление

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: Exception = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.lock = threading.Lock()
    
    def call(self, func, *args, **kwargs):
        """Выполнить функцию через circuit breaker"""
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    print("Circuit breaker: HALF_OPEN - attempting recovery")
                else:
                    raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            print("Circuit breaker: CLOSED")
    
    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker: OPEN (failures: {self.failure_count})")
    
    def _should_attempt_reset(self) -> bool:
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
        )

# Использование
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

def external_api_call():
    # Может выбросить исключение
    pass

try:
    breaker.call(external_api_call)
except Exception as e:
    print(f"Request failed: {e}")

3. Fallback и graceful degradation

from typing import Optional, Callable

class FallbackHandler:
    def __init__(self, primary_func: Callable, fallback_func: Callable):
        self.primary = primary_func
        self.fallback = fallback_func
        self.cache = {}
    
    def execute(self, *args, **kwargs):
        """Попробовать primary, на ошибку использовать fallback"""
        try:
            result = self.primary(*args, **kwargs)
            # Кэшируем успешный результат
            cache_key = str(args) + str(kwargs)
            self.cache[cache_key] = result
            return result
        except Exception as e:
            print(f"Primary failed: {e}. Using fallback...")
            
            # Проверяем кэш перед fallback
            cache_key = str(args) + str(kwargs)
            if cache_key in self.cache:
                print(f"Returning cached result")
                return self.cache[cache_key]
            
            # Используем fallback функцию
            return self.fallback(*args, **kwargs)

# Пример
def get_user_data_live(user_id: int):
    # Обращаемся к свежим данным
    pass

def get_user_data_cache(user_id: int):
    # Возвращаем cached данные
    pass

handler = FallbackHandler(get_user_data_live, get_user_data_cache)
user_data = handler.execute(user_id=123)

4. Timeout для всех операций

import asyncio
from functools import wraps

def timeout(seconds: float):
    """Декоратор timeout для async функций"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=seconds
                )
            except asyncio.TimeoutError:
                print(f"Operation timed out after {seconds}s")
                raise
        return wrapper
    return decorator

@timeout(5.0)
async def fetch_data(url: str):
    # Операция, которая должна завершиться за 5 секунд
    pass

# Синхронный вариант
import signal

class TimeoutError(Exception):
    pass

def sync_timeout(seconds: float):
    def handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds}s")
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, handler)
            signal.alarm(int(seconds))
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
        return wrapper
    return decorator

5. Health checks и self-healing

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum

class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class HealthCheck:
    name: str
    status: HealthStatus
    last_checked: datetime
    error_message: Optional[str] = None

class ApplicationHealthMonitor:
    def __init__(self, check_interval: int = 30):
        self.check_interval = check_interval
        self.health_checks = {}
        self.last_run = {}
    
    def register_check(self, name: str, check_func: Callable):
        """Зарегистрировать health check"""
        self.health_checks[name] = check_func
    
    def run_checks(self) -> dict:
        """Выполнить все health checks"""
        results = {}
        
        for name, check_func in self.health_checks.items():
            try:
                is_healthy = check_func()
                status = HealthStatus.HEALTHY if is_healthy else HealthStatus.DEGRADED
                error_msg = None
            except Exception as e:
                status = HealthStatus.UNHEALTHY
                error_msg = str(e)
            
            results[name] = HealthCheck(
                name=name,
                status=status,
                last_checked=datetime.now(),
                error_message=error_msg
            )
            
            self.last_run[name] = datetime.now()
        
        return results
    
    def get_overall_status(self) -> HealthStatus:
        """Получить общий статус приложения"""
        results = self.run_checks()
        
        if any(r.status == HealthStatus.UNHEALTHY for r in results.values()):
            return HealthStatus.UNHEALTHY
        elif any(r.status == HealthStatus.DEGRADED for r in results.values()):
            return HealthStatus.DEGRADED
        else:
            return HealthStatus.HEALTHY

# Использование
monitor = ApplicationHealthMonitor()

# Регистрируем checks
monitor.register_check('database', lambda: db.ping())
monitor.register_check('redis', lambda: redis_client.ping())
monitor.register_check('disk_space', lambda: check_disk_space())

overall_status = monitor.get_overall_status()
print(f"Application status: {overall_status.value}")

6. Bulkhead паттерн (изоляция ресурсов)

from concurrent.futures import ThreadPoolExecutor
import threading

class BulkheadExecutor:
    def __init__(self, name: str, max_threads: int = 5):
        self.name = name
        self.executor = ThreadPoolExecutor(max_workers=max_threads)
        self.semaphore = threading.Semaphore(max_threads)
    
    def execute(self, func, *args, **kwargs):
        """Выполнить функцию с изоляцией ресурсов"""
        if not self.semaphore.acquire(blocking=False):
            raise Exception(f"Bulkhead {self.name} is at capacity")
        
        try:
            future = self.executor.submit(func, *args, **kwargs)
            return future.result()
        finally:
            self.semaphore.release()

# Использование
payment_bulkhead = BulkheadExecutor('payment', max_threads=10)
notification_bulkhead = BulkheadExecutor('notification', max_threads=5)

# Платежи и уведомления используют разные потоки и не конкурируют

7. Rate limiting и backpressure

import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()
    
    def is_allowed(self) -> bool:
        """Проверить, разрешен ли запрос"""
        now = time.time()
        
        # Удаляем старые запросы за пределами окна
        while self.requests and self.requests[0] < now - self.window_seconds:
            self.requests.popleft()
        
        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        
        return False
    
    def wait_if_needed(self):
        """Ждать, пока не будет разрешен запрос"""
        while not self.is_allowed():
            time.sleep(0.1)

# Использование
limiter = RateLimiter(max_requests=100, window_seconds=60)

for i in range(150):
    limiter.wait_if_needed()
    print(f"Request {i + 1}")

Best Practices для отказоустойчивости

  • Используй multiple redundancy — несколько инстансов, БД реплики
  • Автоматическое восстановление — самоизлечение, auto-restart
  • Graceful degradation — fallback сервисы, кэширование
  • Мониторинг и alerting — быстро обнаружить проблемы
  • Load balancing — распределение нагрузки
  • Database replication — с failover
  • Message queues — асинхронность, буферизация
  • Логирование и трейсинг — понимать что случилось
  • Chaos engineering — тестировать отказы

Отказоустойчивость достигается комбинацией паттернов, мониторинга и тестирования.