← Назад к вопросам

Как обеспечить целостность данных при одновременном начислении баллов на счет и их списании

2.7 Senior🔥 141 комментариев
#Базы данных и SQL#Многопоточность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI22 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Целостность данных при одновременных операциях со счётом

Проблема

Представь ситуацию: у пользователя на счету 100 баллов. Одновременно:

  • Один запрос: начислить 50 баллов (100 + 50 = 150)
  • Другой запрос: списать 30 баллов (100 - 30 = 70)

Результат без синхронизации может быть непредсказуемым. Оба запроса читают значение 100, и в зависимости от порядка выполнения итоговое значение будет либо 150, либо 70, хотя правильно должно быть 120.

Время  Поток 1           Поток 2           Баланс
---    -------           -------           ------
1                        READ balance      100
2      READ balance                        100
3                        balance = 100-30  70
4                        WRITE balance=70  70
5      balance = 100+50                    100
6      WRITE balance=150                   150 ← НЕПРАВИЛЬНО!

Решение 1: Пессимистическая блокировка (SELECT FOR UPDATE)

Фиксируем строку в БД, чтобы другие не могли её читать пока транзакция не завершится:

@Service
public class PointService {
    @Autowired
    private PointRepository pointRepository;
    
    @Transactional
    public void addPoints(Long userId, int amount) {
        // Блокируем строку
        User user = pointRepository.findByIdForUpdate(userId);
        user.setPoints(user.getPoints() + amount);
        pointRepository.save(user);
    }
    
    @Transactional
    public void deductPoints(Long userId, int amount) throws InsufficientPointsException {
        User user = pointRepository.findByIdForUpdate(userId);
        if (user.getPoints() < amount) {
            throw new InsufficientPointsException("Недостаточно баллов");
        }
        user.setPoints(user.getPoints() - amount);
        pointRepository.save(user);
    }
}

@Repository
public interface PointRepository extends JpaRepository<User, Long> {
    @Query("SELECT u FROM User u WHERE u.id = :id FOR UPDATE")
    @Lock(LockModeType.PESSIMISTIC_WRITE) // Явная блокировка
    User findByIdForUpdate(@Param("id") Long id);
}

Как это работает:

Время  Поток 1               Поток 2           Баланс
---    -------               -------           ------
1      BEGIN TRANSACTION
2      SELECT FOR UPDATE     (ждёт блокировку)
3      balance = 100+50=150              
4      WRITE balance=150                 150
5      COMMIT
6                            SELECT FOR UPDATE (получает блокировку)
7                            balance = 150-30=120
8                            WRITE balance=120  120 ← ПРАВИЛЬНО!
9                            COMMIT

Решение 2: Оптимистическая блокировка (Version)

Вместо физической блокировки добавляем версию (поле которое увеличивается при каждом изменении):

@Entity
public class User {
    @Id
    private Long id;
    
    private int points;
    
    @Version // Аннотация для оптимистической блокировки
    private Long version; // Увеличивается при каждом save
}

@Service
public class PointService {
    @Autowired
    private UserRepository userRepository;
    
    @Transactional
    public void addPoints(Long userId, int amount) {
        User user = userRepository.findById(userId).orElseThrow();
        user.setPoints(user.getPoints() + amount);
        try {
            userRepository.save(user); // Если version изменился - выбросит исключение
        } catch (OptimisticLockingFailureException e) {
            // Повторить попытку
            addPoints(userId, amount);
        }
    }
}

Как это работает:

Поток 1 читает: version=1, points=100
Поток 2 читает: version=1, points=100

Поток 1: points=150, version должен быть 1, БД имеет 1 → UPDATE OK, version=2
Поток 2: points=70,  version должен быть 1, БД имеет 2 → UPDATE FAILS (исключение)
Поток 2 повторяет: читает version=2, points=150, вычитает 30 → points=120, version=3

Решение 3: Атомарные операции на уровне БД

Лучший способ — дать БД делать всё в одном SQL запросе:

@Repository
public interface UserRepository extends JpaRepository<User, Long> {
    @Modifying
    @Transactional
    @Query("UPDATE User u SET u.points = u.points + :amount WHERE u.id = :id")
    int addPoints(@Param("id") Long id, @Param("amount") int amount);
    
    @Modifying
    @Transactional
    @Query("UPDATE User u SET u.points = u.points - :amount WHERE u.id = :id AND u.points >= :amount")
    int deductPoints(@Param("id") Long id, @Param("amount") int amount);
}

@Service
public class PointService {
    @Autowired
    private UserRepository userRepository;
    
    public void addPoints(Long userId, int amount) {
        userRepository.addPoints(userId, amount);
    }
    
    public void deductPoints(Long userId, int amount) throws InsufficientPointsException {
        int updated = userRepository.deductPoints(userId, amount);
        if (updated == 0) {
            throw new InsufficientPointsException("Недостаточно баллов");
        }
    }
}

Преимущество: БД выполнит инкремент и декремент атомарно, без промежуточных состояний.

Решение 4: Использование очереди сообщений

Для высоконагруженных систем обработка событий последовательно:

@Service
public class PointEventService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void schedulePointsChange(Long userId, int amount, String operation) {
        PointChangeEvent event = new PointChangeEvent(userId, amount, operation);
        // Отправляем событие в очередь
        rabbitTemplate.convertAndSend("points-queue", event);
    }
}

@Service
public class PointChangeListener {
    @Autowired
    private UserRepository userRepository;
    
    @RabbitListener(queues = "points-queue")
    public void processPointChange(PointChangeEvent event) {
        // Обрабатываем последовательно, в одном потоке
        switch(event.getOperation()) {
            case "ADD":
                userRepository.addPoints(event.getUserId(), event.getAmount());
                break;
            case "DEDUCT":
                userRepository.deductPoints(event.getUserId(), event.getAmount());
                break;
        }
    }
}

Какой выбрать?

МетодПлюсыМинусыКогда использовать
SELECT FOR UPDATEГарантирует консистентность, простоМедленнее при высокой конкурентности, может быть deadlockНизкая/средняя нагрузка, требуется чтение и модификация
Version (оптимистичная)Без блокировок, быстрееНужны retry логики, может быть много перепробНизкая конкурентность, редко конфликты
Атомарный SQLСамый быстрый, безопасныйСложнее с логикой, если нужны дополнительные проверкиВысокая нагрузка, простые операции
ОчередьМасштабируемо, разделяет нагрузкуНе synchronous, усложняет обработкуОчень высокая нагрузка, асинхронность допустима

Практический пример для production

@Service
public class PointService {
    @Autowired
    private UserRepository userRepository;
    
    @Transactional
    public void processPointTransaction(Long userId, int amount, String type) {
        User user = userRepository.findByIdForUpdate(userId); // Блокировка
        
        if ("DEDUCT".equals(type) && user.getPoints() < amount) {
            throw new InsufficientPointsException();
        }
        
        int newBalance = "ADD".equals(type) 
            ? user.getPoints() + amount 
            : user.getPoints() - amount;
        
        user.setPoints(newBalance);
        userRepository.save(user);
        
        // Логируем для аудита
        transactionLog.log(userId, type, amount, newBalance);
    }
}

Этот подход обеспечит корректность данных при любой конкурентности.