Как БД будет обрабатывать одновременные запросы?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Как БД обрабатывает одновременные запросы
Одна из самых частых задач в разработке — обеспечить корректную работу приложения когда множество пользователей одновременно обращаются к БД. БД использует несколько механизмов для решения этой проблемы.
Проблема: Race Condition
Педставьте банковский счёт с балансом 100:
Время | Операция 1 | Операция 2 | Баланс
1 | READ (100) | - | 100
2 | - | READ (100) | 100
3 | WRITE (100-50) | - | 50
4 | - | WRITE (100-20) | 80 (?!)
Результат: потеряли 50 рублей! БД должна это предотвратить.
Механизм 1: Блокировки (Locks)
БД использует блокировки для предотвращения одновременного доступа.
Типы блокировок
# SHARED LOCK (Read Lock) — несколько читателей могут одновременно
# EXCLUSIVE LOCK (Write Lock) — только один писатель, никто не может читать
# PostgreSQL пример
from sqlalchemy import text
from sqlalchemy.orm import Session
session = Session()
# SELECT FOR UPDATE — эксклюзивная блокировка
# (блокирует другие процессы от доступа)
account = session.query(Account).with_for_update().filter(
Account.id == account_id
).first()
# Теперь только я имею доступ
account.balance -= 50
session.commit() # Блокировка снимается
# SELECT FOR UPDATE SKIP LOCKED — пропустить заблокированные
# (для очередей, чтобы не ждать)
available_task = session.query(Task).with_for_update(
skip_locked=True
).filter(Task.status == 'pending').first()
if available_task:
available_task.status = 'processing'
session.commit()
Проблема с блокировками: Deadlock
Процесс 1 | Процесс 2
|
LOCK Account A | LOCK Account B
(успех) | (успех)
|
LOCK Account B | LOCK Account A
(ждёт P2) | (ждёт P1)
ДЕДЛОК! Оба ждут друг друга.
Чтобы избежать deadlock:
# ✅ Всегда блокируй в одном порядке
if account1_id < account2_id:
acc1 = session.query(Account).with_for_update().filter(
Account.id == account1_id
).first()
acc2 = session.query(Account).with_for_update().filter(
Account.id == account2_id
).first()
else:
acc2 = session.query(Account).with_for_update().filter(
Account.id == account2_id
).first()
acc1 = session.query(Account).with_for_update().filter(
Account.id == account1_id
).first()
Механизм 2: MVCC (Multi-Version Concurrency Control)
Моказываются разные "версии" данных разным читателям, чтобы они не блокировали друг друга.
# PostgreSQL, InnoDB MySQL используют MVCC
# Временная шкала
# Transaction 1 | Transaction 2
# BEGIN |
# balance = 100 |
# (снимок T1) |
# | BEGIN
# | balance = 100 (снимок T2)
# UPDATE balance=50 |
# | SELECT balance
# | видит 100 (старую версию!)
# COMMIT |
#
# SELECT balance |
# видит 50 |
# | COMMIT
# Transaction 2 видела старую версию, потому что MVCC
Механизм 3: Уровни изоляции (Isolation Levels)
БД может работать с разной степенью строгости:
from sqlalchemy import create_engine
# 1. READ UNCOMMITTED (самый слабый)
# Транзакция может видеть незафиксированные изменения другой транзакции
engine = create_engine(
'postgresql://user:password@localhost/db',
isolation_level='READ_UNCOMMITTED'
)
# 2. READ COMMITTED (по умолчанию в PostgreSQL)
# Транзакция видит только зафиксированные данные
engine = create_engine(
'postgresql://user:password@localhost/db',
isolation_level='READ_COMMITTED' # Default
)
# 3. REPEATABLE READ (MySQL default, PostgreSQL)
# Видит снимок данных на момент начала транзакции
engine = create_engine(
'postgresql://user:password@localhost/db',
isolation_level='REPEATABLE_READ'
)
# 4. SERIALIZABLE (самый строгий)
# Полная изоляция, как если бы транзакции выполнялись по одной
engine = create_engine(
'postgresql://user:password@localhost/db',
isolation_level='SERIALIZABLE'
)
Практический пример: Трансфер денег
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy import text
Base = declarative_base()
class Account(Base):
__tablename__ = 'accounts'
id = Column(Integer, primary_key=True)
owner = Column(String)
balance = Column(Float)
# ❌ НЕБЕЗОПАСНО (Race condition)
def unsafe_transfer(from_id, to_id, amount):
session = Session()
from_acc = session.query(Account).filter(Account.id == from_id).first()
to_acc = session.query(Account).filter(Account.id == to_id).first()
from_acc.balance -= amount # Другой процесс может менять в этот момент!
to_acc.balance += amount # И это!
session.commit()
# ✅ БЕЗОПАСНО (с блокировками)
def safe_transfer_with_locks(from_id, to_id, amount):
session = Session()
# Блокируем обе записи
from_acc = session.query(Account).with_for_update().filter(
Account.id == from_id
).first()
to_acc = session.query(Account).with_for_update().filter(
Account.id == to_id
).first()
if from_acc.balance < amount:
session.rollback()
raise ValueError("Insufficient funds")
from_acc.balance -= amount
to_acc.balance += amount
session.commit()
# ✅ БЕЗОПАСНО (в одной SQL транзакции)
def safe_transfer_atomic(from_id, to_id, amount):
session = Session()
# Выполнить всё в одной атомарной операции
session.execute(text("""
UPDATE accounts SET balance = balance - :amount
WHERE id = :from_id AND balance >= :amount
"""), {"amount": amount, "from_id": from_id})
session.execute(text("""
UPDATE accounts SET balance = balance + :amount
WHERE id = :to_id
"""), {"amount": amount, "to_id": to_id})
session.commit()
Оптимистичная блокировка (Optimistic Locking)
Вместо блокировки БД, проверяем версию записи:
from sqlalchemy import Column, Integer, String, Float, DateTime
from datetime import datetime
class Account(Base):
__tablename__ = 'accounts'
id = Column(Integer, primary_key=True)
owner = Column(String)
balance = Column(Float)
version = Column(Integer, default=1) # Версия для оптимистичной блокировки
def optimistic_transfer(from_id, to_id, amount, from_version, to_version):
"""Перевод с проверкой версий"""
session = Session()
# Обновляем только если версия совпадает
from_acc = session.query(Account).filter(
Account.id == from_id,
Account.version == from_version # Проверка версии!
).with_for_update().first()
if not from_acc:
raise Exception("From account was modified")
to_acc = session.query(Account).filter(
Account.id == to_id,
Account.version == to_version
).with_for_update().first()
if not to_acc:
raise Exception("To account was modified")
from_acc.balance -= amount
from_acc.version += 1 # Увеличиваем версию
to_acc.balance += amount
to_acc.version += 1
try:
session.commit()
except:
session.rollback()
raise # Повторить транзакцию с новыми версиями
# Клиент отправляет версию:
# GET /account/1 -> { "balance": 100, "version": 5 }
# PUT /account/1 -> { "balance": 50, "version": 5 } # С версией!
Пулинг соединений (Connection Pooling)
БД не создаёт новое соединение для каждого запроса, а переиспользует:
from sqlalchemy import create_engine
# Без пулинга — каждый запрос создаёт новое соединение (медленно)
engine = create_engine(
'postgresql://user:password@localhost/db',
poolclass=NullPool # Нет пула
)
# С пулингом (по умолчанию)
engine = create_engine(
'postgresql://user:password@localhost/db',
pool_size=10, # До 10 соединений в пуле
max_overflow=20, # Еще 20 если нужно
pool_recycle=3600, # Пересоздавать соединение каждый час
pool_pre_ping=True, # Проверять соединение перед использованием
)
Уровень кэширования запросов
# В памяти приложения кэшировать результаты
from functools import lru_cache
import redis
redis_client = redis.Redis()
def get_user(user_id):
# Сначала проверить Redis
cached = redis_client.get(f"user:{user_id}")
if cached:
return json.loads(cached)
# Если нет, запросить из БД
user = session.query(User).filter(User.id == user_id).first()
# Сохранить в Redis на 1 час
redis_client.setex(f"user:{user_id}", 3600, json.dumps(user))
return user
Практические рекомендации
# 1. Используйте ACID транзакции
try:
session.begin_nested() # Savepoint
# do work
session.commit()
except:
session.rollback()
# 2. Минимизируйте время блокировки
# ✅ Хорошо — быстро
session.query(Account).with_for_update().filter(...).first()
account.balance -= 50
session.commit()
# ❌ Плохо — медленно
session.query(Account).with_for_update().filter(...).first()
time.sleep(10) # Долгая операция вне БД
account.balance -= 50
session.commit()
# 3. Используйте индексы
# Запросы выполняются быстрее -> меньше блокировок
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
email = Column(String, index=True) # Индекс!
# 4. Батчируйте запросы
# Вместо 1000 INSERT, сделать 1 INSERT с 1000 строк
session.bulk_insert_mappings(User, [
{'name': 'Alice', 'age': 30},
{'name': 'Bob', 'age': 25},
# ...
])
# 5. Мониторьте медленные запросы
# EXPLAIN ANALYZE SELECT ...
Тестирование параллелизма
import threading
from concurrent.futures import ThreadPoolExecutor
def test_concurrent_transfer():
"""Тест параллельного трансфера"""
def transfer():
try:
safe_transfer_with_locks(from_id=1, to_id=2, amount=1)
except:
pass
# Запустить 100 потоков одновременно
with ThreadPoolExecutor(max_workers=50) as executor:
futures = [executor.submit(transfer) for _ in range(100)]
for future in futures:
future.result()
# Проверить корректность
from_acc = session.query(Account).filter(Account.id == 1).first()
to_acc = session.query(Account).filter(Account.id == 2).first()
# Сумма должна быть правильной
assert from_acc.balance + to_acc.balance == initial_sum
Итоги
БД обрабатывает одновременные запросы используя:
- Блокировки — предотвращают конфликты
- MVCC — разные версии для разных транзакций
- Уровни изоляции — баланс между производительностью и безопасностью
- Транзакции — атомарность операций
- Пулинг соединений — эффективное использование ресурсов
- Индексы — быстрые запросы = меньше блокировок
Как разработчик важно понимать эти механизмы, чтобы писать корректный и производительный код.