← Назад к вопросам

Какие знаешь типы stream?

1.7 Middle🔥 181 комментариев
#Node.js и JavaScript

Комментарии (1)

🐱
claude-haiku-4.5PrepBro AI29 мар. 2026 г.(ред.)

Ответ сгенерирован нейросетью и может содержать ошибки

Типы Stream в Node.js Backend

Stream — один из самых мощных инструментов Node.js для работы с большими объёмами данных. Они позволяют обрабатывать данные порциями вместо загрузки всего в память.

1. Основные типы Stream

Readable Stream

import fs from 'fs';

// Читает файл порциями
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf-8',
  highWaterMark: 16 * 1024  // 16 KB буфер
});

readStream.on('data', (chunk) => {
  console.log(`Получена порция: ${chunk.length} байт`);
});

readStream.on('end', () => {
  console.log('Файл полностью прочитан');
});

readStream.on('error', (error) => {
  console.error('Ошибка чтения:', error);
});

Writable Stream

import fs from 'fs';

// Пишет в файл порциями
const writeStream = fs.createWriteStream('output.txt', {
  encoding: 'utf-8',
  highWaterMark: 16 * 1024
});

writeStream.write('Строка 1\n');
writeStream.write('Строка 2\n');
writeStream.write('Строка 3\n');

writeStream.end();  // Завершить запись

writeStream.on('finish', () => {
  console.log('Запись завершена');
});

writeStream.on('error', (error) => {
  console.error('Ошибка записи:', error);
});

Duplex Stream (читаемый И записываемый одновременно)

import { Duplex } from 'stream';

// Пример: трансформирует входящие данные и выводит результат
const duplexStream = new Duplex({
  write(chunk, encoding, callback) {
    // Пишем
    console.log('Received:', chunk.toString());
    callback();  // Сигнализируем о завершении записи
  },
  
  read(size) {
    // Читаем
    this.push('Transformed data\n');
    this.push(null);  // EOF
  }
});

// Использование
duplexStream.write('input');
duplexStream.on('data', (chunk) => {
  console.log('Output:', chunk.toString());
});

Transform Stream (преобразует данные)

import { Transform } from 'stream';

// Трансформирует в uppercase
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const transformed = chunk.toString().toUpperCase();
    callback(null, transformed);  // null = ошибка нет, transformed = результат
  }
});

// Использование
process.stdin
  .pipe(upperCaseTransform)
  .pipe(process.stdout);

Pass-Through Stream

import { PassThrough } from 'stream';

const passThrough = new PassThrough();

passThrough.on('data', (chunk) => {
  console.log('Data:', chunk.toString());
});

passThrough.write('Hello');
passThrough.write(' World');
passThrough.end();

2. Pipe — главный паттерн работы со Stream

import fs from 'fs';
import zlib from 'zlib';

// Читаем файл → сжимаем → пишем результат
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'))
  .on('finish', () => console.log('Файл сжат'));

// Можно цеплять много трансформаций
fs.createReadStream('data.csv')
  .pipe(csvParser())           // Парсим CSV
  .pipe(filterTransform)       // Фильтруем
  .pipe(sortTransform)         // Сортируем
  .pipe(fs.createWriteStream('output.csv'));  // Пишем результат

3. Обработка ошибок в цепочке pipe

import fs from 'fs';

const source = fs.createReadStream('input.txt');
const destination = fs.createWriteStream('output.txt');

// ❌ Неправильно — ошибки источника не обработаны
source.pipe(destination);

// ✅ Правильно — обработка ошибок везде
source.on('error', (error) => {
  console.error('Read error:', error);
});

destination.on('error', (error) => {
  console.error('Write error:', error);
});

destination.on('finish', () => {
  console.log('Успешно скопирован');
});

// ✅ Или используй pipeline (Node.js 10+)
import { pipeline } from 'stream';

pipeline(
  fs.createReadStream('input.txt'),
  fs.createWriteStream('output.txt'),
  (error) => {
    if (error) {
      console.error('Pipeline failed:', error);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

4. Практические примеры для backend

CSV Parser Stream

import { Transform } from 'stream';

class CSVParser extends Transform {
  constructor(options = {}) {
    super(options);
    this.header = null;
    this.lineNumber = 0;
  }
  
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    
    lines.forEach((line, index) => {
      if (index === 0 && !this.header) {
        this.header = line.split(',');
        return;
      }
      
      const values = line.split(',');
      const obj = {};
      
      this.header.forEach((key, idx) => {
        obj[key.trim()] = values[idx]?.trim();
      });
      
      this.push(JSON.stringify(obj) + '\n');
    });
    
    callback();
  }
}

// Использование
fs.createReadStream('users.csv')
  .pipe(new CSVParser())
  .pipe(fs.createWriteStream('users.jsonl'));

JSON Stream Parser (для больших JSON файлов)

import JSONStream from 'JSONStream';
import fs from 'fs';

// Читаем большой JSON файл параллельно
fs.createReadStream('data.json')
  .pipe(JSONStream.parse('*'))  // Парсим каждый элемент
  .on('data', (item) => {
    // Обрабатываем каждый элемент отдельно
    console.log('Item:', item);
  })
  .on('error', (error) => {
    console.error('Parse error:', error);
  });

Streaming Upload в API

import express from 'express';
import fs from 'fs';

const app = express();

// Загрузка большого файла
app.post('/upload', (req, res) => {
  const uploadPath = './uploads/' + Date.now() + '.bin';
  
  req.pipe(fs.createWriteStream(uploadPath))
    .on('finish', () => {
      res.json({ success: true, file: uploadPath });
    })
    .on('error', (error) => {
      res.status(500).json({ error: error.message });
    });
});

Обработка больших файлов с базой данных

import { Transform } from 'stream';
import { db } from '@/infrastructure/database';

class DatabaseInsertTransform extends Transform {
  constructor(tableName, options = {}) {
    super({ objectMode: true, ...options });
    this.tableName = tableName;
    this.batch = [];
    this.batchSize = 1000;
  }
  
  async transform(record, encoding, callback) {
    this.batch.push(record);
    
    if (this.batch.length >= this.batchSize) {
      try {
        await db.query(`INSERT INTO ${this.tableName} ...`, this.batch);
        this.batch = [];
        callback();
      } catch (error) {
        callback(error);
      }
    } else {
      callback();
    }
  }
  
  async flush(callback) {
    if (this.batch.length > 0) {
      try {
        await db.query(`INSERT INTO ${this.tableName} ...`, this.batch);
        callback();
      } catch (error) {
        callback(error);
      }
    } else {
      callback();
    }
  }
}

// Использование
fs.createReadStream('users.csv')
  .pipe(csvParser())
  .pipe(new DatabaseInsertTransform('users'))
  .on('finish', () => console.log('Import complete'));

5. Управление давлением (Backpressure)

// ❌ Неправильно — игнорируем backpressure
const reader = fs.createReadStream('huge-file.txt');
const writer = fs.createWriteStream('output.txt');

reader.on('data', (chunk) => {
  writer.write(chunk);  // Может переполнить буфер!
});

// ✅ Правильно — используем pipe (автоматически)
reader.pipe(writer);

// ✅ Или обрабатываем backpressure вручную
reader.on('data', (chunk) => {
  const ok = writer.write(chunk);
  
  if (!ok) {
    console.log('Backpressure! Паузируем чтение');
    reader.pause();
  }
});

writer.on('drain', () => {
  console.log('Буфер опустошен. Возобновляем чтение');
  reader.resume();
});

6. Сравнение методов

МетодИспользованиеПамять
fs.readFile()Маленькие файлы (<10MB)Загружает весь файл
StreamБольшие файлыПостоянный размер
pipe()ТрансформацииОптимален
pipeline()Современный подходОптимален

7. Best Practices

  • Используй pipeline() вместо pipe() (лучше обработка ошибок)
  • Всегда обрабатывай ошибки в каждом stream'е
  • Контролируй highWaterMark для оптимизации памяти
  • Используй object mode для работы с объектами
  • Проверяй backpressure при ручной записи