Комментарии (1)
🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)
Ответ сгенерирован нейросетью и может содержать ошибки
Типы Stream в Node.js Backend
Stream — один из самых мощных инструментов Node.js для работы с большими объёмами данных. Они позволяют обрабатывать данные порциями вместо загрузки всего в память.
1. Основные типы Stream
Readable Stream
import fs from 'fs';
// Читает файл порциями
const readStream = fs.createReadStream('large-file.txt', {
encoding: 'utf-8',
highWaterMark: 16 * 1024 // 16 KB буфер
});
readStream.on('data', (chunk) => {
console.log(`Получена порция: ${chunk.length} байт`);
});
readStream.on('end', () => {
console.log('Файл полностью прочитан');
});
readStream.on('error', (error) => {
console.error('Ошибка чтения:', error);
});
Writable Stream
import fs from 'fs';
// Пишет в файл порциями
const writeStream = fs.createWriteStream('output.txt', {
encoding: 'utf-8',
highWaterMark: 16 * 1024
});
writeStream.write('Строка 1\n');
writeStream.write('Строка 2\n');
writeStream.write('Строка 3\n');
writeStream.end(); // Завершить запись
writeStream.on('finish', () => {
console.log('Запись завершена');
});
writeStream.on('error', (error) => {
console.error('Ошибка записи:', error);
});
Duplex Stream (читаемый И записываемый одновременно)
import { Duplex } from 'stream';
// Пример: трансформирует входящие данные и выводит результат
const duplexStream = new Duplex({
write(chunk, encoding, callback) {
// Пишем
console.log('Received:', chunk.toString());
callback(); // Сигнализируем о завершении записи
},
read(size) {
// Читаем
this.push('Transformed data\n');
this.push(null); // EOF
}
});
// Использование
duplexStream.write('input');
duplexStream.on('data', (chunk) => {
console.log('Output:', chunk.toString());
});
Transform Stream (преобразует данные)
import { Transform } from 'stream';
// Трансформирует в uppercase
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
const transformed = chunk.toString().toUpperCase();
callback(null, transformed); // null = ошибка нет, transformed = результат
}
});
// Использование
process.stdin
.pipe(upperCaseTransform)
.pipe(process.stdout);
Pass-Through Stream
import { PassThrough } from 'stream';
const passThrough = new PassThrough();
passThrough.on('data', (chunk) => {
console.log('Data:', chunk.toString());
});
passThrough.write('Hello');
passThrough.write(' World');
passThrough.end();
2. Pipe — главный паттерн работы со Stream
import fs from 'fs';
import zlib from 'zlib';
// Читаем файл → сжимаем → пишем результат
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'))
.on('finish', () => console.log('Файл сжат'));
// Можно цеплять много трансформаций
fs.createReadStream('data.csv')
.pipe(csvParser()) // Парсим CSV
.pipe(filterTransform) // Фильтруем
.pipe(sortTransform) // Сортируем
.pipe(fs.createWriteStream('output.csv')); // Пишем результат
3. Обработка ошибок в цепочке pipe
import fs from 'fs';
const source = fs.createReadStream('input.txt');
const destination = fs.createWriteStream('output.txt');
// ❌ Неправильно — ошибки источника не обработаны
source.pipe(destination);
// ✅ Правильно — обработка ошибок везде
source.on('error', (error) => {
console.error('Read error:', error);
});
destination.on('error', (error) => {
console.error('Write error:', error);
});
destination.on('finish', () => {
console.log('Успешно скопирован');
});
// ✅ Или используй pipeline (Node.js 10+)
import { pipeline } from 'stream';
pipeline(
fs.createReadStream('input.txt'),
fs.createWriteStream('output.txt'),
(error) => {
if (error) {
console.error('Pipeline failed:', error);
} else {
console.log('Pipeline succeeded');
}
}
);
4. Практические примеры для backend
CSV Parser Stream
import { Transform } from 'stream';
class CSVParser extends Transform {
constructor(options = {}) {
super(options);
this.header = null;
this.lineNumber = 0;
}
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lines.forEach((line, index) => {
if (index === 0 && !this.header) {
this.header = line.split(',');
return;
}
const values = line.split(',');
const obj = {};
this.header.forEach((key, idx) => {
obj[key.trim()] = values[idx]?.trim();
});
this.push(JSON.stringify(obj) + '\n');
});
callback();
}
}
// Использование
fs.createReadStream('users.csv')
.pipe(new CSVParser())
.pipe(fs.createWriteStream('users.jsonl'));
JSON Stream Parser (для больших JSON файлов)
import JSONStream from 'JSONStream';
import fs from 'fs';
// Читаем большой JSON файл параллельно
fs.createReadStream('data.json')
.pipe(JSONStream.parse('*')) // Парсим каждый элемент
.on('data', (item) => {
// Обрабатываем каждый элемент отдельно
console.log('Item:', item);
})
.on('error', (error) => {
console.error('Parse error:', error);
});
Streaming Upload в API
import express from 'express';
import fs from 'fs';
const app = express();
// Загрузка большого файла
app.post('/upload', (req, res) => {
const uploadPath = './uploads/' + Date.now() + '.bin';
req.pipe(fs.createWriteStream(uploadPath))
.on('finish', () => {
res.json({ success: true, file: uploadPath });
})
.on('error', (error) => {
res.status(500).json({ error: error.message });
});
});
Обработка больших файлов с базой данных
import { Transform } from 'stream';
import { db } from '@/infrastructure/database';
class DatabaseInsertTransform extends Transform {
constructor(tableName, options = {}) {
super({ objectMode: true, ...options });
this.tableName = tableName;
this.batch = [];
this.batchSize = 1000;
}
async transform(record, encoding, callback) {
this.batch.push(record);
if (this.batch.length >= this.batchSize) {
try {
await db.query(`INSERT INTO ${this.tableName} ...`, this.batch);
this.batch = [];
callback();
} catch (error) {
callback(error);
}
} else {
callback();
}
}
async flush(callback) {
if (this.batch.length > 0) {
try {
await db.query(`INSERT INTO ${this.tableName} ...`, this.batch);
callback();
} catch (error) {
callback(error);
}
} else {
callback();
}
}
}
// Использование
fs.createReadStream('users.csv')
.pipe(csvParser())
.pipe(new DatabaseInsertTransform('users'))
.on('finish', () => console.log('Import complete'));
5. Управление давлением (Backpressure)
// ❌ Неправильно — игнорируем backpressure
const reader = fs.createReadStream('huge-file.txt');
const writer = fs.createWriteStream('output.txt');
reader.on('data', (chunk) => {
writer.write(chunk); // Может переполнить буфер!
});
// ✅ Правильно — используем pipe (автоматически)
reader.pipe(writer);
// ✅ Или обрабатываем backpressure вручную
reader.on('data', (chunk) => {
const ok = writer.write(chunk);
if (!ok) {
console.log('Backpressure! Паузируем чтение');
reader.pause();
}
});
writer.on('drain', () => {
console.log('Буфер опустошен. Возобновляем чтение');
reader.resume();
});
6. Сравнение методов
| Метод | Использование | Память |
|---|---|---|
fs.readFile() | Маленькие файлы (<10MB) | Загружает весь файл |
Stream | Большие файлы | Постоянный размер |
pipe() | Трансформации | Оптимален |
pipeline() | Современный подход | Оптимален |
7. Best Practices
- Используй
pipeline()вместоpipe()(лучше обработка ошибок) - Всегда обрабатывай ошибки в каждом stream'е
- Контролируй highWaterMark для оптимизации памяти
- Используй object mode для работы с объектами
- Проверяй backpressure при ручной записи