学习笔记—Node之读取流的使用与实现
日常的学习笔记,包括 ES6、Promise、Node.js、Webpack、http 原理、Vue全家桶,后续可能还会继续更新 Typescript、Vue3 和 常见的面试题 等等。
在上一篇文章中,我们利用 分片读写(发布订阅模式) 的方式实现了 文件拷贝 的功能。
而 Node 中提供了几种原生解决方案,fs.createReadStream 和 fs.createWriteStream。
顾名思义,这两种方案都是以创建 流(Stream) 的方式进行处理的。
这篇文章我们会着重说明一下 可读流(fs.createReadStream) 的基本使用与它的实现原理。
可读流的使用
fs.createReadStream的具体说明,可以参考官网 fs 文件系统 | Node.js API文档
fs.createReadStream  会创建一个 可读流 用来对文件内容进行读取。方法会返回一个 fs.ReadStream 类的实例作为回调,其父类是 stream.Readable ,属于 stream 类上的一个类。
其中 第一个参数 path 传入需要进行读取的 文件路径 。
第二个参数 option 包括
encoding:编码格式,默认值为null。autoClose:读取完毕后自动关闭,默认值为true。start/end:从文件中读取一定范围的字节,而不是整个文件。highWaterMark:最高水位线,默认长度为64 * 1024,也就是64kb。
我们可以参考下面的例子,了解一下它的使用方法。
// test.txt
123456789
// createReadStream.js
const fs = require("fs");
const path = require("path");
// 返回一个 Readable 类的实例
const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), {
  highWaterMark: 3 // 最高水位线为3,也就是每次读取 3kb
});
rs.on("open", (fd) => { // 此方法是 fs 模块中自己实现的
  console.log(fd); // 3 
});
rs.on("data", (chunk) => {
  console.log(chunk); // 返回Buffer格式   
});
rs.on("end", () => {
  console.log("end"); // 结束事件
});
   
如果我们的文件中包括中文,并把 highWaterMark 改成2,并使用变量进行拼接,就会出现乱码的情况。
// test.txt
莫小尚1234567890
// createReadStream.js
const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), {
  highWaterMark: 2
});
let result = '';
rs.on("data", (chunk) => {
  result += chunk;
});
rs.on("end", () => {
  console.log(result); // ???????1234567890
});
这个原因就是因为 Buffer 在拼接的时候,需要使用其特定方法 .concat 进行拼接。
// 对代码进行修改
const arr = [];
rs.on("data", (chunk) => {
  arr.push(chunk);
});
rs.on("end", () => {
  console.log(Buffer.concat(arr).toString()); // 莫小尚1234567890
});
搞清楚了 ReadStream 的使用,我们就可以尝试自己手写一套 ReadStream 类了。
手写实现可读流
先来看一下完整的代码实现。
// readStream.js 手写方法
const EventEmitter = require("events");
const fs = require("fs");
class ReadStream extends EventEmitter {
  constructor(path, options) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.start = options.start || 0;
    this.end = options.end;
    this.emitClose = options.emitClose || true;
    this.encoding = options.encoding;
    this.offset = this.start; // 偏移量
    // 是否需要触发data事件
    this.flowing = false; // 当用户监听data事件后,此属性变成true
    // 即使只绑定了open事件,也可以进行触发(也就是只进行订阅)
    this.open();
    // 每次绑定事件,都会触发newListener的回调
    this.on("newListener", (type) => {
      if (type === "data") {
        this.flowing = true;
        this.read();
      }
    });
  }
  // 销毁方法
  destory(err) {
    if (err) {
      this.emit("error");
    }
    if (this.fd) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        return this.destory(err);
      }
      this.fd = fd;
      this.emit("open", fd); // 触发open事件
    });
  }
  read() {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this.read());
    }
    // 此处直接读取 this.fd 是读取不到的,所以我们需要先进行判断
    const howManyToRead = this.end
      ? Math.min(this.highWaterMark, this.end - this.offset + 1)
      : this.highWaterMark;
    const buffer = Buffer.alloc(howManyToRead);
    fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => {
        if (err) return this.destory(err);
        if (bytesRead == 0) {
          this.emit("end"); // 触发end事件
          return this.destory(); // 进行销毁
        }
        this.offset += bytesRead;
        this.emit("data", buffer.slice(0, bytesRead));
        // 进行递归,循环输出结果
        if (this.flowing) {
          this.read();
        }
      }
    );
  }
  pause() {
    if (this.flowing) {
      this.flowing = false;
    }
  }
  resume() {
    if (!this.flowing) {
      this.flowing = true;
      this.read();
    }
  }
}
module.exports = ReadStream;
然后我们再对这个方法进行引用。
// test.txt
莫小尚1234567890
// createReadStream.js
const path = require("path");
const ReadStream = require("./readStream");
const rs = new ReadStream(path.resolve(__dirname, "test.txt"), {
  highWaterMark: 2,
});
rs.on("open", (fd) => {
  console.log(fd);
});
const arr = [];
rs.on("data", (chunk) => {
  console.log(chunk);
  arr.push(chunk);
});
rs.on("end", () => {
  console.log(Buffer.concat(arr).toString());
});
然后我们来进行依次解析,看一下 fs.createReadStream 的实现原理究竟是什么样的。
实现方法解析
根据官网,我们可以知道 fs.createReadStream 会接受两个参数,分别是 path 和 option(其中包括很多参数,具体可参考官网)。
- 
创建一个类,并使它继承自
EventEmitter,这样会让他具备 发布订阅模式 的特性。其原本的继承链是class ReadStream extends Readable extends EventEmitter。const EventEmitter = require("events"); class ReadStream { constructor(path, options) { super() this.path = path; this.flags = options.flags || "r"; this.highWaterMark = options.highWaterMark || 64 * 1024; this.start = options.start || 0; this.end = options.end; this.emitClose = options.emitClose || true; this.encoding = options.encoding; } } module.exports = ReadStream; - 
定义一个
flowing属性,并使用.on('newListener')来监听 data事件。// ... class ReadStream extends EventEmitter { constructor(path, options) { // ... // 是否需要触发data事件 this.flowing = false; // 当用户监听data事件后,此属性变成true // 每次绑定事件,都会触发newListener的回调 this.on("newListener", (type) => { if (type === "data") { this.flowing = true; this.read(); } }); } read() { console.log("用户监听了data"); // 打印出此方法 } } - 
引入
fs模块,并定义出一个.open()方法。其中包含一个destory方法,用来处理错误事件。const fs = require("fs"); // ... class ReadStream extends EventEmitter { constructor(path, options) { // ... // 即使只绑定了open事件,也可以进行触发(也就是只进行订阅) this.open(); } // 销毁方法 destory(err) { if (err) { this.emit("error"); } if (this.fd) { fs.close(this.fd, () => { this.emit("close"); }); } } open() { fs.open(this.path, this.flags, (err, fd) => { if (err) { return this.destory(err); } this.fd = fd; this.emit("open", fd); // 触发open事件 }); } // ... }这样我们只要在需要引入的文件中,定义一个监听的
error事件就可以了。// createReadStream.js rs.on("error", (err) => { console.log(err); }); 
? 当然,也可以不进行 error事件 绑定。
- 
在文件打开后,再进行内容读取。
我们都清楚,
fs.open是一个异步操作,所以可能会存在data事件触发时,文件还没有读取完的情况。这时我们就需要进行一个 轮询处理 ,等待
this.emit('open')触发后,再进行read方法。接下来,我们来对
read方法进行改造。// ... class ReadStream extends EventEmitter { constructor(path, options) { // ... } // ... read() { if (typeof this.fd !== "number") { return this.once("open", () => this.read()); } // 此处直接读取 this.fd 是读取不到的,所以我们需要先进行判断 fs.read(this.fd); } }这样就可以保证,
read事件是在文件打开后进行触发的。 - 
现在来对
fs.read的剩余参数进行处理。创建一个
Buffer来对内容进行存储,长度为highWaterMark字节。并且计算start和end这里需要注意,我们在创建
Buffer的时候,需要每次都声明一个新的内存空间来对内容进行存储。不可以在属性中定义一个
buffer属性,这样会导致每次指向的都是同一个内存空间。// ... read() { // 每次都创建一个新的内存空间 const buffer = Buffer.alloc(this.highWaterMark); // ... }然后我们需要对
start和end进行判断。我们先定义一个
offset属性,来记录上一轮计算的偏移量。并且还需要判断,用户当前是否传入了
start和end属性// ... class ReadStream extends EventEmitter { constructor(path, options) { // ... this.offset = this.start; // 偏移量 } // ... read() { // ... const howManyToRead = this.end ? Math.min(this.highWaterMark, this.end - this.offset + 1) : this.highWaterMark; const buffer = Buffer.alloc(howManyToRead); fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => { if (err) return this.destory(err); this.offset += bytesRead; // 可能存在最后一次的buffer大小 大于 实际数据大小的情况,所以使用slice来进行截取 this.emit("data", buffer.slice(0, bytesRead)); // 将结果抛给data事件的回调 }); } }这样我们在前面的 data 事件中,就可以监听到读取的数据了。
// createReadStream.js rs.on("data", (chunk) => { console.log(chunk); //});  - 
进行递归,循环输出结果
这里不用做过多的解释,通过上一步我们不难发现,每次都只输出了一次结果。
所以我们需要进行递归处理,将数据进行完全输出,并触发
end事件。// ... class ReadStream extends EventEmitter { constructor(path, options) { // ... } // ... read() { // ... fs.read(this.fd, buffer, 0, howManyToRead, this.offset, (err,bytesRead) => { if (err) return this.destory(err); // 如果读取不到数据了,就进行销毁 if (bytesRead == 0) { this.emit('end'); // 触发end事件 return this.destory(); // 进行销毁 }; this.offset += bytesRead; this.emit("data", buffer.slice(0, bytesRead)); // 进行递归,循环输出结果 if (this.flowing) { this.read(); } }); } } - 
添加一个
pause和resume来进行流量控制。pause和resume可以控制当前数据流的 停止 和 继续。// ... class ReadStream extends EventEmitter { constructor(path, options) { // ... } // ... pause() { // 判断当前是否读取完毕了 if (this.flowing) { this.flowing = false; } } resume() { // 判断当前是否读取完毕了 if (!this.flowing) { this.flowing = true; this.read(); } } } 
这样我们就手写实现了 fs.createReadStream  方法。
fs.createReadStream 在工作中比较常见,可以用来进行大型文件的处理,所以学好用好此方法还是比较重要的。
本篇文章由 莫小尚 创作,文章中如有任何问题和纰漏,欢迎您的指正与交流。
您也可以关注我的 个人站点、博客园 和 掘金,我会在文章产出后同步上传到这些平台上。
最后感谢您的支持!