← Назад к вопросам
Как защитить SharedArrayBuffer от записи из разных worker_threads?
2.4 Senior🔥 101 комментариев
#Node.js и JavaScript#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI30 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Защита SharedArrayBuffer от одновременной записи в Worker Threads
SharedArrayBuffer позволяет разным worker_threads безопасно делиться памятью, но это создаёт серьёзные проблемы race conditions. Нужна синхронизация между потоками.
Проблема: Data Race
// main.js
const { Worker } = require("worker_threads");
const shared = new SharedArrayBuffer(4);
const view = new Int32Array(shared);
const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");
worker1.postMessage({ shared });
worker2.postMessage({ shared });
// worker.js - ПЛОХО: race condition!
const { parentPort } = require("worker_threads");
parentPort.on("message", ({ shared }) => {
const view = new Int32Array(shared);
// Все три потока пишут одновременно в один адрес
view[0]++; // Race condition! Значение может быть неправильным
});
Если worker1 и worker2 одновременно пишут в view[0], результат непредсказуем.
Решение 1: Atomics для атомарных операций
Atomics — встроенный в JavaScript объект для синхронизации. Гарантирует атомарность операций:
// main.js
const { Worker } = require("worker_threads");
const shared = new SharedArrayBuffer(4);
const view = new Int32Array(shared);
const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");
worker1.postMessage({ shared });
worker2.postMessage({ shared });
// worker.js - ХОРОШО: используем Atomics
const { parentPort } = require("worker_threads");
parentPort.on("message", ({ shared }) => {
const view = new Int32Array(shared);
// Atomics.add() гарантирует атомарность
Atomics.add(view, 0, 1); // index=0, delta=1
console.log(Atomics.load(view, 0)); // безопасное чтение
});
Основные операции Atomics:
// Атомарное чтение/запись
Atomics.load(view, index); // прочитать
Atomics.store(view, index, value); // записать
// Атомарные операции
Atomics.add(view, index, value); // увеличить
Atomics.sub(view, index, value); // уменьшить
Atomics.exchange(view, index, value); // поменять и вернуть старое
// Сравнение и обмен (compare-and-swap)
Atomics.compareExchange(view, index, expect, update);
Решение 2: Mutex (мьютекс) для критических секций
Atomics недостаточны для сложной логики. Нужен mutex:
// mutex.js
class Mutex {
constructor(sharedBuffer, index = 0) {
this.view = new Int32Array(sharedBuffer);
this.index = index;
}
lock() {
while (Atomics.compareExchange(this.view, this.index, 0, 1) !== 0) {
// Спинлок: крутимся, пока не получим лок
}
}
unlock() {
Atomics.store(this.view, this.index, 0);
}
}
// main.js
const { Worker } = require("worker_threads");
// 4 байта для мьютекса
const mutexBuffer = new SharedArrayBuffer(4);
const dataBuffer = new SharedArrayBuffer(8); // данные
const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");
worker1.postMessage({ mutexBuffer, dataBuffer });
worker2.postMessage({ mutexBuffer, dataBuffer });
// worker.js
const { parentPort } = require("worker_threads");
class Mutex {
constructor(sharedBuffer, index = 0) {
this.view = new Int32Array(sharedBuffer);
this.index = index;
}
lock() {
while (Atomics.compareExchange(this.view, this.index, 0, 1) !== 0) {
// Спинлок
}
}
unlock() {
Atomics.store(this.view, this.index, 0);
}
}
parentPort.on("message", ({ mutexBuffer, dataBuffer }) => {
const mutex = new Mutex(mutexBuffer);
const dataView = new Int32Array(dataBuffer);
// Критическая секция, защищённая мьютексом
mutex.lock();
try {
const value = Atomics.load(dataView, 0);
Atomics.store(dataView, 0, value + 1); // безопасная операция
} finally {
mutex.unlock();
}
});
Решение 3: Condition Variables (для синхронизации)
Для более сложных случаев используй Atomics.wait()/notify():
// Один worker ждёт сигнала от другого
const { Worker } = require("worker_threads");
const signal = new SharedArrayBuffer(4);
const signalView = new Int32Array(signal);
const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");
worker1.postMessage({ signal, id: 1 });
worker2.postMessage({ signal, id: 2 });
// worker.js
const { parentPort } = require("worker_threads");
parentPort.on("message", ({ signal, id }) => {
const signalView = new Int32Array(signal);
if (id === 1) {
console.log("Worker 1: жду сигнала...");
// Блокируется до notify()
Atomics.wait(signalView, 0, 0); // ждём, пока значение не изменится
console.log("Worker 1: получил сигнал!");
} else {
setTimeout(() => {
console.log("Worker 2: отправляю сигнал");
Atomics.store(signalView, 0, 1); // меняем значение
Atomics.notify(signalView, 0); // будим все, ждущих на этом адресе
}, 1000);
}
});
Решение 4: Лучшая практика - разделение данных
Вместо того чтобы морозиться с синхронизацией, лучше избежать shared state:
// ✓ ХОРОШО: каждому worker свой буфер
const { Worker } = require("worker_threads");
const worker1 = new Worker("./worker.js");
const worker2 = new Worker("./worker.js");
const buffer1 = new SharedArrayBuffer(100);
const buffer2 = new SharedArrayBuffer(100);
worker1.postMessage({ buffer: buffer1 });
worker2.postMessage({ buffer: buffer2 });
// Результаты собираем в main thread без race conditions
Итоговая таблица методов синхронизации
| Метод | Случай использования | Производительность |
|---|---|---|
| Atomics.add/store | Простые счётчики | Отличная |
| Mutex + compareExchange | Критические секции | Хорошая |
| Atomics.wait/notify | Межпоточная синхронизация | Хорошая (no busy-wait) |
| Разделение данных | Большинство случаев | Отличная |
Главный вывод
- Избегай shared state, если можно
- Используй Atomics для простых операций
- Реализуй Mutex для критических секций
- Предпочитай разделение данных перед синхронизацией