← Назад к вопросам
Как устраивал взаимодействие с worker threads?
2.2 Middle🔥 111 комментариев
#Node.js и JavaScript#Архитектура и паттерны
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
# Взаимодействие с Worker Threads в Node.js
Что такое Worker Threads
Worker Threads — это механизм в Node.js для параллельного выполнения JavaScript кода в отдельных потоках. Это решает проблему single-threaded природы Node.js, позволяя использовать многоядерные процессоры для вычислительно сложных операций.
Когда использовать Worker Threads
// ❌ Плохо: блокируем event loop
function heavyComputation(n: number): number {
let result = 0;
for (let i = 0; i < n; i++) {
result += Math.sqrt(i);
}
return result;
}
// ❌ Весь сервер зависает на этих вычислениях!
app.get('/calculate', (req, res) => {
const result = heavyComputation(1000000000);
res.json({ result });
});
// ✅ Хорошо: используем Worker Thread
app.get('/calculate', async (req, res) => {
const result = await runInWorker(1000000000);
res.json({ result });
});
1. Базовая архитектура
Worker (worker.js)
// worker.js
import { parentPort } from 'worker_threads';
parentPort.on('message', (data) => {
console.log('Worker received:', data);
// Выполняем тяжелые вычисления
const result = heavyComputation(data);
// Отправляем результат обратно
parentPort.postMessage({
success: true,
result
});
});
function heavyComputation(n: number): number {
let result = 0;
for (let i = 0; i < n; i++) {
result += Math.sqrt(i);
}
return result;
}
Main Thread (main.js)
// main.js
import { Worker } from 'worker_threads';
import path from 'path';
import { fileURLToPath } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
function runInWorker(data: number): Promise<number> {
return new Promise((resolve, reject) => {
const worker = new Worker(path.join(__dirname, 'worker.js'));
// Слушаем сообщение от worker
worker.on('message', (message) => {
if (message.success) {
resolve(message.result);
} else {
reject(new Error(message.error));
}
worker.terminate(); // Закрываем worker
});
// Обработка ошибок
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
// Отправляем данные worker
worker.postMessage(data);
});
}
const result = await runInWorker(1000000000);
console.log('Result:', result);
2. Worker Pool (пул рабочих потоков)
Вместо создания нового worker для каждой задачи, переиспользуем их:
import { Worker } from 'worker_threads';
import path from 'path';
class WorkerPool {
private workers: Worker[] = [];
private queue: Array<{
data: any;
resolve: (value: any) => void;
reject: (error: Error) => void;
}> = [];
private activeWorkers = new Set<Worker>();
constructor(
private workerScript: string,
private poolSize: number = 4
) {
// Создаем pool workers один раз
for (let i = 0; i < poolSize; i++) {
const worker = new Worker(workerScript);
worker.on('message', this.handleWorkerMessage.bind(this));
worker.on('error', this.handleWorkerError.bind(this));
this.workers.push(worker);
}
}
execute(data: any): Promise<any> {
return new Promise((resolve, reject) => {
const availableWorker = this.workers.find(
w => !this.activeWorkers.has(w)
);
if (availableWorker) {
this.activeWorkers.add(availableWorker);
availableWorker.postMessage(data);
const timeout = setTimeout(() => {
this.activeWorkers.delete(availableWorker);
reject(new Error('Worker timeout'));
}, 30000); // 30 сек timeout
// Сохраняем callback для этого worker
(availableWorker as any).currentTask = { resolve, reject, timeout };
} else {
// Если нет свободных, добавляем в очередь
this.queue.push({ data, resolve, reject });
}
});
}
private handleWorkerMessage(worker: Worker, message: any) {
const task = (worker as any).currentTask;
if (task) {
clearTimeout(task.timeout);
task.resolve(message);
(worker as any).currentTask = null;
this.activeWorkers.delete(worker);
// Обрабатываем очередь
if (this.queue.length > 0) {
const { data, resolve, reject } = this.queue.shift()!;
this.activeWorkers.add(worker);
(worker as any).currentTask = { resolve, reject };
worker.postMessage(data);
}
}
}
private handleWorkerError(worker: Worker, error: Error) {
const task = (worker as any).currentTask;
if (task) {
task.reject(error);
(worker as any).currentTask = null;
this.activeWorkers.delete(worker);
}
}
terminate() {
this.workers.forEach(w => w.terminate());
this.workers = [];
}
}
// Использование
const pool = new WorkerPool('./worker.js', 4);
// Распределяет работу между 4 workers
const results = await Promise.all([
pool.execute(1000000000),
pool.execute(1000000000),
pool.execute(1000000000),
pool.execute(1000000000)
]);
3. Обмен данными между потоками
Копирование данных (медленно)
const data = { array: new Array(1000000).fill(1) };
worker.postMessage(data); // Копируется полностью
Transferable Objects (быстро)
// Передаём буфер, не копируя
const buffer = new ArrayBuffer(1024);
const view = new Uint8Array(buffer);
worker.postMessage(
{ buffer },
[buffer] // Второй аргумент - что передать (не копировать)
);
// После этого buffer в main thread больше недоступен!
// Теперь он только в worker
SharedArrayBuffer (совместный доступ)
// shared-buffer.js
import { parentPort, workerData } from 'worker_threads';
const sharedArray = new Int32Array(workerData.sharedBuffer);
// Worker изменяет массив
for (let i = 0; i < sharedArray.length; i++) {
sharedArray[i] = i * 2;
}
// main.js
const sharedBuffer = new SharedArrayBuffer(4 * 100);
const sharedArray = new Int32Array(sharedBuffer);
const worker = new Worker('./shared-buffer.js', {
workerData: { sharedBuffer }
});
await new Promise(r => worker.on('exit', r));
// Смотрим результаты работы worker
console.log(sharedArray);
4. Примеры использования
Image Processing
// image-processor.js (worker)
import { parentPort } from 'worker_threads';
import Jimp from 'jimp';
parentPort.on('message', async (imagePath) => {
const image = await Jimp.read(imagePath);
image.resize(256, 256).quality(90);
const buffer = await image.getBuffer('image/jpeg');
parentPort.postMessage({ success: true, buffer });
});
// express-server.js
const pool = new WorkerPool('./image-processor.js', 4);
app.post('/resize-image', async (req, res) => {
const { imagePath } = req.body;
const result = await pool.execute(imagePath);
res.send(result.buffer);
});
Data Processing
// data-processor.js (worker)
import { parentPort } from 'worker_threads';
parentPort.on('message', (largeDataset) => {
// Тяжелая обработка большого датасета
const processed = largeDataset
.map(item => ({
...item,
processed: expensiveCalculation(item)
}))
.filter(item => item.processed > 100);
parentPort.postMessage(processed);
});
5. Обработка ошибок
function runInWorker(data: any, timeout: number = 30000): Promise<any> {
return new Promise((resolve, reject) => {
const worker = new Worker('./worker.js');
const timeoutHandle = setTimeout(() => {
worker.terminate();
reject(new Error('Worker timeout'));
}, timeout);
worker.on('message', (result) => {
clearTimeout(timeoutHandle);
resolve(result);
worker.terminate();
});
worker.on('error', (error) => {
clearTimeout(timeoutHandle);
reject(error);
worker.terminate();
});
worker.on('exit', (code) => {
clearTimeout(timeoutHandle);
if (code !== 0) {
reject(new Error(`Worker exited with code ${code}`));
}
});
worker.postMessage(data);
});
}
6. Готовые решения
Piscina (рекомендуется)
import Piscina from 'piscina';
const pool = new Piscina({
filename: './worker.js',
maxThreads: 4,
concurrentTasksPerWorker: 2
});
const result = await pool.run({ x: 1000 });
node-worker-threads-pool
import { StaticPool } from 'node-worker-threads-pool';
const pool = new StaticPool({
size: 4,
task: './worker.js'
});
const result = await pool.exec({ n: 1000000000 });
Когда использовать Worker Threads
| ✅ Используй | ❌ Не используй |
|---|---|
| CPU-bound операции (вычисления) | I/O операции (DB, HTTP) |
| Обработка изображений | Логирование |
| Шифрование/Компрессия | Работа с сетью |
| Научные вычисления | Кэширование |
| Парсинг больших данных | Доступ к переменным |
Вывод
Worker Threads — это мощный инструмент для:
- Масштабирования CPU-heavy операций
- Неблокирования event loop
- Использования многоядерных процессоров
Но для большинства Web API это не нужно. Node.js достаточно быстр для типичных задач (HTTP, БД). Worker Threads нужны когда:
- Операция занимает более 100ms
- Она блокирует event loop
- Вы видели это в профилировании