← Назад к вопросам

Как устраивал взаимодействие с worker threads?

2.2 Middle🔥 111 комментариев
#Node.js и JavaScript#Архитектура и паттерны

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

# Взаимодействие с Worker Threads в Node.js

Что такое Worker Threads

Worker Threads — это механизм в Node.js для параллельного выполнения JavaScript кода в отдельных потоках. Это решает проблему single-threaded природы Node.js, позволяя использовать многоядерные процессоры для вычислительно сложных операций.

Когда использовать Worker Threads

// ❌ Плохо: блокируем event loop
function heavyComputation(n: number): number {
  let result = 0;
  for (let i = 0; i < n; i++) {
    result += Math.sqrt(i);
  }
  return result;
}

// ❌ Весь сервер зависает на этих вычислениях!
app.get('/calculate', (req, res) => {
  const result = heavyComputation(1000000000);
  res.json({ result });
});

// ✅ Хорошо: используем Worker Thread
app.get('/calculate', async (req, res) => {
  const result = await runInWorker(1000000000);
  res.json({ result });
});

1. Базовая архитектура

Worker (worker.js)

// worker.js
import { parentPort } from 'worker_threads';

parentPort.on('message', (data) => {
  console.log('Worker received:', data);
  
  // Выполняем тяжелые вычисления
  const result = heavyComputation(data);
  
  // Отправляем результат обратно
  parentPort.postMessage({
    success: true,
    result
  });
});

function heavyComputation(n: number): number {
  let result = 0;
  for (let i = 0; i < n; i++) {
    result += Math.sqrt(i);
  }
  return result;
}

Main Thread (main.js)

// main.js
import { Worker } from 'worker_threads';
import path from 'path';
import { fileURLToPath } from 'url';

const __dirname = path.dirname(fileURLToPath(import.meta.url));

function runInWorker(data: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.join(__dirname, 'worker.js'));
    
    // Слушаем сообщение от worker
    worker.on('message', (message) => {
      if (message.success) {
        resolve(message.result);
      } else {
        reject(new Error(message.error));
      }
      worker.terminate(); // Закрываем worker
    });
    
    // Обработка ошибок
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Worker stopped with exit code ${code}`));
      }
    });
    
    // Отправляем данные worker
    worker.postMessage(data);
  });
}

const result = await runInWorker(1000000000);
console.log('Result:', result);

2. Worker Pool (пул рабочих потоков)

Вместо создания нового worker для каждой задачи, переиспользуем их:

import { Worker } from 'worker_threads';
import path from 'path';

class WorkerPool {
  private workers: Worker[] = [];
  private queue: Array<{
    data: any;
    resolve: (value: any) => void;
    reject: (error: Error) => void;
  }> = [];
  private activeWorkers = new Set<Worker>();

  constructor(
    private workerScript: string,
    private poolSize: number = 4
  ) {
    // Создаем pool workers один раз
    for (let i = 0; i < poolSize; i++) {
      const worker = new Worker(workerScript);
      worker.on('message', this.handleWorkerMessage.bind(this));
      worker.on('error', this.handleWorkerError.bind(this));
      this.workers.push(worker);
    }
  }

  execute(data: any): Promise<any> {
    return new Promise((resolve, reject) => {
      const availableWorker = this.workers.find(
        w => !this.activeWorkers.has(w)
      );

      if (availableWorker) {
        this.activeWorkers.add(availableWorker);
        availableWorker.postMessage(data);
        
        const timeout = setTimeout(() => {
          this.activeWorkers.delete(availableWorker);
          reject(new Error('Worker timeout'));
        }, 30000); // 30 сек timeout

        // Сохраняем callback для этого worker
        (availableWorker as any).currentTask = { resolve, reject, timeout };
      } else {
        // Если нет свободных, добавляем в очередь
        this.queue.push({ data, resolve, reject });
      }
    });
  }

  private handleWorkerMessage(worker: Worker, message: any) {
    const task = (worker as any).currentTask;
    if (task) {
      clearTimeout(task.timeout);
      task.resolve(message);
      (worker as any).currentTask = null;
      this.activeWorkers.delete(worker);

      // Обрабатываем очередь
      if (this.queue.length > 0) {
        const { data, resolve, reject } = this.queue.shift()!;
        this.activeWorkers.add(worker);
        (worker as any).currentTask = { resolve, reject };
        worker.postMessage(data);
      }
    }
  }

  private handleWorkerError(worker: Worker, error: Error) {
    const task = (worker as any).currentTask;
    if (task) {
      task.reject(error);
      (worker as any).currentTask = null;
      this.activeWorkers.delete(worker);
    }
  }

  terminate() {
    this.workers.forEach(w => w.terminate());
    this.workers = [];
  }
}

// Использование
const pool = new WorkerPool('./worker.js', 4);

// Распределяет работу между 4 workers
const results = await Promise.all([
  pool.execute(1000000000),
  pool.execute(1000000000),
  pool.execute(1000000000),
  pool.execute(1000000000)
]);

3. Обмен данными между потоками

Копирование данных (медленно)

const data = { array: new Array(1000000).fill(1) };
worker.postMessage(data); // Копируется полностью

Transferable Objects (быстро)

// Передаём буфер, не копируя
const buffer = new ArrayBuffer(1024);
const view = new Uint8Array(buffer);

worker.postMessage(
  { buffer },
  [buffer] // Второй аргумент - что передать (не копировать)
);

// После этого buffer в main thread больше недоступен!
// Теперь он только в worker

SharedArrayBuffer (совместный доступ)

// shared-buffer.js
import { parentPort, workerData } from 'worker_threads';

const sharedArray = new Int32Array(workerData.sharedBuffer);

// Worker изменяет массив
for (let i = 0; i < sharedArray.length; i++) {
  sharedArray[i] = i * 2;
}

// main.js
const sharedBuffer = new SharedArrayBuffer(4 * 100);
const sharedArray = new Int32Array(sharedBuffer);

const worker = new Worker('./shared-buffer.js', {
  workerData: { sharedBuffer }
});

await new Promise(r => worker.on('exit', r));

// Смотрим результаты работы worker
console.log(sharedArray);

4. Примеры использования

Image Processing

// image-processor.js (worker)
import { parentPort } from 'worker_threads';
import Jimp from 'jimp';

parentPort.on('message', async (imagePath) => {
  const image = await Jimp.read(imagePath);
  image.resize(256, 256).quality(90);
  
  const buffer = await image.getBuffer('image/jpeg');
  parentPort.postMessage({ success: true, buffer });
});

// express-server.js
const pool = new WorkerPool('./image-processor.js', 4);

app.post('/resize-image', async (req, res) => {
  const { imagePath } = req.body;
  const result = await pool.execute(imagePath);
  res.send(result.buffer);
});

Data Processing

// data-processor.js (worker)
import { parentPort } from 'worker_threads';

parentPort.on('message', (largeDataset) => {
  // Тяжелая обработка большого датасета
  const processed = largeDataset
    .map(item => ({
      ...item,
      processed: expensiveCalculation(item)
    }))
    .filter(item => item.processed > 100);
  
  parentPort.postMessage(processed);
});

5. Обработка ошибок

function runInWorker(data: any, timeout: number = 30000): Promise<any> {
  return new Promise((resolve, reject) => {
    const worker = new Worker('./worker.js');
    const timeoutHandle = setTimeout(() => {
      worker.terminate();
      reject(new Error('Worker timeout'));
    }, timeout);

    worker.on('message', (result) => {
      clearTimeout(timeoutHandle);
      resolve(result);
      worker.terminate();
    });

    worker.on('error', (error) => {
      clearTimeout(timeoutHandle);
      reject(error);
      worker.terminate();
    });

    worker.on('exit', (code) => {
      clearTimeout(timeoutHandle);
      if (code !== 0) {
        reject(new Error(`Worker exited with code ${code}`));
      }
    });

    worker.postMessage(data);
  });
}

6. Готовые решения

Piscina (рекомендуется)

import Piscina from 'piscina';

const pool = new Piscina({
  filename: './worker.js',
  maxThreads: 4,
  concurrentTasksPerWorker: 2
});

const result = await pool.run({ x: 1000 });

node-worker-threads-pool

import { StaticPool } from 'node-worker-threads-pool';

const pool = new StaticPool({
  size: 4,
  task: './worker.js'
});

const result = await pool.exec({ n: 1000000000 });

Когда использовать Worker Threads

✅ Используй❌ Не используй
CPU-bound операции (вычисления)I/O операции (DB, HTTP)
Обработка изображенийЛогирование
Шифрование/КомпрессияРабота с сетью
Научные вычисленияКэширование
Парсинг больших данныхДоступ к переменным

Вывод

Worker Threads — это мощный инструмент для:

  • Масштабирования CPU-heavy операций
  • Неблокирования event loop
  • Использования многоядерных процессоров

Но для большинства Web API это не нужно. Node.js достаточно быстр для типичных задач (HTTP, БД). Worker Threads нужны когда:

  1. Операция занимает более 100ms
  2. Она блокирует event loop
  3. Вы видели это в профилировании