← Назад к вопросам
Как бы реализовал систему с ограничением обращений к ресурсу через 5 сессий?
3.0 Senior🔥 101 комментариев
#Архитектура и паттерны#Безопасность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Система ограничения обращений к ресурсу через сессии
Это задача на реализацию rate limiting с состоянием, привязанным к сессиям пользователя. Вопрос проверяет понимание управления состоянием, синхронизации доступа и выбора хранилища.
Интерпретация задачи
"Через 5 сессий" может означать:
- После 5 сессий одного пользователя — ограничить доступ
- Максимум 5 активных сессий — одновременные сессии
- За скользящее окно из 5 сессий — count sessions in time window
Реализую максимум 5 активных сессий — это наиболее практичный сценарий.
Вариант 1: In-Memory хранилище (разработка)
import threading
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, Set
import uuid
@dataclass
class Session:
session_id: str
user_id: str
created_at: datetime
last_accessed: datetime
is_active: bool = True
class SessionLimiter:
def __init__(self, max_sessions: int = 5, timeout_minutes: int = 30):
self.max_sessions = max_sessions
self.timeout = timedelta(minutes=timeout_minutes)
self.sessions: Dict[str, Session] = {}
self.user_sessions: Dict[str, Set[str]] = {} # user_id -> set of session_ids
self.lock = threading.RLock()
def create_session(self, user_id: str) -> str:
"""Создать новую сессию, проверив лимит."""
with self.lock:
# Очистить истёкшие сессии
self._cleanup_expired_sessions(user_id)
# Получить активные сессии пользователя
active_sessions = self._get_active_sessions(user_id)
# Проверить лимит
if len(active_sessions) >= self.max_sessions:
# Вариант: выкинуть самую старую сессию
oldest_session_id = min(
active_sessions,
key=lambda sid: self.sessions[sid].created_at
)
self._invalidate_session(oldest_session_id)
# Создать новую сессию
session_id = str(uuid.uuid4())
now = datetime.now()
session = Session(
session_id=session_id,
user_id=user_id,
created_at=now,
last_accessed=now
)
self.sessions[session_id] = session
if user_id not in self.user_sessions:
self.user_sessions[user_id] = set()
self.user_sessions[user_id].add(session_id)
return session_id
def access_resource(self, session_id: str) -> bool:
"""Проверить, может ли сессия обратиться к ресурсу."""
with self.lock:
if session_id not in self.sessions:
return False
session = self.sessions[session_id]
# Проверить, не истекла ли сессия
if datetime.now() - session.last_accessed > self.timeout:
self._invalidate_session(session_id)
return False
# Обновить время доступа
session.last_accessed = datetime.now()
return True
def _get_active_sessions(self, user_id: str) -> Set[str]:
"""Получить активные сессии пользователя."""
if user_id not in self.user_sessions:
return set()
active = set()
for session_id in self.user_sessions[user_id]:
if session_id in self.sessions and self.sessions[session_id].is_active:
active.add(session_id)
return active
def _cleanup_expired_sessions(self, user_id: str) -> None:
"""Удалить истёкшие сессии."""
if user_id not in self.user_sessions:
return
now = datetime.now()
expired = [
sid for sid in self.user_sessions[user_id]
if sid in self.sessions and (now - self.sessions[sid].last_accessed) > self.timeout
]
for session_id in expired:
self._invalidate_session(session_id)
def _invalidate_session(self, session_id: str) -> None:
"""Инвалидировать сессию."""
if session_id in self.sessions:
session = self.sessions[session_id]
session.is_active = False
self.user_sessions[session.user_id].discard(session_id)
del self.sessions[session_id]
# Использование
limiter = SessionLimiter(max_sessions=5)
session_id = limiter.create_session(user_id='user_123')
if limiter.access_resource(session_id):
print("Доступ разрешён")
else:
print("Доступ запрещён")
Вариант 2: Redis (production)
Для масштабируемого приложения с несколькими воркерами:
import redis
from datetime import datetime, timedelta
import json
import uuid
from typing import Optional
class RedisSessionLimiter:
def __init__(self, redis_url: str = 'redis://localhost:6379',
max_sessions: int = 5, timeout_minutes: int = 30):
self.redis = redis.from_url(redis_url)
self.max_sessions = max_sessions
self.timeout = timedelta(minutes=timeout_minutes).total_seconds()
def create_session(self, user_id: str) -> str:
"""Создать сессию с автоматической очисткой старых."""
session_id = str(uuid.uuid4())
now = datetime.now().isoformat()
# Ключ для хранения сессий пользователя
sessions_key = f"user:{user_id}:sessions"
session_key = f"session:{session_id}"
# Используем Lua скрипт для атомарности
lua_script = """
local sessions_key = KEYS[1]
local session_key = KEYS[2]
local max_sessions = tonumber(ARGV[1])
local session_id = ARGV[2]
local now = ARGV[3]
local timeout = tonumber(ARGV[4])
-- Удалить истёкшие сессии
local current_time = redis.call('TIME')[1]
local sessions = redis.call('ZRANGE', sessions_key, 0, -1)
for i, sid in ipairs(sessions) do
local session_data = redis.call('GET', 'session:' .. sid)
if session_data then
local session = cjson.decode(session_data)
if tonumber(current_time) - session.created_at > timeout * 2 then
redis.call('ZREM', sessions_key, sid)
redis.call('DEL', 'session:' .. sid)
end
end
end
-- Проверить лимит
local count = redis.call('ZCARD', sessions_key)
if count >= max_sessions then
-- Удалить самую старую
local oldest = redis.call('ZRANGE', sessions_key, 0, 0)[1]
redis.call('ZREM', sessions_key, oldest)
redis.call('DEL', 'session:' .. oldest)
end
-- Добавить новую сессию
local session_data = cjson.encode({created_at = current_time, id = session_id})
redis.call('SET', session_key, session_data, 'EX', timeout)
redis.call('ZADD', sessions_key, current_time, session_id)
return session_id
"""
result = self.redis.eval(
lua_script, 2,
sessions_key, session_key,
self.max_sessions, session_id, now, int(self.timeout)
)
return session_id
def access_resource(self, session_id: str) -> bool:
"""Проверить доступ к ресурсу."""
session_key = f"session:{session_id}"
# Проверить существование сессии
session_data = self.redis.get(session_key)
if session_data is None:
return False
# Обновить TTL (touch)
self.redis.expire(session_key, int(self.timeout))
return True
def close_session(self, session_id: str, user_id: str) -> None:
"""Явно закрыть сессию."""
session_key = f"session:{session_id}"
sessions_key = f"user:{user_id}:sessions"
self.redis.delete(session_key)
self.redis.zrem(sessions_key, session_id)
# Использование
limiter = RedisSessionLimiter()
session = limiter.create_session('user_123')
if limiter.access_resource(session):
print("Доступ разрешён")
Вариант 3: FastAPI Middleware
Для веб-приложения:
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from typing import Optional
import time
app = FastAPI()
limiter = RedisSessionLimiter(max_sessions=5)
class SessionLimitMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Получить session_id из cookie или заголовка
session_id = request.cookies.get('session_id') or request.headers.get('X-Session-ID')
if not session_id:
return JSONResponse(
{"error": "No session"},
status_code=401
)
# Проверить доступ
if not limiter.access_resource(session_id):
return JSONResponse(
{"error": "Session limit exceeded or expired"},
status_code=429
)
response = await call_next(request)
return response
app.add_middleware(SessionLimitMiddleware)
@app.post('/login')
async def login(username: str):
session_id = limiter.create_session(username)
response = JSONResponse({"session_id": session_id})
response.set_cookie('session_id', session_id, max_age=1800, httponly=True)
return response
@app.get('/protected-resource')
async def protected_resource():
return {"data": "secret"}
Ключевые решения
| Аспект | Решение | Обоснование |
|---|---|---|
| Состояние | Redis/в памяти | Должно быть доступно всем воркерам |
| Атомарность | Lua скрипты (Redis) | Избежать race conditions |
| TTL | Автоматический (Redis) | Не требует фонового процесса |
| Старые сессии | Удаление при создании | Экономит память |
| Проверка лимита | Перед ресурсом | Fail-fast |
Заключение
Для production я выбрал бы Redis с Lua скриптами потому что:
- Атомарные операции без race conditions
- Встроенный TTL и очистка
- Масштабируется на множество воркеров
- Простой интеграция с FastAPI/Django