← Назад к вопросам
Как распараллелить процесс с большим количеством данных в очереди?
2.8 Senior🔥 101 комментариев
#Брокеры сообщений и очереди#Кэширование и производительность
Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Как распараллелить процесс с большим количеством данных в очереди?
Это критически важный скилл для обработки больших объёмов данных без блокировки приложения. Я использую несколько подходов в зависимости от сценария.
Сценарий 1: Маленький объём данных (< 10K)
Для небольших наборов данных можно использовать Promise.all() с параллельным выполнением.
// Данные для обработки
const users = [
{ id: 1, email: 'user1@test.com' },
{ id: 2, email: 'user2@test.com' },
// ... тысяча юзеров
];
// ПЛОХО - последовательная обработка (медленно)
async function processUsersSequential() {
for (const user of users) {
await sendEmail(user.email);
console.log(`Sent email to ${user.email}`);
}
// Если 1000 юзеров, а отправка письма 100ms — это 100 секунд!
}
// ХОРОШО - параллельная обработка
async function processUsersParallel() {
await Promise.all(
users.map(user => sendEmail(user.email))
);
// 100 писем параллельно — займёт ~100ms вместо 10 сек!
}
// ЛУЧШЕ - контролируемый батч (не перегружать сервис)
async function processUsersInBatches() {
const batchSize = 10;
for (let i = 0; i < users.length; i += batchSize) {
const batch = users.slice(i, i + batchSize);
await Promise.all(
batch.map(user => sendEmail(user.email))
);
console.log(`Processed batch ${i / batchSize + 1}`);
}
// 100 писем батчами по 10 — гораздо безопаснее
}
Сценарий 2: Большой объём (100K - 1M)
Для больших данных нужна очередь с воркерами.
Используя Bull/BullMQ (Redis очередь):
import Queue from 'bull';
// Создай очередь
const emailQueue = new Queue('send-emails', {
redis: { host: 'localhost', port: 6379 }
});
// Добавь задачи в очередь
async function enqueueEmails(users: User[]) {
for (const user of users) {
await emailQueue.add(
{ email: user.email, userId: user.id },
{
attempts: 3, // retry 3 раза при ошибке
backoff: {
type: 'exponential',
delay: 2000 // экспоненциальная задержка
}
}
);
}
console.log(`Enqueued ${users.length} emails`);
}
// Обработчик задач (воркер)
emailQueue.process(5, async (job) => {
// 5 одновременных задач
const { email, userId } = job.data;
try {
await sendEmail(email);
return { success: true };
} catch (err) {
throw new Error(`Failed to send email to ${email}`);
}
});
// События очереди
emailQueue.on('completed', (job) => {
console.log(`Email sent to ${job.data.email}`);
});
emailQueue.on('failed', (job, err) => {
console.error(`Failed to send email: ${err.message}`);
});
// Использование
await enqueueEmails(millionUsers);
Преимущества очереди:
- Асинхронная обработка
- Автоматические retry
- Persistence (если упадёт сервер, очередь восстановится)
- Мониторинг и статистика
- Легко масштабировать (больше воркеров)
Сценарий 3: Микросервисная обработка
Для ОЧЕНЬ больших объёмов (миллионы) — используй отдельные воркер сервисы.
// API сервис - только добавляет в очередь
import express from 'express';
import Queue from 'bull';
const app = express();
const emailQueue = new Queue('send-emails', { redis: {...} });
app.post('/api/send-bulk-emails', async (req, res) => {
const { users } = req.body;
// Добавь в очередь
for (const user of users) {
await emailQueue.add({ email: user.email });
}
res.json({
message: 'Queued',
count: users.length
});
});
app.listen(3000);
// Воркер сервис (отдельный процесс/контейнер)
import Queue from 'bull';
const emailQueue = new Queue('send-emails', { redis: {...} });
// Обработка 20 писем одновременно
emailQueue.process(20, async (job) => {
const { email } = job.data;
await sendEmail(email);
});
# docker-compose.yml - масштабирование
services:
api:
image: myapp:api
ports:
- "3000:3000"
worker:
image: myapp:worker
environment:
- WORKER_CONCURRENCY=20
depends_on:
- redis
worker-2: # второй воркер
image: myapp:worker
environment:
- WORKER_CONCURRENCY=20
worker-3: # третий воркер
image: myapp:worker
environment:
- WORKER_CONCURRENCY=20
redis:
image: redis:7-alpine
Сценарий 4: Stream обработка (для очень больших файлов)
Когда данные идут потоком (loading data.json 500MB):
import fs from 'fs';
import JSONStream from 'JSONStream';
import { Transform } from 'stream';
// Читай файл строкой за строкой
fs.createReadStream('data.json')
.pipe(JSONStream.parse('*')) // парсим JSON
.pipe(
new Transform({
objectMode: true,
async transform(user, encoding, callback) {
try {
// Обработка одного пользователя
await processUser(user);
callback(null, user);
} catch (err) {
callback(err);
}
},
highWaterMark: 10 // 10 пользователей в буфере
})
)
.on('end', () => console.log('Done!'))
.on('error', (err) => console.error(err));
Особенность streams:
- Память не увеличивается (читаем по частям)
- Можно обработать гигабайты данных
- highWaterMark контролирует буфер
Сценарий 5: Worker Threads (параллелизм в одном процессе)
Для CPU-intensive задач:
// main.ts
import { Worker } from 'worker_threads';
import path from 'path';
function runWorker(workerData: any) {
return new Promise((resolve, reject) => {
const worker = new Worker(path.join(__dirname, 'worker.ts'), {
workerData
});
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) reject(new Error(`Exit code ${code}`));
});
});
}
// Обработка 100K записей 4 потоками
async function processLargeDataset(records: any[]) {
const threadCount = 4;
const recordsPerThread = Math.ceil(records.length / threadCount);
const promises = [];
for (let i = 0; i < threadCount; i++) {
const chunk = records.slice(
i * recordsPerThread,
(i + 1) * recordsPerThread
);
promises.push(runWorker(chunk));
}
const results = await Promise.all(promises);
return results.flat();
}
// worker.ts
import { parentPort, workerData } from 'worker_threads';
const records = workerData as any[];
const processed = records.map(r => {
// Heavy computation
return complexCalculation(r);
});
parentPort?.postMessage(processed);
Сценарий 6: Графическое сравнение подходов
Объём данных | Подход
─────────────────┼─────────────────────────────
< 1K | Синхронно или Promise.all
1K - 10K | Батчи с Promise.all
10K - 100K | Bull/BullMQ очередь
100K - 1M | Bull + несколько воркеров
1M + | Микросервисы + Kafka/RabbitMQ
Очень большой | Streams
CPU-intensive | Worker Threads
Мой рекомендуемый сетап для production
import Queue from 'bull';
import Redis from 'ioredis';
const redis = new Redis({
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT
});
const emailQueue = new Queue('send-emails', { redis });
// Конфигурация воркера
emailQueue.process(
20, // одновременно обрабатывать 20 задач
async (job) => {
const { email } = job.data;
// Прогресс
job.progress(50);
try {
await sendEmail(email);
job.progress(100);
return { success: true };
} catch (err) {
// Bull автоматически retry
throw err;
}
}
);
// Мониторинг
emailQueue.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
emailQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
});
// Добавление в очередь
export async function sendBulkEmails(emails: string[]) {
const jobs = emails.map(email =>
emailQueue.add({ email }, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 }
})
);
await Promise.all(jobs);
}
Чеклист оптимизации
- Promise.all вместо await в цикле
- Батчирование для контролирования нагрузки
- Bull/BullMQ очередь для асинхронности
- Redis для persistence
- Несколько воркеров для масштабирования
- Мониторинг очереди (metrics, alerts)
- Graceful shutdown (не теряй в очереди)
- Логирование прогресса
- Retry logic с exponential backoff
Вывод: выбирай подход в зависимости от объёма: маленькие данные → Promise.all, большие → Queue + Workers, очень большие → микросервисы.