← Назад к вопросам

Как БД будет обрабатывать одновременные запросы?

2.0 Middle🔥 221 комментариев
#Архитектура и паттерны#Асинхронность и многопоточность#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как БД обрабатывает одновременные запросы

Одна из самых частых задач в разработке — обеспечить корректную работу приложения когда множество пользователей одновременно обращаются к БД. БД использует несколько механизмов для решения этой проблемы.

Проблема: Race Condition

Педставьте банковский счёт с балансом 100:

Время | Операция 1      | Операция 2      | Баланс
  1   | READ (100)      | -               | 100
  2   | -               | READ (100)      | 100
  3   | WRITE (100-50)  | -               | 50
  4   | -               | WRITE (100-20)  | 80 (?!)

Результат: потеряли 50 рублей! БД должна это предотвратить.

Механизм 1: Блокировки (Locks)

БД использует блокировки для предотвращения одновременного доступа.

Типы блокировок

# SHARED LOCK (Read Lock) — несколько читателей могут одновременно
# EXCLUSIVE LOCK (Write Lock) — только один писатель, никто не может читать

# PostgreSQL пример
from sqlalchemy import text
from sqlalchemy.orm import Session

session = Session()

# SELECT FOR UPDATE — эксклюзивная блокировка
# (блокирует другие процессы от доступа)
account = session.query(Account).with_for_update().filter(
    Account.id == account_id
).first()

# Теперь только я имею доступ
account.balance -= 50
session.commit()  # Блокировка снимается

# SELECT FOR UPDATE SKIP LOCKED — пропустить заблокированные
# (для очередей, чтобы не ждать)
available_task = session.query(Task).with_for_update(
    skip_locked=True
).filter(Task.status == 'pending').first()

if available_task:
    available_task.status = 'processing'
    session.commit()

Проблема с блокировками: Deadlock

Процесс 1              | Процесс 2
                       |
LOCK Account A         | LOCK Account B
(успех)                | (успех)
                       |
LOCK Account B         | LOCK Account A
(ждёт P2)              | (ждёт P1)

ДЕДЛОК! Оба ждут друг друга.

Чтобы избежать deadlock:

# ✅ Всегда блокируй в одном порядке
if account1_id < account2_id:
    acc1 = session.query(Account).with_for_update().filter(
        Account.id == account1_id
    ).first()
    acc2 = session.query(Account).with_for_update().filter(
        Account.id == account2_id
    ).first()
else:
    acc2 = session.query(Account).with_for_update().filter(
        Account.id == account2_id
    ).first()
    acc1 = session.query(Account).with_for_update().filter(
        Account.id == account1_id
    ).first()

Механизм 2: MVCC (Multi-Version Concurrency Control)

Моказываются разные "версии" данных разным читателям, чтобы они не блокировали друг друга.

# PostgreSQL, InnoDB MySQL используют MVCC

# Временная шкала
# Transaction 1        | Transaction 2
# BEGIN                |
# balance = 100        |
# (снимок T1)         |
#                      | BEGIN
#                      | balance = 100 (снимок T2)
# UPDATE balance=50    |
#                      | SELECT balance
#                      | видит 100 (старую версию!)
# COMMIT               |
#
# SELECT balance       |
# видит 50             |
#                      | COMMIT

# Transaction 2 видела старую версию, потому что MVCC

Механизм 3: Уровни изоляции (Isolation Levels)

БД может работать с разной степенью строгости:

from sqlalchemy import create_engine

# 1. READ UNCOMMITTED (самый слабый)
# Транзакция может видеть незафиксированные изменения другой транзакции
engine = create_engine(
    'postgresql://user:password@localhost/db',
    isolation_level='READ_UNCOMMITTED'
)

# 2. READ COMMITTED (по умолчанию в PostgreSQL)
# Транзакция видит только зафиксированные данные
engine = create_engine(
    'postgresql://user:password@localhost/db',
    isolation_level='READ_COMMITTED'  # Default
)

# 3. REPEATABLE READ (MySQL default, PostgreSQL)
# Видит снимок данных на момент начала транзакции
engine = create_engine(
    'postgresql://user:password@localhost/db',
    isolation_level='REPEATABLE_READ'
)

# 4. SERIALIZABLE (самый строгий)
# Полная изоляция, как если бы транзакции выполнялись по одной
engine = create_engine(
    'postgresql://user:password@localhost/db',
    isolation_level='SERIALIZABLE'
)

Практический пример: Трансфер денег

from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy import text

Base = declarative_base()

class Account(Base):
    __tablename__ = 'accounts'
    id = Column(Integer, primary_key=True)
    owner = Column(String)
    balance = Column(Float)

# ❌ НЕБЕЗОПАСНО (Race condition)
def unsafe_transfer(from_id, to_id, amount):
    session = Session()
    
    from_acc = session.query(Account).filter(Account.id == from_id).first()
    to_acc = session.query(Account).filter(Account.id == to_id).first()
    
    from_acc.balance -= amount  # Другой процесс может менять в этот момент!
    to_acc.balance += amount    # И это!
    
    session.commit()

# ✅ БЕЗОПАСНО (с блокировками)
def safe_transfer_with_locks(from_id, to_id, amount):
    session = Session()
    
    # Блокируем обе записи
    from_acc = session.query(Account).with_for_update().filter(
        Account.id == from_id
    ).first()
    to_acc = session.query(Account).with_for_update().filter(
        Account.id == to_id
    ).first()
    
    if from_acc.balance < amount:
        session.rollback()
        raise ValueError("Insufficient funds")
    
    from_acc.balance -= amount
    to_acc.balance += amount
    
    session.commit()

# ✅ БЕЗОПАСНО (в одной SQL транзакции)
def safe_transfer_atomic(from_id, to_id, amount):
    session = Session()
    
    # Выполнить всё в одной атомарной операции
    session.execute(text("""
        UPDATE accounts SET balance = balance - :amount
        WHERE id = :from_id AND balance >= :amount
    """), {"amount": amount, "from_id": from_id})
    
    session.execute(text("""
        UPDATE accounts SET balance = balance + :amount
        WHERE id = :to_id
    """), {"amount": amount, "to_id": to_id})
    
    session.commit()

Оптимистичная блокировка (Optimistic Locking)

Вместо блокировки БД, проверяем версию записи:

from sqlalchemy import Column, Integer, String, Float, DateTime
from datetime import datetime

class Account(Base):
    __tablename__ = 'accounts'
    id = Column(Integer, primary_key=True)
    owner = Column(String)
    balance = Column(Float)
    version = Column(Integer, default=1)  # Версия для оптимистичной блокировки

def optimistic_transfer(from_id, to_id, amount, from_version, to_version):
    """Перевод с проверкой версий"""
    session = Session()
    
    # Обновляем только если версия совпадает
    from_acc = session.query(Account).filter(
        Account.id == from_id,
        Account.version == from_version  # Проверка версии!
    ).with_for_update().first()
    
    if not from_acc:
        raise Exception("From account was modified")
    
    to_acc = session.query(Account).filter(
        Account.id == to_id,
        Account.version == to_version
    ).with_for_update().first()
    
    if not to_acc:
        raise Exception("To account was modified")
    
    from_acc.balance -= amount
    from_acc.version += 1  # Увеличиваем версию
    
    to_acc.balance += amount
    to_acc.version += 1
    
    try:
        session.commit()
    except:
        session.rollback()
        raise  # Повторить транзакцию с новыми версиями

# Клиент отправляет версию:
# GET /account/1 -> { "balance": 100, "version": 5 }
# PUT /account/1 -> { "balance": 50, "version": 5 }  # С версией!

Пулинг соединений (Connection Pooling)

БД не создаёт новое соединение для каждого запроса, а переиспользует:

from sqlalchemy import create_engine

# Без пулинга — каждый запрос создаёт новое соединение (медленно)
engine = create_engine(
    'postgresql://user:password@localhost/db',
    poolclass=NullPool  # Нет пула
)

# С пулингом (по умолчанию)
engine = create_engine(
    'postgresql://user:password@localhost/db',
    pool_size=10,           # До 10 соединений в пуле
    max_overflow=20,        # Еще 20 если нужно
    pool_recycle=3600,      # Пересоздавать соединение каждый час
    pool_pre_ping=True,     # Проверять соединение перед использованием
)

Уровень кэширования запросов

# В памяти приложения кэшировать результаты
from functools import lru_cache
import redis

redis_client = redis.Redis()

def get_user(user_id):
    # Сначала проверить Redis
    cached = redis_client.get(f"user:{user_id}")
    if cached:
        return json.loads(cached)
    
    # Если нет, запросить из БД
    user = session.query(User).filter(User.id == user_id).first()
    
    # Сохранить в Redis на 1 час
    redis_client.setex(f"user:{user_id}", 3600, json.dumps(user))
    
    return user

Практические рекомендации

# 1. Используйте ACID транзакции
try:
    session.begin_nested()  # Savepoint
    # do work
    session.commit()
except:
    session.rollback()

# 2. Минимизируйте время блокировки
# ✅ Хорошо — быстро
session.query(Account).with_for_update().filter(...).first()
account.balance -= 50
session.commit()

# ❌ Плохо — медленно
session.query(Account).with_for_update().filter(...).first()
time.sleep(10)  # Долгая операция вне БД
account.balance -= 50
session.commit()

# 3. Используйте индексы
# Запросы выполняются быстрее -> меньше блокировок
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    email = Column(String, index=True)  # Индекс!

# 4. Батчируйте запросы
# Вместо 1000 INSERT, сделать 1 INSERT с 1000 строк
session.bulk_insert_mappings(User, [
    {'name': 'Alice', 'age': 30},
    {'name': 'Bob', 'age': 25},
    # ...
])

# 5. Мониторьте медленные запросы
# EXPLAIN ANALYZE SELECT ...

Тестирование параллелизма

import threading
from concurrent.futures import ThreadPoolExecutor

def test_concurrent_transfer():
    """Тест параллельного трансфера"""
    
    def transfer():
        try:
            safe_transfer_with_locks(from_id=1, to_id=2, amount=1)
        except:
            pass
    
    # Запустить 100 потоков одновременно
    with ThreadPoolExecutor(max_workers=50) as executor:
        futures = [executor.submit(transfer) for _ in range(100)]
        for future in futures:
            future.result()
    
    # Проверить корректность
    from_acc = session.query(Account).filter(Account.id == 1).first()
    to_acc = session.query(Account).filter(Account.id == 2).first()
    
    # Сумма должна быть правильной
    assert from_acc.balance + to_acc.balance == initial_sum

Итоги

БД обрабатывает одновременные запросы используя:

  1. Блокировки — предотвращают конфликты
  2. MVCC — разные версии для разных транзакций
  3. Уровни изоляции — баланс между производительностью и безопасностью
  4. Транзакции — атомарность операций
  5. Пулинг соединений — эффективное использование ресурсов
  6. Индексы — быстрые запросы = меньше блокировок

Как разработчик важно понимать эти механизмы, чтобы писать корректный и производительный код.

Как БД будет обрабатывать одновременные запросы? | PrepBro