← Назад к вопросам
Как реализовать транзакции вручную без встроенного механизма?
3.0 Senior🔥 81 комментариев
#Архитектура и паттерны#Базы данных (SQL)
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Реализация транзакций вручную без встроенного механизма
Это классический вопрос о ACID свойствах и том, как их обеспечить при работе с системами, где нет встроенных транзакций (например, распределённые системы, NoSQL базы данных или микросервисные архитектуры).
1. ACID свойства и вызовы
"""
ACID свойства:
Atomicity (Атомарность):
- Операция либо полностью выполняется, либо не выполняется вообще
- Проблема: как откатить частичные изменения?
Consistency (Консистентность):
- Данные остаются в согласованном состоянии
- Проблема: как проверить консистентность при распределённых операциях?
Isolation (Изоляция):
- Параллельные операции не влияют друг на друга
- Проблема: как избежать race conditions?
Durability (Долговечность):
- Данные сохраняются в случае сбоя
- Проблема: как гарантировать сохранение?
"""
2. Стратегия 1: Write-Ahead Logging (WAL)
Для обеспечения Atomicity и Durability используем журнал операций.
import json
from datetime import datetime
from enum import Enum
from pathlib import Path
class OperationStatus(Enum):
PENDING = "pending"
COMMITTED = "committed"
ROLLED_BACK = "rolled_back"
class WriteAheadLog:
"""Write-Ahead Logging для гарантии Atomicity"""
def __init__(self, log_file):
self.log_file = Path(log_file)
self.transactions = {}
def start_transaction(self, txn_id):
"""Начать новую транзакцию"""
entry = {
'id': txn_id,
'status': OperationStatus.PENDING.value,
'timestamp': datetime.utcnow().isoformat(),
'operations': []
}
self._write_to_log(entry)
self.transactions[txn_id] = entry
def log_operation(self, txn_id, operation):
"""Записать операцию в журнал"""
if txn_id not in self.transactions:
raise ValueError(f"Transaction {txn_id} not found")
self.transactions[txn_id]['operations'].append(operation)
self._write_to_log(self.transactions[txn_id])
def commit_transaction(self, txn_id):
"""Подтвердить транзакцию"""
if txn_id not in self.transactions:
raise ValueError(f"Transaction {txn_id} not found")
self.transactions[txn_id]['status'] = OperationStatus.COMMITTED.value
self._write_to_log(self.transactions[txn_id])
def rollback_transaction(self, txn_id):
"""Откатить транзакцию"""
if txn_id not in self.transactions:
raise ValueError(f"Transaction {txn_id} not found")
self.transactions[txn_id]['status'] = OperationStatus.ROLLED_BACK.value
self._write_to_log(self.transactions[txn_id])
def _write_to_log(self, entry):
"""Синхронно записать в журнал"""
with open(self.log_file, 'a') as f:
f.write(json.dumps(entry) + '\n')
f.flush() # Обязательный flush для гарантии durability
def recover(self):
"""Восстановление после сбоя"""
if not self.log_file.exists():
return
pending_txns = []
with open(self.log_file, 'r') as f:
for line in f:
entry = json.loads(line)
if entry['status'] == OperationStatus.PENDING.value:
pending_txns.append(entry['id'])
# Откатываем все незавершённые транзакции
for txn_id in pending_txns:
self.rollback_transaction(txn_id)
return pending_txns
3. Стратегия 2: Two-Phase Commit (2PC)
Для распределённых транзакций используем протокол двухфазного коммита.
from enum import Enum
from typing import Dict, List
import threading
class VoteResult(Enum):
YES = "yes"
NO = "no"
class TwoPhaseCommit:
"""Двухфазный коммит для распределённых транзакций"""
def __init__(self):
self.participants = {} # Название ресурса -> объект участника
self.lock = threading.RLock()
def register_participant(self, name, participant):
"""Зарегистрировать участника транзакции"""
self.participants[name] = participant
def execute(self, txn_id, operations: Dict[str, list]):
"""
Выполнить распределённую транзакцию
operations: {'db1': [op1, op2], 'db2': [op3, op4]}
"""
# ФАЗА 1: Prepare
votes = {}
with self.lock:
for participant_name, ops in operations.items():
if participant_name not in self.participants:
raise ValueError(f"Participant {participant_name} not found")
participant = self.participants[participant_name]
try:
# Попросить участника подготовить операции
participant.prepare(txn_id, ops)
votes[participant_name] = VoteResult.YES
except Exception as e:
print(f"Prepare failed for {participant_name}: {e}")
votes[participant_name] = VoteResult.NO
# Проверка: все ли согласились?
if all(v == VoteResult.YES for v in votes.values()):
# ФАЗА 2: Commit
return self._commit_phase(txn_id, operations)
else:
# ФАЗА 2: Abort
return self._abort_phase(txn_id, operations)
def _commit_phase(self, txn_id, operations):
"""Вторая фаза: коммит"""
results = {}
with self.lock:
for participant_name, ops in operations.items():
participant = self.participants[participant_name]
try:
participant.commit(txn_id, ops)
results[participant_name] = "committed"
except Exception as e:
# Критическая ошибка: операция не может быть откачена
print(f"Critical error in commit for {participant_name}: {e}")
results[participant_name] = "error"
return results
def _abort_phase(self, txn_id, operations):
"""Вторая фаза: откат"""
results = {}
with self.lock:
for participant_name, ops in operations.items():
participant = self.participants[participant_name]
try:
participant.abort(txn_id, ops)
results[participant_name] = "aborted"
except Exception as e:
print(f"Error in abort for {participant_name}: {e}")
results[participant_name] = "error"
return results
class DatabaseParticipant:
"""Участник в 2PC"""
def __init__(self, name):
self.name = name
self.prepared = {} # txn_id -> prepared_state
def prepare(self, txn_id, operations):
"""Подготовка (фаза 1)"""
# Проверить, можно ли выполнить операции
for op in operations:
if not self._can_execute(op):
raise Exception(f"Cannot execute {op}")
# Зарезервировать ресурсы, но не применять
self.prepared[txn_id] = operations
def commit(self, txn_id, operations):
"""Коммит (фаза 2)"""
if txn_id not in self.prepared:
raise Exception(f"Transaction {txn_id} not prepared")
# Применить операции
for op in operations:
self._execute(op)
del self.prepared[txn_id]
def abort(self, txn_id, operations):
"""Откат (фаза 2)"""
if txn_id in self.prepared:
del self.prepared[txn_id]
def _can_execute(self, operation):
# Проверка возможности выполнения
return True
def _execute(self, operation):
# Выполнение операции
print(f"{self.name}: executing {operation}")
# Использование
coordinator = TwoPhaseCommit()
coordinator.register_participant('db1', DatabaseParticipant('Database1'))
coordinator.register_participant('db2', DatabaseParticipant('Database2'))
operations = {
'db1': ['INSERT INTO users VALUES (1, "Alice")', 'UPDATE balance SET amount = 100'],
'db2': ['INSERT INTO audit VALUES (1, "transfer")']
}
result = coordinator.execute('txn_001', operations)
print(f"Transaction result: {result}")
4. Стратегия 3: Optimistic Locking (версионирование)
Для избежания блокировок используем версионирование.
import time
from typing import Any, Dict
class OptimisticLocking:
"""Оптимистичное блокирование через версионирование"""
def __init__(self):
self.data = {} # id -> {value, version}
self.lock = threading.RLock()
def write(self, key, value):
"""Создать новый элемент"""
with self.lock:
self.data[key] = {
'value': value,
'version': 1,
'timestamp': time.time()
}
def read(self, key):
"""Прочитать элемент"""
with self.lock:
if key not in self.data:
return None, None
entry = self.data[key]
return entry['value'], entry['version']
def update(self, key, new_value, expected_version):
"""Обновить элемент с проверкой версии"""
with self.lock:
if key not in self.data:
raise Exception(f"Key {key} not found")
entry = self.data[key]
# Проверка версии
if entry['version'] != expected_version:
raise Exception(
f"Version conflict: expected {expected_version}, got {entry['version']}"
)
# Обновить
entry['value'] = new_value
entry['version'] += 1
entry['timestamp'] = time.time()
return entry['version']
def transaction(self, key, transform_func):
"""Транзакция с автоматическим retry на конфликт версии"""
max_retries = 5
retry_count = 0
while retry_count < max_retries:
try:
# Читаем текущее значение и версию
value, version = self.read(key)
# Трансформируем
new_value = transform_func(value)
# Пытаемся обновить
self.update(key, new_value, version)
return new_value
except Exception as e:
retry_count += 1
if retry_count >= max_retries:
raise
time.sleep(0.1 * (2 ** retry_count)) # Exponential backoff
5. Стратегия 4: Event Sourcing
Вместо сохранения состояния сохраняем события.
from dataclasses import dataclass
from datetime import datetime
from typing import List
import json
@dataclass
class Event:
"""Событие в системе"""
event_type: str
aggregate_id: str
data: dict
timestamp: datetime
version: int
class EventStore:
"""Хранилище событий"""
def __init__(self, filename):
self.filename = filename
self.events = []
self.load()
def append_event(self, event: Event):
"""Добавить событие в журнал"""
self.events.append(event)
# Синхронно записать на диск
self._persist(event)
def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]:
"""Получить все события для сущности"""
return [e for e in self.events if e.aggregate_id == aggregate_id]
def rebuild_state(self, aggregate_id: str):
"""Восстановить состояние из событий"""
events = self.get_events_for_aggregate(aggregate_id)
state = {'balance': 0, 'transfers': []}
for event in sorted(events, key=lambda e: e.version):
if event.event_type == 'money_transferred':
state['balance'] -= event.data['amount']
state['transfers'].append(event.data)
elif event.event_type == 'money_received':
state['balance'] += event.data['amount']
return state
def _persist(self, event: Event):
"""Записать событие на диск"""
with open(self.filename, 'a') as f:
event_dict = {
'event_type': event.event_type,
'aggregate_id': event.aggregate_id,
'data': event.data,
'timestamp': event.timestamp.isoformat(),
'version': event.version
}
f.write(json.dumps(event_dict) + '\n')
f.flush()
def load(self):
"""Загрузить события с диска"""
try:
with open(self.filename, 'r') as f:
for line in f:
event_dict = json.loads(line)
event = Event(
event_type=event_dict['event_type'],
aggregate_id=event_dict['aggregate_id'],
data=event_dict['data'],
timestamp=datetime.fromisoformat(event_dict['timestamp']),
version=event_dict['version']
)
self.events.append(event)
except FileNotFoundError:
pass
class BankAccount:
"""Агрегат для банковского счёта"""
def __init__(self, account_id: str, event_store: EventStore):
self.account_id = account_id
self.event_store = event_store
self.version = 0
self.balance = 0
self._load_from_events()
def transfer_money(self, amount: float):
"""Переводить деньги"""
if amount > self.balance:
raise Exception("Insufficient funds")
event = Event(
event_type='money_transferred',
aggregate_id=self.account_id,
data={'amount': amount},
timestamp=datetime.utcnow(),
version=self.version + 1
)
self.event_store.append_event(event)
self.balance -= amount
self.version += 1
def receive_money(self, amount: float):
"""Получить деньги"""
event = Event(
event_type='money_received',
aggregate_id=self.account_id,
data={'amount': amount},
timestamp=datetime.utcnow(),
version=self.version + 1
)
self.event_store.append_event(event)
self.balance += amount
self.version += 1
def _load_from_events(self):
"""Восстановить состояние из событий"""
state = self.event_store.rebuild_state(self.account_id)
self.balance = state['balance']
6. Сравнение подходов
"""
Основные подходы реализации транзакций вручную:
1. Write-Ahead Logging (WAL)
+ Простая реализация
+ Гарантирует Atomicity и Durability
- Не решает проблему распределённых транзакций
Использование: одна база, нужна гарантия отката
2. Two-Phase Commit (2PC)
+ Работает для распределённых систем
+ Гарантирует Consistency
- Медленнее из-за двух фаз
- Может привести к deadlock'ам
Использование: распределённые системы, критичен Consistency
3. Optimistic Locking
+ Высокая производительность при низком конфликте
+ Нет deadlock'ов
- Требует retry логику
- Может не гарантировать Consistency при высоком конфликте
Использование: высоконагруженные системы с низким конфликтом
4. Event Sourcing
+ Полная история изменений
+ Возможность восстановления в любую точку времени
+ Гарантирует Atomicity
- Сложнее в реализации
- Может привести к растущему размеру журнала
Использование: критичный аудит, восстановление состояния
"""
Заключение
Выбор стратегии зависит от требований:
- Одна база данных: WAL
- Распределённые системы: 2PC или Saga pattern
- Высоконагруженные системы: Optimistic Locking
- Нужен полный аудит: Event Sourcing
В реальных проектах часто комбинируют несколько подходов для достижения нужного баланса между консистентностью и производительностью.