← Назад к вопросам

Как защитить SharedArrayBuffer от записи из разных worker_threads?

2.4 Senior🔥 101 комментариев
#Node.js и JavaScript#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI30 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Защита SharedArrayBuffer от одновременной записи в Worker Threads

SharedArrayBuffer позволяет разным worker_threads безопасно делиться памятью, но это создаёт серьёзные проблемы race conditions. Нужна синхронизация между потоками.

Проблема: Data Race

// main.js
const { Worker } = require("worker_threads");

const shared = new SharedArrayBuffer(4);
const view = new Int32Array(shared);

const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");

worker1.postMessage({ shared });
worker2.postMessage({ shared });

// worker.js - ПЛОХО: race condition!
const { parentPort } = require("worker_threads");

parentPort.on("message", ({ shared }) => {
  const view = new Int32Array(shared);
  
  // Все три потока пишут одновременно в один адрес
  view[0]++;  // Race condition! Значение может быть неправильным
});

Если worker1 и worker2 одновременно пишут в view[0], результат непредсказуем.

Решение 1: Atomics для атомарных операций

Atomics — встроенный в JavaScript объект для синхронизации. Гарантирует атомарность операций:

// main.js
const { Worker } = require("worker_threads");

const shared = new SharedArrayBuffer(4);
const view = new Int32Array(shared);

const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");

worker1.postMessage({ shared });
worker2.postMessage({ shared });

// worker.js - ХОРОШО: используем Atomics
const { parentPort } = require("worker_threads");

parentPort.on("message", ({ shared }) => {
  const view = new Int32Array(shared);
  
  // Atomics.add() гарантирует атомарность
  Atomics.add(view, 0, 1); // index=0, delta=1
  
  console.log(Atomics.load(view, 0)); // безопасное чтение
});

Основные операции Atomics:

// Атомарное чтение/запись
Atomics.load(view, index);        // прочитать
Atomics.store(view, index, value); // записать

// Атомарные операции
Atomics.add(view, index, value);    // увеличить
Atomics.sub(view, index, value);    // уменьшить
Atomics.exchange(view, index, value); // поменять и вернуть старое

// Сравнение и обмен (compare-and-swap)
Atomics.compareExchange(view, index, expect, update);

Решение 2: Mutex (мьютекс) для критических секций

Atomics недостаточны для сложной логики. Нужен mutex:

// mutex.js
class Mutex {
  constructor(sharedBuffer, index = 0) {
    this.view = new Int32Array(sharedBuffer);
    this.index = index;
  }

  lock() {
    while (Atomics.compareExchange(this.view, this.index, 0, 1) !== 0) {
      // Спинлок: крутимся, пока не получим лок
    }
  }

  unlock() {
    Atomics.store(this.view, this.index, 0);
  }
}

// main.js
const { Worker } = require("worker_threads");

// 4 байта для мьютекса
const mutexBuffer = new SharedArrayBuffer(4);
const dataBuffer = new SharedArrayBuffer(8); // данные

const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");

worker1.postMessage({ mutexBuffer, dataBuffer });
worker2.postMessage({ mutexBuffer, dataBuffer });

// worker.js
const { parentPort } = require("worker_threads");

class Mutex {
  constructor(sharedBuffer, index = 0) {
    this.view = new Int32Array(sharedBuffer);
    this.index = index;
  }
  lock() {
    while (Atomics.compareExchange(this.view, this.index, 0, 1) !== 0) {
      // Спинлок
    }
  }
  unlock() {
    Atomics.store(this.view, this.index, 0);
  }
}

parentPort.on("message", ({ mutexBuffer, dataBuffer }) => {
  const mutex = new Mutex(mutexBuffer);
  const dataView = new Int32Array(dataBuffer);
  
  // Критическая секция, защищённая мьютексом
  mutex.lock();
  try {
    const value = Atomics.load(dataView, 0);
    Atomics.store(dataView, 0, value + 1); // безопасная операция
  } finally {
    mutex.unlock();
  }
});

Решение 3: Condition Variables (для синхронизации)

Для более сложных случаев используй Atomics.wait()/notify():

// Один worker ждёт сигнала от другого
const { Worker } = require("worker_threads");

const signal = new SharedArrayBuffer(4);
const signalView = new Int32Array(signal);

const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");

worker1.postMessage({ signal, id: 1 });
worker2.postMessage({ signal, id: 2 });

// worker.js
const { parentPort } = require("worker_threads");

parentPort.on("message", ({ signal, id }) => {
  const signalView = new Int32Array(signal);
  
  if (id === 1) {
    console.log("Worker 1: жду сигнала...");
    // Блокируется до notify()
    Atomics.wait(signalView, 0, 0); // ждём, пока значение не изменится
    console.log("Worker 1: получил сигнал!");
  } else {
    setTimeout(() => {
      console.log("Worker 2: отправляю сигнал");
      Atomics.store(signalView, 0, 1); // меняем значение
      Atomics.notify(signalView, 0);   // будим все, ждущих на этом адресе
    }, 1000);
  }
});

Решение 4: Лучшая практика - разделение данных

Вместо того чтобы морозиться с синхронизацией, лучше избежать shared state:

// ✓ ХОРОШО: каждому worker свой буфер
const { Worker } = require("worker_threads");

const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");

const buffer1 = new SharedArrayBuffer(100);
const buffer2 = new SharedArrayBuffer(100);

worker1.postMessage({ buffer: buffer1 });
worker2.postMessage({ buffer: buffer2 });

// Результаты собираем в main thread без race conditions

Итоговая таблица методов синхронизации

МетодСлучай использованияПроизводительность
Atomics.add/storeПростые счётчикиОтличная
Mutex + compareExchangeКритические секцииХорошая
Atomics.wait/notifyМежпоточная синхронизацияХорошая (no busy-wait)
Разделение данныхБольшинство случаевОтличная

Главный вывод

  • Избегай shared state, если можно
  • Используй Atomics для простых операций
  • Реализуй Mutex для критических секций
  • Предпочитай разделение данных перед синхронизацией