← Назад к вопросам

Как распараллелить процесс с большим количеством данных в очереди?

2.8 Senior🔥 101 комментариев
#Брокеры сообщений и очереди#Кэширование и производительность

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Как распараллелить процесс с большим количеством данных в очереди?

Это критически важный скилл для обработки больших объёмов данных без блокировки приложения. Я использую несколько подходов в зависимости от сценария.

Сценарий 1: Маленький объём данных (< 10K)

Для небольших наборов данных можно использовать Promise.all() с параллельным выполнением.

// Данные для обработки
const users = [
  { id: 1, email: 'user1@test.com' },
  { id: 2, email: 'user2@test.com' },
  // ... тысяча юзеров
];

// ПЛОХО - последовательная обработка (медленно)
async function processUsersSequential() {
  for (const user of users) {
    await sendEmail(user.email);
    console.log(`Sent email to ${user.email}`);
  }
  // Если 1000 юзеров, а отправка письма 100ms — это 100 секунд!
}

// ХОРОШО - параллельная обработка
async function processUsersParallel() {
  await Promise.all(
    users.map(user => sendEmail(user.email))
  );
  // 100 писем параллельно — займёт ~100ms вместо 10 сек!
}

// ЛУЧШЕ - контролируемый батч (не перегружать сервис)
async function processUsersInBatches() {
  const batchSize = 10;
  
  for (let i = 0; i < users.length; i += batchSize) {
    const batch = users.slice(i, i + batchSize);
    
    await Promise.all(
      batch.map(user => sendEmail(user.email))
    );
    
    console.log(`Processed batch ${i / batchSize + 1}`);
  }
  // 100 писем батчами по 10 — гораздо безопаснее
}

Сценарий 2: Большой объём (100K - 1M)

Для больших данных нужна очередь с воркерами.

Используя Bull/BullMQ (Redis очередь):

import Queue from 'bull';

// Создай очередь
const emailQueue = new Queue('send-emails', {
  redis: { host: 'localhost', port: 6379 }
});

// Добавь задачи в очередь
async function enqueueEmails(users: User[]) {
  for (const user of users) {
    await emailQueue.add(
      { email: user.email, userId: user.id },
      { 
        attempts: 3,        // retry 3 раза при ошибке
        backoff: {
          type: 'exponential',
          delay: 2000      // экспоненциальная задержка
        }
      }
    );
  }
  console.log(`Enqueued ${users.length} emails`);
}

// Обработчик задач (воркер)
emailQueue.process(5, async (job) => {
  // 5 одновременных задач
  const { email, userId } = job.data;
  
  try {
    await sendEmail(email);
    return { success: true };
  } catch (err) {
    throw new Error(`Failed to send email to ${email}`);
  }
});

// События очереди
emailQueue.on('completed', (job) => {
  console.log(`Email sent to ${job.data.email}`);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Failed to send email: ${err.message}`);
});

// Использование
await enqueueEmails(millionUsers);

Преимущества очереди:

  • Асинхронная обработка
  • Автоматические retry
  • Persistence (если упадёт сервер, очередь восстановится)
  • Мониторинг и статистика
  • Легко масштабировать (больше воркеров)

Сценарий 3: Микросервисная обработка

Для ОЧЕНЬ больших объёмов (миллионы) — используй отдельные воркер сервисы.

// API сервис - только добавляет в очередь
import express from 'express';
import Queue from 'bull';

const app = express();
const emailQueue = new Queue('send-emails', { redis: {...} });

app.post('/api/send-bulk-emails', async (req, res) => {
  const { users } = req.body;
  
  // Добавь в очередь
  for (const user of users) {
    await emailQueue.add({ email: user.email });
  }
  
  res.json({ 
    message: 'Queued',
    count: users.length 
  });
});

app.listen(3000);
// Воркер сервис (отдельный процесс/контейнер)
import Queue from 'bull';

const emailQueue = new Queue('send-emails', { redis: {...} });

// Обработка 20 писем одновременно
emailQueue.process(20, async (job) => {
  const { email } = job.data;
  await sendEmail(email);
});
# docker-compose.yml - масштабирование
services:
  api:
    image: myapp:api
    ports:
      - "3000:3000"
    
  worker:
    image: myapp:worker
    environment:
      - WORKER_CONCURRENCY=20
    depends_on:
      - redis
    
  worker-2:  # второй воркер
    image: myapp:worker
    environment:
      - WORKER_CONCURRENCY=20
    
  worker-3:  # третий воркер
    image: myapp:worker
    environment:
      - WORKER_CONCURRENCY=20
    
  redis:
    image: redis:7-alpine

Сценарий 4: Stream обработка (для очень больших файлов)

Когда данные идут потоком (loading data.json 500MB):

import fs from 'fs';
import JSONStream from 'JSONStream';
import { Transform } from 'stream';

// Читай файл строкой за строкой
fs.createReadStream('data.json')
  .pipe(JSONStream.parse('*')) // парсим JSON
  .pipe(
    new Transform({
      objectMode: true,
      async transform(user, encoding, callback) {
        try {
          // Обработка одного пользователя
          await processUser(user);
          callback(null, user);
        } catch (err) {
          callback(err);
        }
      },
      highWaterMark: 10 // 10 пользователей в буфере
    })
  )
  .on('end', () => console.log('Done!'))
  .on('error', (err) => console.error(err));

Особенность streams:

  • Память не увеличивается (читаем по частям)
  • Можно обработать гигабайты данных
  • highWaterMark контролирует буфер

Сценарий 5: Worker Threads (параллелизм в одном процессе)

Для CPU-intensive задач:

// main.ts
import { Worker } from 'worker_threads';
import path from 'path';

function runWorker(workerData: any) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.join(__dirname, 'worker.ts'), {
      workerData
    });
    
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) reject(new Error(`Exit code ${code}`));
    });
  });
}

// Обработка 100K записей 4 потоками
async function processLargeDataset(records: any[]) {
  const threadCount = 4;
  const recordsPerThread = Math.ceil(records.length / threadCount);
  
  const promises = [];
  for (let i = 0; i < threadCount; i++) {
    const chunk = records.slice(
      i * recordsPerThread,
      (i + 1) * recordsPerThread
    );
    
    promises.push(runWorker(chunk));
  }
  
  const results = await Promise.all(promises);
  return results.flat();
}
// worker.ts
import { parentPort, workerData } from 'worker_threads';

const records = workerData as any[];
const processed = records.map(r => {
  // Heavy computation
  return complexCalculation(r);
});

parentPort?.postMessage(processed);

Сценарий 6: Графическое сравнение подходов

Объём данных     | Подход
─────────────────┼─────────────────────────────
< 1K             | Синхронно или Promise.all
1K - 10K         | Батчи с Promise.all
10K - 100K       | Bull/BullMQ очередь
100K - 1M        | Bull + несколько воркеров
1M +             | Микросервисы + Kafka/RabbitMQ
Очень большой    | Streams
CPU-intensive    | Worker Threads

Мой рекомендуемый сетап для production

import Queue from 'bull';
import Redis from 'ioredis';

const redis = new Redis({
  host: process.env.REDIS_HOST,
  port: process.env.REDIS_PORT
});

const emailQueue = new Queue('send-emails', { redis });

// Конфигурация воркера
emailQueue.process(
  20, // одновременно обрабатывать 20 задач
  async (job) => {
    const { email } = job.data;
    
    // Прогресс
    job.progress(50);
    
    try {
      await sendEmail(email);
      job.progress(100);
      return { success: true };
    } catch (err) {
      // Bull автоматически retry
      throw err;
    }
  }
);

// Мониторинг
emailQueue.on('completed', (job) => {
  console.log(`Job ${job.id} completed`);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

// Добавление в очередь
export async function sendBulkEmails(emails: string[]) {
  const jobs = emails.map(email => 
    emailQueue.add({ email }, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 }
    })
  );
  
  await Promise.all(jobs);
}

Чеклист оптимизации

  • Promise.all вместо await в цикле
  • Батчирование для контролирования нагрузки
  • Bull/BullMQ очередь для асинхронности
  • Redis для persistence
  • Несколько воркеров для масштабирования
  • Мониторинг очереди (metrics, alerts)
  • Graceful shutdown (не теряй в очереди)
  • Логирование прогресса
  • Retry logic с exponential backoff

Вывод: выбирай подход в зависимости от объёма: маленькие данные → Promise.all, большие → Queue + Workers, очень большие → микросервисы.