Skip to content

Steam

const { PassThrough } = require('stream');
  • 数据从输入到输出不做任何修改
  • 可以用来监听或调试数据流
  • 可以用来连接不同的流

监控数据流

ts
const { PassThrough } = require('stream');
const fs = require('fs');

const pass = new PassThrough();

pass.on('data', ( chunk )=>{
    console.log(chunk)
})

fs.createReadStream('./steam.md').pipe(pass).pipe(process.stdout);

文件上传进度

ts

function uploadWithProgress(fileStream, totalSize) {
    const pass = new PassThrough();
    let uploadedSize = 0;

    pass.on('data', (chunk) => {
        uploadedSize += chunk.length;
        const progress = (uploadedSize / totalSize) * 100;
        console.log(`上传进度: ${progress}%`);
    });
    return fileStream.pipe(pass);
}

fs.pipe

用于连接可读流(fs.createReadStream)以及可写流(fs.createWriteStream), 使得数据可以自动从可读流中读取, 并写入到可写流中。

链式调用

ts
const fs = require('fs');

fs.createReadStream('./steam.md')
    .pipe(zlib.createGzip()) // 压缩
    .pipe(fs.createWriteStream('./steam-copy.md'))
    .on('finish', () => {
        console.log('文件复制完成');
    });
ts
const fs = require('fs');
const { PassThrough } = require('stream');

function uploadFile(sourceStream, filename){

    const pass = new PassThrough();

    const writeStream = fs.createWriteStream(filename);

    let totalSize = 0;

    pass.on('data', (chunk) => {
        totalSize += chunk.length;
        console.log(`上传进度: ${totalSize} bytes`);
    });

    return sourceStream.pipe(pass).pipe(writeStream);
}

背压处理

ts
const fs = require('fs');

const readStream = fs.createReadStream('bigfile.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.pipe(writeStream);

readStream.on('data', (chunk) => {
    if(writeStream.write(chunk) === false){
        console.log('背压了')  
        readStream.pause(); //  暂停数据流动,停止触发'data'事件
    }
});

writeStream.on('drain', () => {
    readStream.resume(); // 恢复数据流动,继续触发'data'事件
});

data事件

data事件在流读取数据时触发,当流读取到数据时,会触发data事件,并传递读取到的数据。

暂停模式:

ts
const fs = require('fs');

const readStream = fs.createReadStream('bigfile.txt');
// 不添加data监听器,流就会保持在暂停模式

已结束:

ts
readStream.on('end', () => {
  // 流结束后,data事件不会再触发
  console.log('流已结束');
});
ts
readStream.on('error', (err) => {
  console.error('读取错误:', err);
});
本站访客数 人次 本站总访问量