← Назад к вопросам

Как реализовать транзакции вручную без встроенного механизма?

3.0 Senior🔥 81 комментариев
#Архитектура и паттерны#Базы данных (SQL)

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Реализация транзакций вручную без встроенного механизма

Это классический вопрос о ACID свойствах и том, как их обеспечить при работе с системами, где нет встроенных транзакций (например, распределённые системы, NoSQL базы данных или микросервисные архитектуры).

1. ACID свойства и вызовы

"""
ACID свойства:

Atomicity (Атомарность):
- Операция либо полностью выполняется, либо не выполняется вообще
- Проблема: как откатить частичные изменения?

Consistency (Консистентность):
- Данные остаются в согласованном состоянии
- Проблема: как проверить консистентность при распределённых операциях?

Isolation (Изоляция):
- Параллельные операции не влияют друг на друга
- Проблема: как избежать race conditions?

Durability (Долговечность):
- Данные сохраняются в случае сбоя
- Проблема: как гарантировать сохранение?
"""

2. Стратегия 1: Write-Ahead Logging (WAL)

Для обеспечения Atomicity и Durability используем журнал операций.

import json
from datetime import datetime
from enum import Enum
from pathlib import Path

class OperationStatus(Enum):
    PENDING = "pending"
    COMMITTED = "committed"
    ROLLED_BACK = "rolled_back"

class WriteAheadLog:
    """Write-Ahead Logging для гарантии Atomicity"""
    
    def __init__(self, log_file):
        self.log_file = Path(log_file)
        self.transactions = {}
    
    def start_transaction(self, txn_id):
        """Начать новую транзакцию"""
        entry = {
            'id': txn_id,
            'status': OperationStatus.PENDING.value,
            'timestamp': datetime.utcnow().isoformat(),
            'operations': []
        }
        self._write_to_log(entry)
        self.transactions[txn_id] = entry
    
    def log_operation(self, txn_id, operation):
        """Записать операцию в журнал"""
        if txn_id not in self.transactions:
            raise ValueError(f"Transaction {txn_id} not found")
        
        self.transactions[txn_id]['operations'].append(operation)
        self._write_to_log(self.transactions[txn_id])
    
    def commit_transaction(self, txn_id):
        """Подтвердить транзакцию"""
        if txn_id not in self.transactions:
            raise ValueError(f"Transaction {txn_id} not found")
        
        self.transactions[txn_id]['status'] = OperationStatus.COMMITTED.value
        self._write_to_log(self.transactions[txn_id])
    
    def rollback_transaction(self, txn_id):
        """Откатить транзакцию"""
        if txn_id not in self.transactions:
            raise ValueError(f"Transaction {txn_id} not found")
        
        self.transactions[txn_id]['status'] = OperationStatus.ROLLED_BACK.value
        self._write_to_log(self.transactions[txn_id])
    
    def _write_to_log(self, entry):
        """Синхронно записать в журнал"""
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(entry) + '\n')
            f.flush()  # Обязательный flush для гарантии durability
    
    def recover(self):
        """Восстановление после сбоя"""
        if not self.log_file.exists():
            return
        
        pending_txns = []
        with open(self.log_file, 'r') as f:
            for line in f:
                entry = json.loads(line)
                if entry['status'] == OperationStatus.PENDING.value:
                    pending_txns.append(entry['id'])
        
        # Откатываем все незавершённые транзакции
        for txn_id in pending_txns:
            self.rollback_transaction(txn_id)
        
        return pending_txns

3. Стратегия 2: Two-Phase Commit (2PC)

Для распределённых транзакций используем протокол двухфазного коммита.

from enum import Enum
from typing import Dict, List
import threading

class VoteResult(Enum):
    YES = "yes"
    NO = "no"

class TwoPhaseCommit:
    """Двухфазный коммит для распределённых транзакций"""
    
    def __init__(self):
        self.participants = {}  # Название ресурса -> объект участника
        self.lock = threading.RLock()
    
    def register_participant(self, name, participant):
        """Зарегистрировать участника транзакции"""
        self.participants[name] = participant
    
    def execute(self, txn_id, operations: Dict[str, list]):
        """
        Выполнить распределённую транзакцию
        operations: {'db1': [op1, op2], 'db2': [op3, op4]}
        """
        # ФАЗА 1: Prepare
        votes = {}
        with self.lock:
            for participant_name, ops in operations.items():
                if participant_name not in self.participants:
                    raise ValueError(f"Participant {participant_name} not found")
                
                participant = self.participants[participant_name]
                
                try:
                    # Попросить участника подготовить операции
                    participant.prepare(txn_id, ops)
                    votes[participant_name] = VoteResult.YES
                except Exception as e:
                    print(f"Prepare failed for {participant_name}: {e}")
                    votes[participant_name] = VoteResult.NO
        
        # Проверка: все ли согласились?
        if all(v == VoteResult.YES for v in votes.values()):
            # ФАЗА 2: Commit
            return self._commit_phase(txn_id, operations)
        else:
            # ФАЗА 2: Abort
            return self._abort_phase(txn_id, operations)
    
    def _commit_phase(self, txn_id, operations):
        """Вторая фаза: коммит"""
        results = {}
        with self.lock:
            for participant_name, ops in operations.items():
                participant = self.participants[participant_name]
                try:
                    participant.commit(txn_id, ops)
                    results[participant_name] = "committed"
                except Exception as e:
                    # Критическая ошибка: операция не может быть откачена
                    print(f"Critical error in commit for {participant_name}: {e}")
                    results[participant_name] = "error"
        
        return results
    
    def _abort_phase(self, txn_id, operations):
        """Вторая фаза: откат"""
        results = {}
        with self.lock:
            for participant_name, ops in operations.items():
                participant = self.participants[participant_name]
                try:
                    participant.abort(txn_id, ops)
                    results[participant_name] = "aborted"
                except Exception as e:
                    print(f"Error in abort for {participant_name}: {e}")
                    results[participant_name] = "error"
        
        return results

class DatabaseParticipant:
    """Участник в 2PC"""
    
    def __init__(self, name):
        self.name = name
        self.prepared = {}  # txn_id -> prepared_state
    
    def prepare(self, txn_id, operations):
        """Подготовка (фаза 1)"""
        # Проверить, можно ли выполнить операции
        for op in operations:
            if not self._can_execute(op):
                raise Exception(f"Cannot execute {op}")
        
        # Зарезервировать ресурсы, но не применять
        self.prepared[txn_id] = operations
    
    def commit(self, txn_id, operations):
        """Коммит (фаза 2)"""
        if txn_id not in self.prepared:
            raise Exception(f"Transaction {txn_id} not prepared")
        
        # Применить операции
        for op in operations:
            self._execute(op)
        
        del self.prepared[txn_id]
    
    def abort(self, txn_id, operations):
        """Откат (фаза 2)"""
        if txn_id in self.prepared:
            del self.prepared[txn_id]
    
    def _can_execute(self, operation):
        # Проверка возможности выполнения
        return True
    
    def _execute(self, operation):
        # Выполнение операции
        print(f"{self.name}: executing {operation}")

# Использование
coordinator = TwoPhaseCommit()
coordinator.register_participant('db1', DatabaseParticipant('Database1'))
coordinator.register_participant('db2', DatabaseParticipant('Database2'))

operations = {
    'db1': ['INSERT INTO users VALUES (1, "Alice")', 'UPDATE balance SET amount = 100'],
    'db2': ['INSERT INTO audit VALUES (1, "transfer")']
}

result = coordinator.execute('txn_001', operations)
print(f"Transaction result: {result}")

4. Стратегия 3: Optimistic Locking (версионирование)

Для избежания блокировок используем версионирование.

import time
from typing import Any, Dict

class OptimisticLocking:
    """Оптимистичное блокирование через версионирование"""
    
    def __init__(self):
        self.data = {}  # id -> {value, version}
        self.lock = threading.RLock()
    
    def write(self, key, value):
        """Создать новый элемент"""
        with self.lock:
            self.data[key] = {
                'value': value,
                'version': 1,
                'timestamp': time.time()
            }
    
    def read(self, key):
        """Прочитать элемент"""
        with self.lock:
            if key not in self.data:
                return None, None
            entry = self.data[key]
            return entry['value'], entry['version']
    
    def update(self, key, new_value, expected_version):
        """Обновить элемент с проверкой версии"""
        with self.lock:
            if key not in self.data:
                raise Exception(f"Key {key} not found")
            
            entry = self.data[key]
            
            # Проверка версии
            if entry['version'] != expected_version:
                raise Exception(
                    f"Version conflict: expected {expected_version}, got {entry['version']}"
                )
            
            # Обновить
            entry['value'] = new_value
            entry['version'] += 1
            entry['timestamp'] = time.time()
            
            return entry['version']
    
    def transaction(self, key, transform_func):
        """Транзакция с автоматическим retry на конфликт версии"""
        max_retries = 5
        retry_count = 0
        
        while retry_count < max_retries:
            try:
                # Читаем текущее значение и версию
                value, version = self.read(key)
                
                # Трансформируем
                new_value = transform_func(value)
                
                # Пытаемся обновить
                self.update(key, new_value, version)
                return new_value
            
            except Exception as e:
                retry_count += 1
                if retry_count >= max_retries:
                    raise
                time.sleep(0.1 * (2 ** retry_count))  # Exponential backoff

5. Стратегия 4: Event Sourcing

Вместо сохранения состояния сохраняем события.

from dataclasses import dataclass
from datetime import datetime
from typing import List
import json

@dataclass
class Event:
    """Событие в системе"""
    event_type: str
    aggregate_id: str
    data: dict
    timestamp: datetime
    version: int

class EventStore:
    """Хранилище событий"""
    
    def __init__(self, filename):
        self.filename = filename
        self.events = []
        self.load()
    
    def append_event(self, event: Event):
        """Добавить событие в журнал"""
        self.events.append(event)
        # Синхронно записать на диск
        self._persist(event)
    
    def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]:
        """Получить все события для сущности"""
        return [e for e in self.events if e.aggregate_id == aggregate_id]
    
    def rebuild_state(self, aggregate_id: str):
        """Восстановить состояние из событий"""
        events = self.get_events_for_aggregate(aggregate_id)
        state = {'balance': 0, 'transfers': []}
        
        for event in sorted(events, key=lambda e: e.version):
            if event.event_type == 'money_transferred':
                state['balance'] -= event.data['amount']
                state['transfers'].append(event.data)
            elif event.event_type == 'money_received':
                state['balance'] += event.data['amount']
        
        return state
    
    def _persist(self, event: Event):
        """Записать событие на диск"""
        with open(self.filename, 'a') as f:
            event_dict = {
                'event_type': event.event_type,
                'aggregate_id': event.aggregate_id,
                'data': event.data,
                'timestamp': event.timestamp.isoformat(),
                'version': event.version
            }
            f.write(json.dumps(event_dict) + '\n')
            f.flush()
    
    def load(self):
        """Загрузить события с диска"""
        try:
            with open(self.filename, 'r') as f:
                for line in f:
                    event_dict = json.loads(line)
                    event = Event(
                        event_type=event_dict['event_type'],
                        aggregate_id=event_dict['aggregate_id'],
                        data=event_dict['data'],
                        timestamp=datetime.fromisoformat(event_dict['timestamp']),
                        version=event_dict['version']
                    )
                    self.events.append(event)
        except FileNotFoundError:
            pass

class BankAccount:
    """Агрегат для банковского счёта"""
    
    def __init__(self, account_id: str, event_store: EventStore):
        self.account_id = account_id
        self.event_store = event_store
        self.version = 0
        self.balance = 0
        self._load_from_events()
    
    def transfer_money(self, amount: float):
        """Переводить деньги"""
        if amount > self.balance:
            raise Exception("Insufficient funds")
        
        event = Event(
            event_type='money_transferred',
            aggregate_id=self.account_id,
            data={'amount': amount},
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )
        
        self.event_store.append_event(event)
        self.balance -= amount
        self.version += 1
    
    def receive_money(self, amount: float):
        """Получить деньги"""
        event = Event(
            event_type='money_received',
            aggregate_id=self.account_id,
            data={'amount': amount},
            timestamp=datetime.utcnow(),
            version=self.version + 1
        )
        
        self.event_store.append_event(event)
        self.balance += amount
        self.version += 1
    
    def _load_from_events(self):
        """Восстановить состояние из событий"""
        state = self.event_store.rebuild_state(self.account_id)
        self.balance = state['balance']

6. Сравнение подходов

"""
Основные подходы реализации транзакций вручную:

1. Write-Ahead Logging (WAL)
   + Простая реализация
   + Гарантирует Atomicity и Durability
   - Не решает проблему распределённых транзакций
   Использование: одна база, нужна гарантия отката

2. Two-Phase Commit (2PC)
   + Работает для распределённых систем
   + Гарантирует Consistency
   - Медленнее из-за двух фаз
   - Может привести к deadlock'ам
   Использование: распределённые системы, критичен Consistency

3. Optimistic Locking
   + Высокая производительность при низком конфликте
   + Нет deadlock'ов
   - Требует retry логику
   - Может не гарантировать Consistency при высоком конфликте
   Использование: высоконагруженные системы с низким конфликтом

4. Event Sourcing
   + Полная история изменений
   + Возможность восстановления в любую точку времени
   + Гарантирует Atomicity
   - Сложнее в реализации
   - Может привести к растущему размеру журнала
   Использование: критичный аудит, восстановление состояния
"""

Заключение

Выбор стратегии зависит от требований:

  • Одна база данных: WAL
  • Распределённые системы: 2PC или Saga pattern
  • Высоконагруженные системы: Optimistic Locking
  • Нужен полный аудит: Event Sourcing

В реальных проектах часто комбинируют несколько подходов для достижения нужного баланса между консистентностью и производительностью.