Делал ли локи в Redis
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Делал ли локи в Redis
Да, я активно работал с распределёнными блокировками (локами) в Redis. Это одна из критичных частей при разработке высоконагруженных систем. Рассмотрю подходы, которые я использовал.
1. Простой подход с SET NX
Самая базовая реализация:
import redis
import uuid
from datetime import datetime, timedelta
redis_client = redis.Redis(host="localhost", port=6379)
def acquire_lock(key: str, ttl: int = 30) -> str | None:
"""Попытка получить лок"""
lock_id = str(uuid.uuid4())
# SET key value NX EX ttl — атомарная операция
acquired = redis_client.set(
name=key,
value=lock_id,
nx=True, # Только если ключ не существует
ex=ttl # Время жизни в секундах
)
return lock_id if acquired else None
def release_lock(key: str, lock_id: str) -> bool:
"""Освобождение лока с проверкой владельца"""
# Нужна Lua скрипт для атомарности
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = redis_client.eval(script, 1, key, lock_id)
return bool(result)
Проблемы:
- Нет автоматического продления TTL
- Процесс может умереть, держа лок
- Нет wait-и-retry логики
2. Redlock алгоритм (распределённые системы)
Для критичных операций использовал Redlock Антиреза:
import redis
from redlock import Redlock
import time
# Несколько Redis инстансов для надёжности
redis_instances = [
redis.Redis(host="redis1", port=6379),
redis.Redis(host="redis2", port=6379),
redis.Redis(host="redis3", port=6379),
]
dlm = Redlock(key="my-resource", auto_renewal=True)
with dlm:
# Критичная операция
perform_critical_operation()
Преимущества:
- Работает при сбое одного Redis узла
- Минимум 3 узлов для надёжности
- Гарантирует эксклюзивный доступ
3. Семафоры (рейт-лимитинг)
Для ограничения параллельных операций:
class RateLimiter:
def __init__(self, redis_client, max_concurrent: int, ttl: int = 60):
self.redis = redis_client
self.max_concurrent = max_concurrent
self.ttl = ttl
def acquire(self, resource_id: str) -> bool:
"""Попытка занять слот в семафоре"""
key = f"semaphore:{resource_id}"
# Инкрементируем счётчик активных операций
current = self.redis.incr(key)
if current == 1:
self.redis.expire(key, self.ttl)
return current <= self.max_concurrent
def release(self, resource_id: str):
"""Освобождение слота"""
key = f"semaphore:{resource_id}"
self.redis.decr(key)
# Использование
limiter = RateLimiter(redis_client, max_concurrent=5)
if limiter.acquire("api_endpoint"):
try:
# Выполнение операции
process_request()
finally:
limiter.release("api_endpoint")
4. Mutex с автоматическим продлением
Для долгоживущих операций использовал background refresh:
import threading
import uuid
from typing import Generator
from contextlib import contextmanager
class MutexLock:
def __init__(self, redis_client, key: str, ttl: int = 30):
self.redis = redis_client
self.key = key
self.ttl = ttl
self.lock_id = str(uuid.uuid4())
self.refresh_thread = None
self._stop_refresh = threading.Event()
def acquire(self) -> bool:
"""Попытка получить лок"""
acquired = self.redis.set(
self.key,
self.lock_id,
nx=True,
ex=self.ttl
)
if acquired:
# Запуск потока для автоматического продления
self._start_refresh()
return bool(acquired)
def _start_refresh(self):
"""Фоновое продление TTL лока"""
def refresh_loop():
while not self._stop_refresh.is_set():
time.sleep(self.ttl // 2) # Продляем посередине TTL
# Проверяем, что мы ещё владельцы лока
if self.redis.get(self.key) == self.lock_id:
self.redis.expire(self.key, self.ttl)
self.refresh_thread = threading.Thread(target=refresh_loop, daemon=True)
self.refresh_thread.start()
def release(self):
"""Освобождение лока"""
self._stop_refresh.set()
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(script, 1, self.key, self.lock_id)
@contextmanager
def lock(self):
if not self.acquire():
raise TimeoutError(f"Cannot acquire lock on {self.key}")
try:
yield
finally:
self.release()
# Использование
lock = MutexLock(redis_client, "payment_processing", ttl=30)
with lock.lock():
process_payment() # Может выполняться дольше 30 секунд
5. Очередь с блокировкой (Pub/Sub)
Для ожидания события освобождения лока:
from typing import Optional
class WaitingLock:
def __init__(self, redis_client, key: str, ttl: int = 30, timeout: int = 5):
self.redis = redis_client
self.key = key
self.ttl = ttl
self.timeout = timeout
def acquire_with_wait(self, max_attempts: int = 3) -> str | None:
"""Получить лок с попытками ожидания"""
lock_id = str(uuid.uuid4())
for attempt in range(max_attempts):
# Попытка захватить лок
if self.redis.set(self.key, lock_id, nx=True, ex=self.ttl):
return lock_id
# Если не получилось, ждём события освобождения
pubsub = self.redis.pubsub()
pubsub.subscribe(f"{self.key}:released")
try:
message = pubsub.get_message(timeout=self.timeout)
if message and message["type"] == "message":
continue # Попробуем ещё раз
finally:
pubsub.unsubscribe()
return None # Не удалось получить лок
def release(self, lock_id: str):
"""Освобождение лока с уведомлением"""
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
redis.call("del", KEYS[1])
redis.call("publish", KEYS[1] .. ":released", "1")
return 1
else
return 0
end
"""
self.redis.eval(script, 1, self.key, lock_id)
6. Проблемы и решения в реальных проектах
Проблема: Deadlock при исключениях
# ❌ Плохо — лок не освобождается при ошибке
lock_id = acquire_lock("resource")
perform_operation() # Если исключение — лок зависит
release_lock("resource", lock_id)
# ✅ Хорошо — гарантированное освобождение
try:
lock_id = acquire_lock("resource", ttl=30) # TTL спасает
perform_operation()
finally:
release_lock("resource", lock_id)
Проблема: Race condition между check и set
# ❌ Плохо — два потока могут одновременно захватить
if not redis.exists(key):
redis.set(key, lock_id)
# ✅ Хорошо — атомарная операция
redis.set(key, lock_id, nx=True, ex=30)
Проблема: Потеря данных при рестарте Redis
# Решение: использовать Redis Cluster или AOF (Append-Only File)
# redis.conf: appendonly yes
Итоги лучших практик
- Всегда используй TTL — защита от deadlock
- Атомарные операции — SET NX EX, Lua скрипты
- Проверка владельца — уникальный lock_id
- Context managers — гарантированное освобождение
- Для критичных систем — Redlock с несколькими узлами
- Мониторинг — отслеживание зависших локов
В своих проектах использовал Redis для синхронизации процессов обработки платежей, кэширования вычислений и рейт-лимитинга API. Это надёжное решение для современных микросервисных архитектур.