← Назад к вопросам

Как сделать lock на уровне базы данных?

2.4 Senior🔥 201 комментариев
#Базы данных и SQL#Кэширование и производительность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Блокировки на уровне базы данных в Node.js

Database locks — это критически важный механизм для обеспечения data consistency в многопоточной среде. Расскажу о различных типах и применении в Node.js.

1. Типы блокировок в PostgreSQL

Shared Lock (READ)

-- SELECT FOR SHARE
SELECT * FROM orders WHERE id = 1 FOR SHARE;

-- Несколько транзакций могут одновременно читать
-- Но ни одна не может писать

Exclusive Lock (WRITE)

-- SELECT FOR UPDATE
SELECT * FROM orders WHERE id = 1 FOR UPDATE;

-- Только одна транзакция может держать эту блокировку
-- Исключает все другие блокировки

Разница:

Shared Lock: T1 reads, T2 reads ✅ (параллельное чтение)
Shared Lock: T1 reads, T2 writes ❌ (блокируется запись)

Exclusive Lock: T1 writes, T2 reads ❌ (блокируется чтение)
Exclusive Lock: T1 writes, T2 writes ❌ (блокируется запись)

2. Проблема без блокировок: Race condition

// ❌ Без блокировок — потеря денег
const transferMoney = async (fromId, toId, amount) => {
  // T1 читает баланс
  const from = await Account.findById(fromId);
  // T2 одновременно читает тот же баланс
  
  // T1 уменьшает баланс
  if (from.balance >= amount) {
    from.balance -= amount;
    await from.save();
  }
  
  // T2 делает свой перевод с неактуальными данными
  // Итог: деньги потеряны!
};

Решение: SELECT FOR UPDATE

// ✅ С блокировкой
const transferMoney = async (fromId, toId, amount) => {
  await db.transaction(async (trx) => {
    // Блокируем строку для исключительного доступа
    const from = await trx('accounts')
      .where({ id: fromId })
      .forUpdate()  // <- Exclusive lock
      .first();
    
    if (from.balance < amount) {
      throw new Error('Insufficient funds');
    }
    
    // Теперь только эта транзакция может изменять эту строку
    await trx('accounts')
      .where({ id: fromId })
      .update({ balance: from.balance - amount });
    
    const to = await trx('accounts')
      .where({ id: toId })
      .forUpdate()
      .first();
    
    await trx('accounts')
      .where({ id: toId })
      .update({ balance: to.balance + amount });
  });
};

3. FOR UPDATE variants

Basic FOR UPDATE

-- Блокирует найденные строки
SELECT * FROM orders WHERE status = 'pending' FOR UPDATE;

FOR UPDATE SKIP LOCKED

-- Пропускает уже заблокированные строки
-- Полезно для workers (find next available job)
SELECT * FROM jobs 
WHERE status = 'pending' 
FOR UPDATE SKIP LOCKED 
LIMIT 1;

-- T1 берет job #1
-- T2 пропускает job #1 и берет job #2
-- Оба могут работать параллельно

FOR UPDATE NOWAIT

-- Не ждет блокировку, сразу ошибка
SELECT * FROM orders WHERE id = 1 FOR UPDATE NOWAIT;
-- Если строка заблокирована: ERROR: could not obtain lock

FOR SHARE

-- Shared lock для параллельного чтения
SELECT * FROM users WHERE id = 1 FOR SHARE;
-- Другие транзакции могут читать (FOR SHARE)
-- Но не могут писать (FOR UPDATE)

4. Практический пример: Job Queue

// Система обработки асинхронных задач
const processJobs = async () => {
  const job = await db.transaction(async (trx) => {
    // Берем первый доступный job с блокировкой
    const job = await trx('jobs')
      .where({ status: 'pending' })
      .orderBy('created_at', 'asc')
      .forUpdate()
      .skipLocked()  // Пропускаем заблокированные
      .first();
    
    if (!job) {
      return null;  // Нет доступных задач
    }
    
    // Помечаем как обрабатывается
    await trx('jobs')
      .where({ id: job.id })
      .update({ 
        status: 'processing',
        started_at: new Date(),
        worker_id: process.env.WORKER_ID
      });
    
    return job;
  });
  
  if (!job) {
    return;
  }
  
  try {
    // Обрабатываем задачу вне транзакции
    const result = await processJobLogic(job);
    
    // Сохраняем результат
    await db('jobs')
      .where({ id: job.id })
      .update({
        status: 'completed',
        result: JSON.stringify(result),
        completed_at: new Date()
      });
  } catch (error) {
    // Ошибка - возвращаем в pending
    await db('jobs')
      .where({ id: job.id })
      .update({
        status: 'pending',
        error_message: error.message,
        retry_count: db.raw('retry_count + 1')
      });
  }
};

5. Named Locks (Application-level)

Некоторые задачи требуют блокировки, которая не привязана к строке БД:

-- PostgreSQL Advisory Locks
SELECT pg_advisory_lock(1);
  -- критическая секция
SELECT pg_advisory_unlock(1);

-- Или в Node.js
SELECT pg_advisory_xact_lock(1);  -- Автоматический unlock при COMMIT

Пример: Уникальная генерация номеров

const generateUniqueNumber = async () => {
  const result = await db.raw(
    `SELECT pg_advisory_xact_lock(?);`,
    [1]  // Lock ID
  );
  
  // Теперь только эта транзакция может генерировать номер
  const { next_number } = await db('counters')
    .where({ id: 'invoice' })
    .increment('value', 1)
    .returning('value');
  
  return `INV-${next_number}`;
};

6. Deadlock detection

Проблема: Deadlock

T1: BEGIN TRANSACTION
T1: LOCK table_a
T2: BEGIN TRANSACTION
T2: LOCK table_b
T1: Trying to LOCK table_b -> WAIT
T2: Trying to LOCK table_a -> WAIT

Деадлок! Никто не может продолжить.

Решение: Последовательная блокировка

const transferMoney = async (fromId, toId, amount) => {
  // ВСЕГДА блокируем в одинаковом порядке
  const [firstId, secondId] = [fromId, toId].sort();
  
  await db.transaction(async (trx) => {
    // Сначала меньший ID
    const first = await trx('accounts')
      .where({ id: firstId })
      .forUpdate()
      .first();
    
    // Потом больший ID
    const second = await trx('accounts')
      .where({ id: secondId })
      .forUpdate()
      .first();
    
    // Логика транзакции
  });
};

Обработка Deadlock с retry:

const executeWithRetry = async (operation, maxRetries = 3) => {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await operation();
    } catch (error) {
      if (error.code === '40P01') {  // Deadlock detected
        const delay = Math.pow(2, i) * 100;  // Exponential backoff
        console.log(`Deadlock detected, retrying in ${delay}ms...`);
        await new Promise(r => setTimeout(r, delay));
      } else {
        throw error;
      }
    }
  }
  throw new Error(`Failed after ${maxRetries} retries`);
};

// Использование
await executeWithRetry(() => transferMoney(fromId, toId, amount));

7. Timeout на блокировку

// Установить timeout для блокировки
const result = await db.raw(
  'SET LOCAL lock_timeout = ?',
  ['5s']  // Timeout 5 секунд
);

const account = await db('accounts')
  .where({ id: accountId })
  .forUpdate()
  .first();
// Если блокировка не получена за 5s -> ERROR

8. Мониторинг блокировок

-- Какие блокировки активны?
SELECT 
  l.locktype,
  l.database,
  l.relation::regclass,
  l.page,
  l.tuple,
  l.mode,
  l.granted,
  a.query,
  a.pid
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE NOT l.granted
ORDER BY a.query_start;

В Node.js приложении:

const logLocks = async () => {
  const locks = await db.raw(`
    SELECT 
      locktype,
      mode,
      granted,
      query
    FROM pg_locks
    JOIN pg_stat_activity ON pg_locks.pid = pg_stat_activity.pid
    WHERE NOT granted
  `);
  
  if (locks.rows.length > 0) {
    logger.warn('Waiting locks detected:', locks.rows);
  }
};

// Мониторим каждую минуту
setInterval(logLocks, 60 * 1000);

9. Best Practices

  1. Минимизировать scope блокировки

    // ❌ Плохо: слишком долгая блокировка
    const order = await db('orders')
      .where({ id: orderId })
      .forUpdate()
      .first();
    
    // Долгая бизнес-логика...
    await externalService.validate(order);
    
    // ✅ Хорошо: только необходимое
    const order = await db.transaction(async (trx) => {
      const order = await trx('orders')
        .where({ id: orderId })
        .forUpdate()
        .first();
      
      // Быстрые операции
      await trx('order_items').update(...);
      return order;
    });
    
    // Длительная валидация вне блокировки
    await externalService.validate(order);
    
  2. Избегать nested transactions

    // ❌ Плохо
    await db.transaction(async (trx) => {
      await db.transaction(async (trx2) => {
        // nested не поддерживается!
      });
    });
    
    // ✅ Хорошо: savepoint
    await db.transaction(async (trx) => {
      await trx.transaction(async (savepoint) => {
        // Работает как savepoint
      });
    });
    
  3. Логировать блокировки

    const query = db('orders')
      .where({ id: orderId })
      .forUpdate();
    
    logger.debug('Attempting lock', {
      table: 'orders',
      id: orderId,
      timestamp: new Date()
    });
    
    const result = await query.first();
    
    logger.debug('Lock acquired', {
      table: 'orders',
      id: orderId,
      duration: Date.now() - startTime
    });
    

Итоговая таблица: Когда использовать

СценарийТип блокировкиПример
Финансовые транзакцииFOR UPDATEПеревод денег, списание
Распределенные задачиSKIP LOCKEDJob queue, worker pool
Инкременты счетчиковAdvisory LockInvoice numbers, sequence
Параллельное чтениеFOR SHAREСовместное чтение с гарантией

Блокировки — мощный инструмент, но их неправильное использование приводит к deadlocks и performance issues. Используй их осторожно и только когда действительно нужна консистентность.