← Назад к вопросам

Как бы реализовал систему с ограничением обращений к ресурсу через 5 сессий?

3.0 Senior🔥 101 комментариев
#Архитектура и паттерны#Безопасность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Система ограничения обращений к ресурсу через сессии

Это задача на реализацию rate limiting с состоянием, привязанным к сессиям пользователя. Вопрос проверяет понимание управления состоянием, синхронизации доступа и выбора хранилища.

Интерпретация задачи

"Через 5 сессий" может означать:

  1. После 5 сессий одного пользователя — ограничить доступ
  2. Максимум 5 активных сессий — одновременные сессии
  3. За скользящее окно из 5 сессий — count sessions in time window

Реализую максимум 5 активных сессий — это наиболее практичный сценарий.

Вариант 1: In-Memory хранилище (разработка)

import threading
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, Set
import uuid

@dataclass
class Session:
    session_id: str
    user_id: str
    created_at: datetime
    last_accessed: datetime
    is_active: bool = True

class SessionLimiter:
    def __init__(self, max_sessions: int = 5, timeout_minutes: int = 30):
        self.max_sessions = max_sessions
        self.timeout = timedelta(minutes=timeout_minutes)
        self.sessions: Dict[str, Session] = {}
        self.user_sessions: Dict[str, Set[str]] = {}  # user_id -> set of session_ids
        self.lock = threading.RLock()
    
    def create_session(self, user_id: str) -> str:
        """Создать новую сессию, проверив лимит."""
        with self.lock:
            # Очистить истёкшие сессии
            self._cleanup_expired_sessions(user_id)
            
            # Получить активные сессии пользователя
            active_sessions = self._get_active_sessions(user_id)
            
            # Проверить лимит
            if len(active_sessions) >= self.max_sessions:
                # Вариант: выкинуть самую старую сессию
                oldest_session_id = min(
                    active_sessions,
                    key=lambda sid: self.sessions[sid].created_at
                )
                self._invalidate_session(oldest_session_id)
            
            # Создать новую сессию
            session_id = str(uuid.uuid4())
            now = datetime.now()
            session = Session(
                session_id=session_id,
                user_id=user_id,
                created_at=now,
                last_accessed=now
            )
            
            self.sessions[session_id] = session
            if user_id not in self.user_sessions:
                self.user_sessions[user_id] = set()
            self.user_sessions[user_id].add(session_id)
            
            return session_id
    
    def access_resource(self, session_id: str) -> bool:
        """Проверить, может ли сессия обратиться к ресурсу."""
        with self.lock:
            if session_id not in self.sessions:
                return False
            
            session = self.sessions[session_id]
            
            # Проверить, не истекла ли сессия
            if datetime.now() - session.last_accessed > self.timeout:
                self._invalidate_session(session_id)
                return False
            
            # Обновить время доступа
            session.last_accessed = datetime.now()
            return True
    
    def _get_active_sessions(self, user_id: str) -> Set[str]:
        """Получить активные сессии пользователя."""
        if user_id not in self.user_sessions:
            return set()
        
        active = set()
        for session_id in self.user_sessions[user_id]:
            if session_id in self.sessions and self.sessions[session_id].is_active:
                active.add(session_id)
        
        return active
    
    def _cleanup_expired_sessions(self, user_id: str) -> None:
        """Удалить истёкшие сессии."""
        if user_id not in self.user_sessions:
            return
        
        now = datetime.now()
        expired = [
            sid for sid in self.user_sessions[user_id]
            if sid in self.sessions and (now - self.sessions[sid].last_accessed) > self.timeout
        ]
        
        for session_id in expired:
            self._invalidate_session(session_id)
    
    def _invalidate_session(self, session_id: str) -> None:
        """Инвалидировать сессию."""
        if session_id in self.sessions:
            session = self.sessions[session_id]
            session.is_active = False
            self.user_sessions[session.user_id].discard(session_id)
            del self.sessions[session_id]

# Использование
limiter = SessionLimiter(max_sessions=5)

session_id = limiter.create_session(user_id='user_123')
if limiter.access_resource(session_id):
    print("Доступ разрешён")
else:
    print("Доступ запрещён")

Вариант 2: Redis (production)

Для масштабируемого приложения с несколькими воркерами:

import redis
from datetime import datetime, timedelta
import json
import uuid
from typing import Optional

class RedisSessionLimiter:
    def __init__(self, redis_url: str = 'redis://localhost:6379', 
                 max_sessions: int = 5, timeout_minutes: int = 30):
        self.redis = redis.from_url(redis_url)
        self.max_sessions = max_sessions
        self.timeout = timedelta(minutes=timeout_minutes).total_seconds()
    
    def create_session(self, user_id: str) -> str:
        """Создать сессию с автоматической очисткой старых."""
        session_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        
        # Ключ для хранения сессий пользователя
        sessions_key = f"user:{user_id}:sessions"
        session_key = f"session:{session_id}"
        
        # Используем Lua скрипт для атомарности
        lua_script = """
        local sessions_key = KEYS[1]
        local session_key = KEYS[2]
        local max_sessions = tonumber(ARGV[1])
        local session_id = ARGV[2]
        local now = ARGV[3]
        local timeout = tonumber(ARGV[4])
        
        -- Удалить истёкшие сессии
        local current_time = redis.call('TIME')[1]
        local sessions = redis.call('ZRANGE', sessions_key, 0, -1)
        for i, sid in ipairs(sessions) do
            local session_data = redis.call('GET', 'session:' .. sid)
            if session_data then
                local session = cjson.decode(session_data)
                if tonumber(current_time) - session.created_at > timeout * 2 then
                    redis.call('ZREM', sessions_key, sid)
                    redis.call('DEL', 'session:' .. sid)
                end
            end
        end
        
        -- Проверить лимит
        local count = redis.call('ZCARD', sessions_key)
        if count >= max_sessions then
            -- Удалить самую старую
            local oldest = redis.call('ZRANGE', sessions_key, 0, 0)[1]
            redis.call('ZREM', sessions_key, oldest)
            redis.call('DEL', 'session:' .. oldest)
        end
        
        -- Добавить новую сессию
        local session_data = cjson.encode({created_at = current_time, id = session_id})
        redis.call('SET', session_key, session_data, 'EX', timeout)
        redis.call('ZADD', sessions_key, current_time, session_id)
        
        return session_id
        """
        
        result = self.redis.eval(
            lua_script, 2,
            sessions_key, session_key,
            self.max_sessions, session_id, now, int(self.timeout)
        )
        
        return session_id
    
    def access_resource(self, session_id: str) -> bool:
        """Проверить доступ к ресурсу."""
        session_key = f"session:{session_id}"
        
        # Проверить существование сессии
        session_data = self.redis.get(session_key)
        
        if session_data is None:
            return False
        
        # Обновить TTL (touch)
        self.redis.expire(session_key, int(self.timeout))
        
        return True
    
    def close_session(self, session_id: str, user_id: str) -> None:
        """Явно закрыть сессию."""
        session_key = f"session:{session_id}"
        sessions_key = f"user:{user_id}:sessions"
        
        self.redis.delete(session_key)
        self.redis.zrem(sessions_key, session_id)

# Использование
limiter = RedisSessionLimiter()
session = limiter.create_session('user_123')
if limiter.access_resource(session):
    print("Доступ разрешён")

Вариант 3: FastAPI Middleware

Для веб-приложения:

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from typing import Optional
import time

app = FastAPI()
limiter = RedisSessionLimiter(max_sessions=5)

class SessionLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Получить session_id из cookie или заголовка
        session_id = request.cookies.get('session_id') or request.headers.get('X-Session-ID')
        
        if not session_id:
            return JSONResponse(
                {"error": "No session"},
                status_code=401
            )
        
        # Проверить доступ
        if not limiter.access_resource(session_id):
            return JSONResponse(
                {"error": "Session limit exceeded or expired"},
                status_code=429
            )
        
        response = await call_next(request)
        return response

app.add_middleware(SessionLimitMiddleware)

@app.post('/login')
async def login(username: str):
    session_id = limiter.create_session(username)
    response = JSONResponse({"session_id": session_id})
    response.set_cookie('session_id', session_id, max_age=1800, httponly=True)
    return response

@app.get('/protected-resource')
async def protected_resource():
    return {"data": "secret"}

Ключевые решения

АспектРешениеОбоснование
СостояниеRedis/в памятиДолжно быть доступно всем воркерам
АтомарностьLua скрипты (Redis)Избежать race conditions
TTLАвтоматический (Redis)Не требует фонового процесса
Старые сессииУдаление при созданииЭкономит память
Проверка лимитаПеред ресурсомFail-fast

Заключение

Для production я выбрал бы Redis с Lua скриптами потому что:

  • Атомарные операции без race conditions
  • Встроенный TTL и очистка
  • Масштабируется на множество воркеров
  • Простой интеграция с FastAPI/Django