Как сделать lock на уровне базы данных?
Комментарии (1)
Ответ сгенерирован нейросетью и может содержать ошибки
Блокировки на уровне базы данных в Node.js
Database locks — это критически важный механизм для обеспечения data consistency в многопоточной среде. Расскажу о различных типах и применении в Node.js.
1. Типы блокировок в PostgreSQL
Shared Lock (READ)
-- SELECT FOR SHARE
SELECT * FROM orders WHERE id = 1 FOR SHARE;
-- Несколько транзакций могут одновременно читать
-- Но ни одна не может писать
Exclusive Lock (WRITE)
-- SELECT FOR UPDATE
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
-- Только одна транзакция может держать эту блокировку
-- Исключает все другие блокировки
Разница:
Shared Lock: T1 reads, T2 reads ✅ (параллельное чтение)
Shared Lock: T1 reads, T2 writes ❌ (блокируется запись)
Exclusive Lock: T1 writes, T2 reads ❌ (блокируется чтение)
Exclusive Lock: T1 writes, T2 writes ❌ (блокируется запись)
2. Проблема без блокировок: Race condition
// ❌ Без блокировок — потеря денег
const transferMoney = async (fromId, toId, amount) => {
// T1 читает баланс
const from = await Account.findById(fromId);
// T2 одновременно читает тот же баланс
// T1 уменьшает баланс
if (from.balance >= amount) {
from.balance -= amount;
await from.save();
}
// T2 делает свой перевод с неактуальными данными
// Итог: деньги потеряны!
};
Решение: SELECT FOR UPDATE
// ✅ С блокировкой
const transferMoney = async (fromId, toId, amount) => {
await db.transaction(async (trx) => {
// Блокируем строку для исключительного доступа
const from = await trx('accounts')
.where({ id: fromId })
.forUpdate() // <- Exclusive lock
.first();
if (from.balance < amount) {
throw new Error('Insufficient funds');
}
// Теперь только эта транзакция может изменять эту строку
await trx('accounts')
.where({ id: fromId })
.update({ balance: from.balance - amount });
const to = await trx('accounts')
.where({ id: toId })
.forUpdate()
.first();
await trx('accounts')
.where({ id: toId })
.update({ balance: to.balance + amount });
});
};
3. FOR UPDATE variants
Basic FOR UPDATE
-- Блокирует найденные строки
SELECT * FROM orders WHERE status = 'pending' FOR UPDATE;
FOR UPDATE SKIP LOCKED
-- Пропускает уже заблокированные строки
-- Полезно для workers (find next available job)
SELECT * FROM jobs
WHERE status = 'pending'
FOR UPDATE SKIP LOCKED
LIMIT 1;
-- T1 берет job #1
-- T2 пропускает job #1 и берет job #2
-- Оба могут работать параллельно
FOR UPDATE NOWAIT
-- Не ждет блокировку, сразу ошибка
SELECT * FROM orders WHERE id = 1 FOR UPDATE NOWAIT;
-- Если строка заблокирована: ERROR: could not obtain lock
FOR SHARE
-- Shared lock для параллельного чтения
SELECT * FROM users WHERE id = 1 FOR SHARE;
-- Другие транзакции могут читать (FOR SHARE)
-- Но не могут писать (FOR UPDATE)
4. Практический пример: Job Queue
// Система обработки асинхронных задач
const processJobs = async () => {
const job = await db.transaction(async (trx) => {
// Берем первый доступный job с блокировкой
const job = await trx('jobs')
.where({ status: 'pending' })
.orderBy('created_at', 'asc')
.forUpdate()
.skipLocked() // Пропускаем заблокированные
.first();
if (!job) {
return null; // Нет доступных задач
}
// Помечаем как обрабатывается
await trx('jobs')
.where({ id: job.id })
.update({
status: 'processing',
started_at: new Date(),
worker_id: process.env.WORKER_ID
});
return job;
});
if (!job) {
return;
}
try {
// Обрабатываем задачу вне транзакции
const result = await processJobLogic(job);
// Сохраняем результат
await db('jobs')
.where({ id: job.id })
.update({
status: 'completed',
result: JSON.stringify(result),
completed_at: new Date()
});
} catch (error) {
// Ошибка - возвращаем в pending
await db('jobs')
.where({ id: job.id })
.update({
status: 'pending',
error_message: error.message,
retry_count: db.raw('retry_count + 1')
});
}
};
5. Named Locks (Application-level)
Некоторые задачи требуют блокировки, которая не привязана к строке БД:
-- PostgreSQL Advisory Locks
SELECT pg_advisory_lock(1);
-- критическая секция
SELECT pg_advisory_unlock(1);
-- Или в Node.js
SELECT pg_advisory_xact_lock(1); -- Автоматический unlock при COMMIT
Пример: Уникальная генерация номеров
const generateUniqueNumber = async () => {
const result = await db.raw(
`SELECT pg_advisory_xact_lock(?);`,
[1] // Lock ID
);
// Теперь только эта транзакция может генерировать номер
const { next_number } = await db('counters')
.where({ id: 'invoice' })
.increment('value', 1)
.returning('value');
return `INV-${next_number}`;
};
6. Deadlock detection
Проблема: Deadlock
T1: BEGIN TRANSACTION
T1: LOCK table_a
T2: BEGIN TRANSACTION
T2: LOCK table_b
T1: Trying to LOCK table_b -> WAIT
T2: Trying to LOCK table_a -> WAIT
Деадлок! Никто не может продолжить.
Решение: Последовательная блокировка
const transferMoney = async (fromId, toId, amount) => {
// ВСЕГДА блокируем в одинаковом порядке
const [firstId, secondId] = [fromId, toId].sort();
await db.transaction(async (trx) => {
// Сначала меньший ID
const first = await trx('accounts')
.where({ id: firstId })
.forUpdate()
.first();
// Потом больший ID
const second = await trx('accounts')
.where({ id: secondId })
.forUpdate()
.first();
// Логика транзакции
});
};
Обработка Deadlock с retry:
const executeWithRetry = async (operation, maxRetries = 3) => {
for (let i = 0; i < maxRetries; i++) {
try {
return await operation();
} catch (error) {
if (error.code === '40P01') { // Deadlock detected
const delay = Math.pow(2, i) * 100; // Exponential backoff
console.log(`Deadlock detected, retrying in ${delay}ms...`);
await new Promise(r => setTimeout(r, delay));
} else {
throw error;
}
}
}
throw new Error(`Failed after ${maxRetries} retries`);
};
// Использование
await executeWithRetry(() => transferMoney(fromId, toId, amount));
7. Timeout на блокировку
// Установить timeout для блокировки
const result = await db.raw(
'SET LOCAL lock_timeout = ?',
['5s'] // Timeout 5 секунд
);
const account = await db('accounts')
.where({ id: accountId })
.forUpdate()
.first();
// Если блокировка не получена за 5s -> ERROR
8. Мониторинг блокировок
-- Какие блокировки активны?
SELECT
l.locktype,
l.database,
l.relation::regclass,
l.page,
l.tuple,
l.mode,
l.granted,
a.query,
a.pid
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE NOT l.granted
ORDER BY a.query_start;
В Node.js приложении:
const logLocks = async () => {
const locks = await db.raw(`
SELECT
locktype,
mode,
granted,
query
FROM pg_locks
JOIN pg_stat_activity ON pg_locks.pid = pg_stat_activity.pid
WHERE NOT granted
`);
if (locks.rows.length > 0) {
logger.warn('Waiting locks detected:', locks.rows);
}
};
// Мониторим каждую минуту
setInterval(logLocks, 60 * 1000);
9. Best Practices
-
Минимизировать scope блокировки
// ❌ Плохо: слишком долгая блокировка const order = await db('orders') .where({ id: orderId }) .forUpdate() .first(); // Долгая бизнес-логика... await externalService.validate(order); // ✅ Хорошо: только необходимое const order = await db.transaction(async (trx) => { const order = await trx('orders') .where({ id: orderId }) .forUpdate() .first(); // Быстрые операции await trx('order_items').update(...); return order; }); // Длительная валидация вне блокировки await externalService.validate(order); -
Избегать nested transactions
// ❌ Плохо await db.transaction(async (trx) => { await db.transaction(async (trx2) => { // nested не поддерживается! }); }); // ✅ Хорошо: savepoint await db.transaction(async (trx) => { await trx.transaction(async (savepoint) => { // Работает как savepoint }); }); -
Логировать блокировки
const query = db('orders') .where({ id: orderId }) .forUpdate(); logger.debug('Attempting lock', { table: 'orders', id: orderId, timestamp: new Date() }); const result = await query.first(); logger.debug('Lock acquired', { table: 'orders', id: orderId, duration: Date.now() - startTime });
Итоговая таблица: Когда использовать
| Сценарий | Тип блокировки | Пример |
|---|---|---|
| Финансовые транзакции | FOR UPDATE | Перевод денег, списание |
| Распределенные задачи | SKIP LOCKED | Job queue, worker pool |
| Инкременты счетчиков | Advisory Lock | Invoice numbers, sequence |
| Параллельное чтение | FOR SHARE | Совместное чтение с гарантией |
Блокировки — мощный инструмент, но их неправильное использование приводит к deadlocks и performance issues. Используй их осторожно и только когда действительно нужна консистентность.