← Назад к вопросам

Зачем нужно синхронизировать потоки, если есть изоляция транзакций в БД?

2.0 Middle🔥 91 комментариев
#Асинхронность и многопоточность#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Зачем синхронизировать потоки, если есть изоляция в БД

Это популярный вопрос с хитрым ответом. Изоляция транзакций в БД НЕ решает всех проблем конкурентности, и синхронизация остаётся необходимой в памяти приложения.

Основная путаница

# Ошибочное предположение:
# "Если БД гарантирует ACID, то потокам не нужна синхронизация"

# Реальность:
# Транзакции БД защищают ТОЛЬКО операции в БД
# В памяти приложения гонка может произойти ДО запроса к БД

Пример проблемы в памяти

import threading
from sqlalchemy.orm import Session
from database import engine, User

balance = 1000  # Переменная в памяти приложения

def withdraw(amount):
    global balance
    
    # ❌ RACE CONDITION В ПАМЯТИ
    temp = balance  # Поток 1: прочитал 1000
                     # Поток 2: прочитал 1000 (ещё не обновлено!)
    
    temp -= amount   # Поток 1: вычислил 1000 - 500 = 500
                     # Поток 2: вычислил 1000 - 300 = 700
    
    balance = temp   # Поток 1: записал 500
                     # Поток 2: записал 700 (перезаписал!)
    
    # Результат: потеряна одна операция!

# Даже если БД работает идеально, код падает
thread1 = threading.Thread(target=withdraw, args=(500,))
thread2 = threading.Thread(target=withdraw, args=(300,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()

print(balance)  # Может быть 700 вместо 200!

Транзакции в БД защищают ТОЛЬКО БД

# ✅ Если операция идёт в БД с транзакцией
with Session(engine) as session:
    user = session.query(User).filter_by(id=1).first()
    
    # Критическая секция в БД
    user.balance -= 500
    session.commit()  # ACID гарантирует консистентность БД

# ❌ Но операции в памяти не защищены
cache = {}  # Словарь в памяти

def update_cache(key, value):
    if key not in cache:  # Race condition здесь!
        cache[key] = value

Разные уровни изоляции БД

Изоляция в БД работает, но имеет пределы:

-- READ UNCOMMITTED
Транзакция 1: UPDATE users SET balance = 500 WHERE id = 1;
-- Транзакция 2 может прочитать 500 ДО COMMIT! (Dirty read)

-- READ COMMITTED (обычный уровень)
Транзакция 1: SELECT balance FROM users WHERE id = 1;  -- 1000
Транзакция 2: UPDATE users SET balance = 900 WHERE id = 1; COMMIT;
Транзакция 1: SELECT balance FROM users WHERE id = 1;  -- 900 (разные значения!)

-- REPEATABLE READ
Транзакция 1: SELECT * FROM users WHERE age > 18;
Транзакция 2: INSERT INTO users (age) VALUES (25); COMMIT;
Транзакция 1: SELECT * FROM users WHERE age > 18;  -- Может быть новая строка!

-- SERIALIZABLE
-- Самый строгий уровень, но медленный
Транзакции выполняются последовательно

Типичные проблемы без синхронизации

1. Lost update (потерянное обновление)

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance  # В памяти приложения!
    
    def deposit(self, amount):
        self.balance += amount
    
    def withdraw(self, amount):
        self.balance -= amount

account = BankAccount(1000)

def operation1():
    account.withdraw(300)

def operation2():
    account.deposit(200)

# Без синхронизации
t1 = threading.Thread(target=operation1)
t2 = threading.Thread(target=operation2)
t1.start()
t2.start()
t1.join()
t2.join()

print(account.balance)  # Непредсказуемо!

2. Dirty read в памяти

class Order:
    def __init__(self):
        self.status = 'pending'
        self.items = []
    
    def add_item(self, item):
        self.items.append(item)  # ❌ Не потокобезопасно
    
    def finalize(self):
        if len(self.items) > 0:  # Race condition!
            self.status = 'confirmed'

3. Проблема с кешом

from functools import lru_cache

cache = {}

def get_user(user_id):
    if user_id not in cache:  # Race: могут войти оба потока
        user = fetch_from_db(user_id)  # Дорогой запрос дважды!
        cache[user_id] = user
    return cache[user_id]

Синхронизация в памяти приложения

1. Lock (мьютекс)

import threading

balance = 1000
lock = threading.Lock()

def withdraw(amount):
    global balance
    
    with lock:  # ✅ Только один поток в этом месте
        temp = balance
        temp -= amount
        balance = temp
    # lock автоматически освобождается

thread1 = threading.Thread(target=withdraw, args=(500,))
thread2 = threading.Thread(target=withdraw, args=(300,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()

print(balance)  # Всегда 200

2. RLock (переиспользуемый lock)

lock = threading.RLock()  # Один поток может захватить дважды

with lock:
    with lock:  # Это работает (обычный Lock упадёт)
        pass

3. Semaphore (ограничение доступа)

sem = threading.Semaphore(3)  # Максимум 3 потока одновременно

with sem:
    # В этом блоке максимум 3 потока
    expensive_operation()

4. Condition (ожидание условия)

from threading import Condition

cond = Condition()
data = []

def producer():
    with cond:
        data.append('item')
        cond.notify_all()  # Пробудить ждущие потоки

def consumer():
    with cond:
        while not data:  # Ждём данные
            cond.wait()
        item = data.pop(0)

Практический пример: счётчик с БД

# ❌ БЕЗ синхронизации (даже с ACID БД)
class Counter:
    def __init__(self, db, counter_id):
        self.db = db
        self.counter_id = counter_id
        self.local_value = 0  # Кеш в памяти!
    
    def increment(self):
        with self.db.session() as session:
            counter = session.query(Counter).get(self.counter_id)
            counter.value += 1  # Transaction 1
            session.commit()
        
        self.local_value += 1  # Race condition в памяти!
    
    def get_value(self):
        return self.local_value  # Может отличаться от БД

# ✅ С синхронизацией
import threading

class SynchronizedCounter:
    def __init__(self, db, counter_id):
        self.db = db
        self.counter_id = counter_id
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:  # Только один поток
            with self.db.session() as session:
                counter = session.query(Counter).get(self.counter_id)
                counter.value += 1
                session.commit()

ACID не решает гонку в памяти

# ACID = Atomicity, Consistency, Isolation, Durability
# Это гарантии ДЛЯ БД, не для памяти приложения

# Atomicity: Транзакция либо полностью выполнена, либо нет
# Не помогает с гонкой в памяти

# Consistency: БД в консистентном состоянии
# Но состояние в памяти может быть не консистентно

# Isolation: Транзакции не видят друг друга
# Только в БД, в памяти приложения нет

# Durability: После commit данные сохранены
# Не защищает данные в памяти

Вывод

# Синхронизация нужна потому что:

# 1. ACID в БД защищает ТОЛЬКО БД
with db.transaction():  # ✅ Защищено
    db.update(...)

# 2. В памяти приложения нет ACID
cache[key] = value  # ❌ Не защищено

# 3. Гонка может произойти ДО/ПОСЛЕ БД операции
if user:  # Race condition
    update_cache()

# 4. Потоки могут видеть неконсистентное состояние
value1 = obj.attr1  # Поток 1
value2 = obj.attr2  # Может быть обновлено потоком 2!

Вывод: изоляция транзакций в БД — необходимый, но недостаточный инструмент. Для полной защиты от гонки нужна синхронизация на уровне приложения (locks, atomics, thread-safe структуры). БД защищает только БД, приложение должно защищать себя.