Как обеспечить целостность данных при одновременном начислении баллов на счет и их списании
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
# Целостность данных при одновременных операциях со счётом
Проблема
Представь ситуацию: у пользователя на счету 100 баллов. Одновременно:
- Один запрос: начислить 50 баллов (100 + 50 = 150)
- Другой запрос: списать 30 баллов (100 - 30 = 70)
Результат без синхронизации может быть непредсказуемым. Оба запроса читают значение 100, и в зависимости от порядка выполнения итоговое значение будет либо 150, либо 70, хотя правильно должно быть 120.
Время Поток 1 Поток 2 Баланс
--- ------- ------- ------
1 READ balance 100
2 READ balance 100
3 balance = 100-30 70
4 WRITE balance=70 70
5 balance = 100+50 100
6 WRITE balance=150 150 ← НЕПРАВИЛЬНО!
Решение 1: Пессимистическая блокировка (SELECT FOR UPDATE)
Фиксируем строку в БД, чтобы другие не могли её читать пока транзакция не завершится:
@Service
public class PointService {
@Autowired
private PointRepository pointRepository;
@Transactional
public void addPoints(Long userId, int amount) {
// Блокируем строку
User user = pointRepository.findByIdForUpdate(userId);
user.setPoints(user.getPoints() + amount);
pointRepository.save(user);
}
@Transactional
public void deductPoints(Long userId, int amount) throws InsufficientPointsException {
User user = pointRepository.findByIdForUpdate(userId);
if (user.getPoints() < amount) {
throw new InsufficientPointsException("Недостаточно баллов");
}
user.setPoints(user.getPoints() - amount);
pointRepository.save(user);
}
}
@Repository
public interface PointRepository extends JpaRepository<User, Long> {
@Query("SELECT u FROM User u WHERE u.id = :id FOR UPDATE")
@Lock(LockModeType.PESSIMISTIC_WRITE) // Явная блокировка
User findByIdForUpdate(@Param("id") Long id);
}
Как это работает:
Время Поток 1 Поток 2 Баланс
--- ------- ------- ------
1 BEGIN TRANSACTION
2 SELECT FOR UPDATE (ждёт блокировку)
3 balance = 100+50=150
4 WRITE balance=150 150
5 COMMIT
6 SELECT FOR UPDATE (получает блокировку)
7 balance = 150-30=120
8 WRITE balance=120 120 ← ПРАВИЛЬНО!
9 COMMIT
Решение 2: Оптимистическая блокировка (Version)
Вместо физической блокировки добавляем версию (поле которое увеличивается при каждом изменении):
@Entity
public class User {
@Id
private Long id;
private int points;
@Version // Аннотация для оптимистической блокировки
private Long version; // Увеличивается при каждом save
}
@Service
public class PointService {
@Autowired
private UserRepository userRepository;
@Transactional
public void addPoints(Long userId, int amount) {
User user = userRepository.findById(userId).orElseThrow();
user.setPoints(user.getPoints() + amount);
try {
userRepository.save(user); // Если version изменился - выбросит исключение
} catch (OptimisticLockingFailureException e) {
// Повторить попытку
addPoints(userId, amount);
}
}
}
Как это работает:
Поток 1 читает: version=1, points=100
Поток 2 читает: version=1, points=100
Поток 1: points=150, version должен быть 1, БД имеет 1 → UPDATE OK, version=2
Поток 2: points=70, version должен быть 1, БД имеет 2 → UPDATE FAILS (исключение)
Поток 2 повторяет: читает version=2, points=150, вычитает 30 → points=120, version=3
Решение 3: Атомарные операции на уровне БД
Лучший способ — дать БД делать всё в одном SQL запросе:
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
@Modifying
@Transactional
@Query("UPDATE User u SET u.points = u.points + :amount WHERE u.id = :id")
int addPoints(@Param("id") Long id, @Param("amount") int amount);
@Modifying
@Transactional
@Query("UPDATE User u SET u.points = u.points - :amount WHERE u.id = :id AND u.points >= :amount")
int deductPoints(@Param("id") Long id, @Param("amount") int amount);
}
@Service
public class PointService {
@Autowired
private UserRepository userRepository;
public void addPoints(Long userId, int amount) {
userRepository.addPoints(userId, amount);
}
public void deductPoints(Long userId, int amount) throws InsufficientPointsException {
int updated = userRepository.deductPoints(userId, amount);
if (updated == 0) {
throw new InsufficientPointsException("Недостаточно баллов");
}
}
}
Преимущество: БД выполнит инкремент и декремент атомарно, без промежуточных состояний.
Решение 4: Использование очереди сообщений
Для высоконагруженных систем обработка событий последовательно:
@Service
public class PointEventService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void schedulePointsChange(Long userId, int amount, String operation) {
PointChangeEvent event = new PointChangeEvent(userId, amount, operation);
// Отправляем событие в очередь
rabbitTemplate.convertAndSend("points-queue", event);
}
}
@Service
public class PointChangeListener {
@Autowired
private UserRepository userRepository;
@RabbitListener(queues = "points-queue")
public void processPointChange(PointChangeEvent event) {
// Обрабатываем последовательно, в одном потоке
switch(event.getOperation()) {
case "ADD":
userRepository.addPoints(event.getUserId(), event.getAmount());
break;
case "DEDUCT":
userRepository.deductPoints(event.getUserId(), event.getAmount());
break;
}
}
}
Какой выбрать?
| Метод | Плюсы | Минусы | Когда использовать |
|---|---|---|---|
| SELECT FOR UPDATE | Гарантирует консистентность, просто | Медленнее при высокой конкурентности, может быть deadlock | Низкая/средняя нагрузка, требуется чтение и модификация |
| Version (оптимистичная) | Без блокировок, быстрее | Нужны retry логики, может быть много перепроб | Низкая конкурентность, редко конфликты |
| Атомарный SQL | Самый быстрый, безопасный | Сложнее с логикой, если нужны дополнительные проверки | Высокая нагрузка, простые операции |
| Очередь | Масштабируемо, разделяет нагрузку | Не synchronous, усложняет обработку | Очень высокая нагрузка, асинхронность допустима |
Практический пример для production
@Service
public class PointService {
@Autowired
private UserRepository userRepository;
@Transactional
public void processPointTransaction(Long userId, int amount, String type) {
User user = userRepository.findByIdForUpdate(userId); // Блокировка
if ("DEDUCT".equals(type) && user.getPoints() < amount) {
throw new InsufficientPointsException();
}
int newBalance = "ADD".equals(type)
? user.getPoints() + amount
: user.getPoints() - amount;
user.setPoints(newBalance);
userRepository.save(user);
// Логируем для аудита
transactionLog.log(userId, type, amount, newBalance);
}
}
Этот подход обеспечит корректность данных при любой конкурентности.