Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Защита данных в приложении
Данные — это главный актив любого приложения. Защита данных включает конфиденциальность, целостность и доступность (CIA triad). Вот мой комплексный подход:
1. Хранение паролей
❌ Плохо
# НЕ сохраняем пароли в открытом виде!
user.password = password # ОЧЕНЬ ПЛОХО!
✅ Правильно: bcrypt
import bcrypt
from passlib.context import CryptContext
# Хешируем пароль при регистрации
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto"
)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed: str) -> bool:
return pwd_context.verify(plain_password, hashed)
# Использование
user.hashed_password = hash_password("secretPassword123")
# При логине
if verify_password(entered_password, user.hashed_password):
print("Пароль верный")
Почему bcrypt?
- Медленно (защита от brute force)
- Адаптивно (можно увеличить сложность)
- С солью (защита от rainbow tables)
2. Шифрование чувствительных данных
Данные в базе данных
from cryptography.fernet import Fernet
import os
from datetime import datetime
# Генерируем и сохраняем ключ (в .env!)
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY")
cipher = Fernet(ENCRYPTION_KEY.encode())
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, nullable=False)
# Шифруем SSN (социальный номер)
_ssn_encrypted = Column(String, nullable=True)
@property
def ssn(self):
if self._ssn_encrypted:
return cipher.decrypt(self._ssn_encrypted).decode()
return None
@ssn.setter
def ssn(self, value):
if value:
self._ssn_encrypted = cipher.encrypt(value.encode())
# Использование
user = User(email="john@example.com")
user.ssn = "123-45-6789" # Автоматически шифруется
db.add(user)
db.commit()
print(user.ssn) # "123-45-6789" (расшифровывается)
API ключи и токены
import secrets
from datetime import datetime, timedelta
from jose import JWTError, jwt
SECRET_KEY = os.getenv("SECRET_KEY") # Никогда в коде!
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict):
"""Создаём JWT токен с экспирацией."""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode,
SECRET_KEY,
algorithm=ALGORITHM
)
return encoded_jwt
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
return None
return user_id
except JWTError:
return None
# Использование
token = create_access_token(data={"sub": str(user_id)})
3. HTTPS и шифрование в пути
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import ssl
app = FastAPI()
# Force HTTPS в продакшене
class HTTPSRedirectMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http" and os.getenv("ENV") == "production":
# Редирект на HTTPS
url = f"https://{scope['server'][0]}{scope['path']}"
response = RedirectResponse(url=url, status_code=301)
await response(scope, receive, send)
else:
await self.app(scope, receive, send)
app.add_middleware(HTTPSRedirectMiddleware)
# Генерируем SSL сертификат
# openssl req -x509 -newkey rsa:4096 -nodes -out cert.pem -keyout key.pem -days 365
4. SQL Injection защита
❌ Плохо: конкатенация строк
# НИКОГДА ТАК НЕ ДЕЛАЙТЕ!
query = f"SELECT * FROM users WHERE id = {user_id}"
db.execute(query)
✅ Правильно: параметризованные запросы
from sqlalchemy import text
# С SQLAlchemy ORM
user = db.query(User).filter(User.id == user_id).first()
# С сырым SQL
query = text("SELECT * FROM users WHERE id = :id")
result = db.execute(query, {"id": user_id})
# С psycopg2 (PostgreSQL)
cursor.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,) # Параметры отдельно от SQL
)
5. XSS (Cross-Site Scripting) защита
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from html import escape
app = FastAPI()
@app.get("/user/{user_id}")
async def get_user(user_id: int):
user = db.get_user(user_id)
# ❌ Плохо: прямой вывод
# return {"message": f"Hello {user.name}"}
# ✅ Хорошо: экранируем
safe_name = escape(user.name)
return {"message": f"Hello {safe_name}"}
# Или в Jinja2 templates автоматически экранирует
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="templates")
# В template.html
# {{ user.name }} <!-- Автоматически экранируется -->
6. CORS (Cross-Origin) защита
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# ❌ Плохо: разрешить всем
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# )
# ✅ Правильно: только доверенные домены
allowed_origins = [
"https://example.com",
"https://app.example.com",
]
if os.getenv("ENV") == "development":
allowed_origins.append("http://localhost:3000")
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["Content-Type"],
)
7. Rate Limiting
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
@app.post("/login")
@limiter.limit("5/minute") # Максимум 5 попыток в минуту
async def login(credentials: LoginSchema, request: Request):
user = authenticate_user(credentials.email, credentials.password)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
return {"access_token": create_token(user.id)}
8. Управление секретами
.env файл (НИКОГДА не коммитим!)
# .env
DATABASE_URL=postgresql://user:pass@localhost/db
SECRET_KEY=your-super-secret-key-here
ENCRYPTION_KEY=Fernet-encryption-key
API_KEY=external-service-key
Загрузка в приложение
from dotenv import load_dotenv
import os
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
SECRET_KEY = os.getenv("SECRET_KEY")
if not SECRET_KEY:
raise ValueError("SECRET_KEY not set in environment")
.gitignore
.env
.env.local
*.key
*.pem
secrets/
9. Логирование без утечек
import logging
logger = logging.getLogger(__name__)
def process_payment(card_number: str, amount: float):
# ❌ Плохо: логируем чувствительные данные
# logger.info(f"Processing payment: {card_number}, {amount}")
# ✅ Правильно: скрываем чувствительные данные
masked_card = f"****{card_number[-4:]}"
logger.info(f"Processing payment: {masked_card}, {amount}")
# Обработка платежа
try:
charge_card(card_number, amount)
except Exception as e:
# ❌ Не логируем ошибку с полными данными
# logger.error(f"Payment failed: {e}, card: {card_number}")
# ✅ Логируем только необходимое
logger.error(f"Payment failed: {str(e)[:50]}...")
raise
10. Аудит и мониторинг
from datetime import datetime
from sqlalchemy import Column, String, DateTime, Integer
class AuditLog(Base):
__tablename__ = "audit_logs"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False)
action = Column(String, nullable=False)
resource = Column(String, nullable=False)
timestamp = Column(DateTime, default=datetime.utcnow)
ip_address = Column(String, nullable=False)
@staticmethod
def log_action(user_id: int, action: str, resource: str, ip: str):
log_entry = AuditLog(
user_id=user_id,
action=action,
resource=resource,
ip_address=ip
)
db.add(log_entry)
db.commit()
# Использование
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, current_user: User = Depends(get_current_user), request: Request):
AuditLog.log_action(
user_id=current_user.id,
action="DELETE_USER",
resource=f"users/{user_id}",
ip=request.client.host
)
# Удаляем пользователя
return {"message": "User deleted"}
11. Регулярные проверки безопасности
# OWASP Dependency Check
# dependency-check.sh
# Проверяет известные уязвимости в зависимостях
# Bandit — сканирование Python кода
bandit -r src/
# Safety — проверка зависимостей
safety check
# pip-audit — аудит пакетов
pip-audit
12. Принцип наименьших привилегий
class UserRole(str, Enum):
ADMIN = "admin"
MODERATOR = "moderator"
USER = "user"
def require_role(required_role: UserRole):
async def role_checker(current_user: User = Depends(get_current_user)):
if current_user.role != required_role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return current_user
return role_checker
# Использование
@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
admin_user: User = Depends(require_role(UserRole.ADMIN))
):
# Может выполнить только администратор
return {"message": "User deleted"}
Чеклист безопасности
- ✅ Пароли хешируются (bcrypt)
- ✅ Чувствительные данные шифруются
- ✅ Используется HTTPS везде
- ✅ SQL injection защита (параметризованные запросы)
- ✅ XSS защита (экранирование вывода)
- ✅ CORS настроен правильно
- ✅ Rate limiting на чувствительные endpoints
- ✅ .env файл в .gitignore
- ✅ Секреты не в коде
- ✅ Логирование без чувствительных данных
- ✅ Аудит логирование включено
- ✅ Зависимости проверяются на уязвимости
- ✅ Валидация входных данных
- ✅ HTTPS редирект в продакшене
Помните: безопасность — это не одноразовая задача, а непрерывный процесс. Регулярно обновляйте зависимости, проводите аудиты и учитесь новым угрозам.