← Назад к вопросам
Как обеспечиваешь отказоустойчивость в приложениях?
3.0 Senior🔥 131 комментариев
#DevOps и инфраструктура#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Обеспечение отказоустойчивости в приложениях
Отказоустойчивость (resilience) — это способность приложения продолжать работать при сбоях, восстанавливаться от ошибок и деградировать корректно. Это критически важно для production систем.
Основные паттерны
1. Retry логика с экспоненциальной задержкой
import asyncio
import random
from functools import wraps
from typing import Callable, Any
def retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
):
"""Декоратор для retry с экспоненциальной задержкой"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_retries - 1:
raise
# Вычисляем задержку
delay = base_delay * (exponential_base ** attempt)
delay = min(delay, max_delay)
# Добавляем jitter для предотвращения thundering herd
if jitter:
delay += random.uniform(0, delay * 0.1)
print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
asyncio.run(asyncio.sleep(delay))
raise last_exception
return wrapper
return decorator
@retry(max_retries=3, base_delay=1.0)
def fetch_data(url: str):
"""Функция с автоматическим retry"""
# Может выбросить исключение
pass
2. Circuit Breaker паттерн
from enum import Enum
from datetime import datetime, timedelta
import threading
class CircuitState(Enum):
CLOSED = "closed" # Нормальное состояние
OPEN = "open" # Отклоняем все запросы
HALF_OPEN = "half_open" # Проверяем восстановление
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: Exception = Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.lock = threading.Lock()
def call(self, func, *args, **kwargs):
"""Выполнить функцию через circuit breaker"""
with self.lock:
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
print("Circuit breaker: HALF_OPEN - attempting recovery")
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _on_success(self):
with self.lock:
self.failure_count = 0
self.state = CircuitState.CLOSED
print("Circuit breaker: CLOSED")
def _on_failure(self):
with self.lock:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit breaker: OPEN (failures: {self.failure_count})")
def _should_attempt_reset(self) -> bool:
return (
self.last_failure_time and
datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout)
)
# Использование
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
def external_api_call():
# Может выбросить исключение
pass
try:
breaker.call(external_api_call)
except Exception as e:
print(f"Request failed: {e}")
3. Fallback и graceful degradation
from typing import Optional, Callable
class FallbackHandler:
def __init__(self, primary_func: Callable, fallback_func: Callable):
self.primary = primary_func
self.fallback = fallback_func
self.cache = {}
def execute(self, *args, **kwargs):
"""Попробовать primary, на ошибку использовать fallback"""
try:
result = self.primary(*args, **kwargs)
# Кэшируем успешный результат
cache_key = str(args) + str(kwargs)
self.cache[cache_key] = result
return result
except Exception as e:
print(f"Primary failed: {e}. Using fallback...")
# Проверяем кэш перед fallback
cache_key = str(args) + str(kwargs)
if cache_key in self.cache:
print(f"Returning cached result")
return self.cache[cache_key]
# Используем fallback функцию
return self.fallback(*args, **kwargs)
# Пример
def get_user_data_live(user_id: int):
# Обращаемся к свежим данным
pass
def get_user_data_cache(user_id: int):
# Возвращаем cached данные
pass
handler = FallbackHandler(get_user_data_live, get_user_data_cache)
user_data = handler.execute(user_id=123)
4. Timeout для всех операций
import asyncio
from functools import wraps
def timeout(seconds: float):
"""Декоратор timeout для async функций"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds
)
except asyncio.TimeoutError:
print(f"Operation timed out after {seconds}s")
raise
return wrapper
return decorator
@timeout(5.0)
async def fetch_data(url: str):
# Операция, которая должна завершиться за 5 секунд
pass
# Синхронный вариант
import signal
class TimeoutError(Exception):
pass
def sync_timeout(seconds: float):
def handler(signum, frame):
raise TimeoutError(f"Operation timed out after {seconds}s")
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
signal.signal(signal.SIGALRM, handler)
signal.alarm(int(seconds))
try:
return func(*args, **kwargs)
finally:
signal.alarm(0)
return wrapper
return decorator
5. Health checks и self-healing
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class HealthCheck:
name: str
status: HealthStatus
last_checked: datetime
error_message: Optional[str] = None
class ApplicationHealthMonitor:
def __init__(self, check_interval: int = 30):
self.check_interval = check_interval
self.health_checks = {}
self.last_run = {}
def register_check(self, name: str, check_func: Callable):
"""Зарегистрировать health check"""
self.health_checks[name] = check_func
def run_checks(self) -> dict:
"""Выполнить все health checks"""
results = {}
for name, check_func in self.health_checks.items():
try:
is_healthy = check_func()
status = HealthStatus.HEALTHY if is_healthy else HealthStatus.DEGRADED
error_msg = None
except Exception as e:
status = HealthStatus.UNHEALTHY
error_msg = str(e)
results[name] = HealthCheck(
name=name,
status=status,
last_checked=datetime.now(),
error_message=error_msg
)
self.last_run[name] = datetime.now()
return results
def get_overall_status(self) -> HealthStatus:
"""Получить общий статус приложения"""
results = self.run_checks()
if any(r.status == HealthStatus.UNHEALTHY for r in results.values()):
return HealthStatus.UNHEALTHY
elif any(r.status == HealthStatus.DEGRADED for r in results.values()):
return HealthStatus.DEGRADED
else:
return HealthStatus.HEALTHY
# Использование
monitor = ApplicationHealthMonitor()
# Регистрируем checks
monitor.register_check('database', lambda: db.ping())
monitor.register_check('redis', lambda: redis_client.ping())
monitor.register_check('disk_space', lambda: check_disk_space())
overall_status = monitor.get_overall_status()
print(f"Application status: {overall_status.value}")
6. Bulkhead паттерн (изоляция ресурсов)
from concurrent.futures import ThreadPoolExecutor
import threading
class BulkheadExecutor:
def __init__(self, name: str, max_threads: int = 5):
self.name = name
self.executor = ThreadPoolExecutor(max_workers=max_threads)
self.semaphore = threading.Semaphore(max_threads)
def execute(self, func, *args, **kwargs):
"""Выполнить функцию с изоляцией ресурсов"""
if not self.semaphore.acquire(blocking=False):
raise Exception(f"Bulkhead {self.name} is at capacity")
try:
future = self.executor.submit(func, *args, **kwargs)
return future.result()
finally:
self.semaphore.release()
# Использование
payment_bulkhead = BulkheadExecutor('payment', max_threads=10)
notification_bulkhead = BulkheadExecutor('notification', max_threads=5)
# Платежи и уведомления используют разные потоки и не конкурируют
7. Rate limiting и backpressure
import time
from collections import deque
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque()
def is_allowed(self) -> bool:
"""Проверить, разрешен ли запрос"""
now = time.time()
# Удаляем старые запросы за пределами окна
while self.requests and self.requests[0] < now - self.window_seconds:
self.requests.popleft()
if len(self.requests) < self.max_requests:
self.requests.append(now)
return True
return False
def wait_if_needed(self):
"""Ждать, пока не будет разрешен запрос"""
while not self.is_allowed():
time.sleep(0.1)
# Использование
limiter = RateLimiter(max_requests=100, window_seconds=60)
for i in range(150):
limiter.wait_if_needed()
print(f"Request {i + 1}")
Best Practices для отказоустойчивости
- Используй multiple redundancy — несколько инстансов, БД реплики
- Автоматическое восстановление — самоизлечение, auto-restart
- Graceful degradation — fallback сервисы, кэширование
- Мониторинг и alerting — быстро обнаружить проблемы
- Load balancing — распределение нагрузки
- Database replication — с failover
- Message queues — асинхронность, буферизация
- Логирование и трейсинг — понимать что случилось
- Chaos engineering — тестировать отказы
Отказоустойчивость достигается комбинацией паттернов, мониторинга и тестирования.