← Назад к вопросам

Делал ли локи в Redis

1.7 Middle🔥 151 комментариев
#Архитектура и паттерны#Базы данных (NoSQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Делал ли локи в Redis

Да, я активно работал с распределёнными блокировками (локами) в Redis. Это одна из критичных частей при разработке высоконагруженных систем. Рассмотрю подходы, которые я использовал.

1. Простой подход с SET NX

Самая базовая реализация:

import redis
import uuid
from datetime import datetime, timedelta

redis_client = redis.Redis(host="localhost", port=6379)

def acquire_lock(key: str, ttl: int = 30) -> str | None:
    """Попытка получить лок"""
    lock_id = str(uuid.uuid4())
    
    # SET key value NX EX ttl — атомарная операция
    acquired = redis_client.set(
        name=key,
        value=lock_id,
        nx=True,  # Только если ключ не существует
        ex=ttl    # Время жизни в секундах
    )
    
    return lock_id if acquired else None

def release_lock(key: str, lock_id: str) -> bool:
    """Освобождение лока с проверкой владельца"""
    # Нужна Lua скрипт для атомарности
    script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    
    result = redis_client.eval(script, 1, key, lock_id)
    return bool(result)

Проблемы:

  • Нет автоматического продления TTL
  • Процесс может умереть, держа лок
  • Нет wait-и-retry логики

2. Redlock алгоритм (распределённые системы)

Для критичных операций использовал Redlock Антиреза:

import redis
from redlock import Redlock
import time

# Несколько Redis инстансов для надёжности
redis_instances = [
    redis.Redis(host="redis1", port=6379),
    redis.Redis(host="redis2", port=6379),
    redis.Redis(host="redis3", port=6379),
]

dlm = Redlock(key="my-resource", auto_renewal=True)

with dlm:
    # Критичная операция
    perform_critical_operation()

Преимущества:

  • Работает при сбое одного Redis узла
  • Минимум 3 узлов для надёжности
  • Гарантирует эксклюзивный доступ

3. Семафоры (рейт-лимитинг)

Для ограничения параллельных операций:

class RateLimiter:
    def __init__(self, redis_client, max_concurrent: int, ttl: int = 60):
        self.redis = redis_client
        self.max_concurrent = max_concurrent
        self.ttl = ttl
    
    def acquire(self, resource_id: str) -> bool:
        """Попытка занять слот в семафоре"""
        key = f"semaphore:{resource_id}"
        
        # Инкрементируем счётчик активных операций
        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, self.ttl)
        
        return current <= self.max_concurrent
    
    def release(self, resource_id: str):
        """Освобождение слота"""
        key = f"semaphore:{resource_id}"
        self.redis.decr(key)

# Использование
limiter = RateLimiter(redis_client, max_concurrent=5)

if limiter.acquire("api_endpoint"):
    try:
        # Выполнение операции
        process_request()
    finally:
        limiter.release("api_endpoint")

4. Mutex с автоматическим продлением

Для долгоживущих операций использовал background refresh:

import threading
import uuid
from typing import Generator
from contextlib import contextmanager

class MutexLock:
    def __init__(self, redis_client, key: str, ttl: int = 30):
        self.redis = redis_client
        self.key = key
        self.ttl = ttl
        self.lock_id = str(uuid.uuid4())
        self.refresh_thread = None
        self._stop_refresh = threading.Event()
    
    def acquire(self) -> bool:
        """Попытка получить лок"""
        acquired = self.redis.set(
            self.key,
            self.lock_id,
            nx=True,
            ex=self.ttl
        )
        
        if acquired:
            # Запуск потока для автоматического продления
            self._start_refresh()
        
        return bool(acquired)
    
    def _start_refresh(self):
        """Фоновое продление TTL лока"""
        def refresh_loop():
            while not self._stop_refresh.is_set():
                time.sleep(self.ttl // 2)  # Продляем посередине TTL
                
                # Проверяем, что мы ещё владельцы лока
                if self.redis.get(self.key) == self.lock_id:
                    self.redis.expire(self.key, self.ttl)
        
        self.refresh_thread = threading.Thread(target=refresh_loop, daemon=True)
        self.refresh_thread.start()
    
    def release(self):
        """Освобождение лока"""
        self._stop_refresh.set()
        
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(script, 1, self.key, self.lock_id)
    
    @contextmanager
    def lock(self):
        if not self.acquire():
            raise TimeoutError(f"Cannot acquire lock on {self.key}")
        try:
            yield
        finally:
            self.release()

# Использование
lock = MutexLock(redis_client, "payment_processing", ttl=30)
with lock.lock():
    process_payment()  # Может выполняться дольше 30 секунд

5. Очередь с блокировкой (Pub/Sub)

Для ожидания события освобождения лока:

from typing import Optional

class WaitingLock:
    def __init__(self, redis_client, key: str, ttl: int = 30, timeout: int = 5):
        self.redis = redis_client
        self.key = key
        self.ttl = ttl
        self.timeout = timeout
    
    def acquire_with_wait(self, max_attempts: int = 3) -> str | None:
        """Получить лок с попытками ожидания"""
        lock_id = str(uuid.uuid4())
        
        for attempt in range(max_attempts):
            # Попытка захватить лок
            if self.redis.set(self.key, lock_id, nx=True, ex=self.ttl):
                return lock_id
            
            # Если не получилось, ждём события освобождения
            pubsub = self.redis.pubsub()
            pubsub.subscribe(f"{self.key}:released")
            
            try:
                message = pubsub.get_message(timeout=self.timeout)
                if message and message["type"] == "message":
                    continue  # Попробуем ещё раз
            finally:
                pubsub.unsubscribe()
        
        return None  # Не удалось получить лок
    
    def release(self, lock_id: str):
        """Освобождение лока с уведомлением"""
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            redis.call("del", KEYS[1])
            redis.call("publish", KEYS[1] .. ":released", "1")
            return 1
        else
            return 0
        end
        """
        self.redis.eval(script, 1, self.key, lock_id)

6. Проблемы и решения в реальных проектах

Проблема: Deadlock при исключениях

# ❌ Плохо — лок не освобождается при ошибке
lock_id = acquire_lock("resource")
perform_operation()  # Если исключение — лок зависит
release_lock("resource", lock_id)

# ✅ Хорошо — гарантированное освобождение
try:
    lock_id = acquire_lock("resource", ttl=30)  # TTL спасает
    perform_operation()
finally:
    release_lock("resource", lock_id)

Проблема: Race condition между check и set

# ❌ Плохо — два потока могут одновременно захватить
if not redis.exists(key):
    redis.set(key, lock_id)

# ✅ Хорошо — атомарная операция
redis.set(key, lock_id, nx=True, ex=30)

Проблема: Потеря данных при рестарте Redis

# Решение: использовать Redis Cluster или AOF (Append-Only File)
# redis.conf: appendonly yes

Итоги лучших практик

  1. Всегда используй TTL — защита от deadlock
  2. Атомарные операции — SET NX EX, Lua скрипты
  3. Проверка владельца — уникальный lock_id
  4. Context managers — гарантированное освобождение
  5. Для критичных систем — Redlock с несколькими узлами
  6. Мониторинг — отслеживание зависших локов

В своих проектах использовал Redis для синхронизации процессов обработки платежей, кэширования вычислений и рейт-лимитинга API. Это надёжное решение для современных микросервисных архитектур.

Делал ли локи в Redis | PrepBro