node常用内置模块(Stream)
一、Stream了解
node.js中的流就是处理流式数据的抽象接口,文件操作系统和网络模块实现了流接口
不使用流的常见问题:
同步读取资源文件,用户与要等待数据读取完成
资源文件最终一次性加载至内存,开销较大
使用流的图解(数据的分段传输):
配合管道对需求的加工:
流处理数据的优势:
时间效率:流的分段处理可以同时操作多个数据chunk
空间效率:同一时间流无须占据大内存空间
使用方便:流配合管理,扩展程序变得简单
node.js中流的分类:
Readalbe:可读流,能够是实现数据的读取
Writealbe:可写流,能够实现数据的写操作
Duplex:双工流,既可读又可写
Transform:转换流,可读可写,还是实现数据转换
nodejs流特点:
Stream模块实现了四个具体的抽象
所有流都继承自EventEmitter
二、基本API
1.可读流
专门生产供程序消费数据的流
自定义可读流:
继承stream里的Readable
重写_read方法调用push产出数据
readable事件:当流中存在可读取的数据是触发
data事件:当流中数据块传给消费者后触发
自定义可读流代码实现:
const { Readable } = require(\'stream\');
// 模拟底层数据
let source = [\'lg\', \'zce\', \'syy\'];
class MyReadable extends Readable {
constructor(source) {
super();
this.source = source;
}
_read() {
let data = this.source.shift() || null; // 如果没有数据,则返回 null
this.push(data); // 将数据推入到流中
}
}
let mr = new MyReadable(source);
// reaadable 默认是暂停模式
// mr.on(\'readable\', () => {
// let data = null;
// while(data = mr.read(2)){
// console.log(data.toString());
// }
// })
// 有可能都不放入缓存中,直接输出
mr.on(\'data\', (chunk) => {
console.log(chunk.toString());
})
2.可写流
用于消费数据的流
自定义可写流:
继承stream模块的Writeable
重写_write方法,调用write执行写入
可写流事件
pipe事件:可读流调用pipe()方法时触发
unpipe事件:可读流调用unpipe()方法时触发
自定义可写流代码实现:
const { Writable } = require(\'stream\');
class MyWriteable extends Writable {
constructor() {
super();
}
_write(chunk, encoding, done) {
process.stdout.write(chunk.toString() + \'<-----\\n\');
process.nextTick(done);
}
}
let mw = new MyWriteable();
mw.write(\'江江学习\', \'utf-8\', () => {
console.log(\'write success\');
})
3.双工流
Duplex是双工流,既能生产又能消费,读写相互独立,读操作创建的数据不能当作写操作的数据源去使用
自定义双工流
继承Duplex类
重写_read方法,调用push生产数据
重写_write方法,调用write消费数据
代码实现:
let { Duplex } = require(\'stream\');
class MyDuplex extends Duplex {
constructor(source) {
super();
this.source = source;
}
_read() {
let data = this.source.shift() || null;
this.push(data);
}
_write(chunk, en, next) {
process.stdout.write(chunk.toString() + \'<-----\\n\');
process.nextTick(next);
}
}
let source = [\'hello\', \'world\', \'!\'];
let md = new MyDuplex(source);
md.write(\'江江\',()=>{
console.log(\'write success\');
})
md.on(\'data\', (chunk) => {
console.log(chunk.toString());
})
Transform
Transform也是一个双工流,读写操作进行了联通
Transform自定义实现:
继承Transform类
重写_transform方法,调用push和callback
重写_flush方法,处理剩余数据
transform自定义代码实现:
let { Transform } = require(\'stream\');
class MyTransform extends Transform{
constructor(){
super();
}
_transform(chunk,en,cb){
this.push(chunk.toString().toUpperCase());
cb(null);
}
}
let t = new MyTransform();
t.write(\'hello\');
t.on(\'data\',(chunk)=>{
console.log(chunk.toString());
})
三、文件读写流
1.文件可读流
文件可读流代码中使用:
const fs = require(\'fs\');
const path = require(\'path\');
let rs = fs.createReadStream(\'test.txt\', {
flags: \'r\',
encoding: null, // 返回buffer
fd: null, // 默认值从3开始的, 0、1、2被输入、输出、错误占用了
mode: 438, // 权限控制
autoClose: false, // 是否自动关闭文件
start: 0, // 从文件的某个位置开始读取
// end: 10, // 在文件的某个位置结束读取
highWaterMark: 16 // 每次准备多少个字节的数据让读取(调用push放入缓存区里面),Readable中默认16个,文件可读流中(此处)默认64个
})
// data 事件
// rs.on(\'data\',(chunk)=>{
// console.log(chunk.toString());
// rs.pause(); // 流动模式切换到暂停模式
// setTimeout(()=>{
// rs.resume(); // 恢复到流动模式
// },1000)
// })
// readable 事件
rs.on(\'readable\', () => {
// let data = rs.read();
// console.log(data)
let data = null;
while(data = rs.read(3)){ // 每次从缓存中读取多少个字节
console.log(data.toString());
console.log(\'------\',rs._readableState.length); // 剩余多少个字节
}
})
其它事件:
const fs = require(\'fs\');
const path = require(\'path\');
let rs = fs.createReadStream(\'test.txt\', {
flags: \'r\',
encoding: null, // 返回buffer
fd: null, // 默认值从3开始的, 0、1、2被输入、输出、错误占用了
mode: 438, // 权限控制
autoClose: false, // 是否自动关闭文件
start: 0, // 从文件的某个位置开始读取
// end: 10, // 在文件的某个位置结束读取
highWaterMark: 16 // 每次准备多少个字节的数据让读取(调用push放入缓存区里面),Readable中默认16个,文件可读流中(此处)默认64个
})
rs.on(\'open\', (fd) => {
console.log(\'fd\', fd,\'文件打开了\');
})
rs.on(\'close\',()=>{
console.log(\'文件关闭了\')
})
let bufferArr = [];
rs.on(\'data\',(chunk)=>{
bufferArr.push(chunk)
})
rs.on(\'end\',()=>{
console.log(Buffer.concat(bufferArr).toString())
console.log(\'数据被清空之后\')
})
rs.on(\'error\',()=>{
console.log(\'出错了\')
})
2.文件可写流
可写流常用事件:
const fs = require(\'fs\');
const path = require(\'path\');
const ws = fs.createWriteStream(\'test.txt\', {
flags: \'w\',
mode: 438,
fd: null,
encoding: \'utf-8\',
start: 0,
highWaterMark: 16 // 默认16kb
})
ws.write(\'拉钩教育\', () => {
console.log(\'拉钩教育-数据写完了\')
})
// 字符串 或者 buffer ===> fs rs
// ws.write(123456,()=>{
// console.log(\'123456-数据写完了\')
// })
ws.on(\'open\', (fd) => {
console.log(\'open\', fd)
})
// colose 是在数据写入操作全部完成之后再执行
ws.on(\'close\',()=>{
console.log(\'文件关闭了\');
})
ws.write(\'0\');
// end 执行之后就意味着数据写入操作完成
ws.end(\'jiang\'); // 可最后写入一次
// ws.write(\'2\');
ws.on(\'error\',(err)=>{
console.log(\'出错了\');
})
drain事件与读写速度:
/**
* 需求:\"江江学习\" 写入指定的文件
* 01 一次性写入
* 02 分批写入
* 对比:对内存的压力不同
*/
const fs = require(\'fs\');
let ws = fs.createWriteStream(\'test.txt\', {
highWaterMark: 3
});
// ws.write(\'江江学习\');
let source = \'江江学习\'.split(\'\');
let num = 0;
let flag = true;
function executeWrite() {
while (num != source.length && flag) {
flag = ws.write(source[num++]); // 当写入的数据大于等于hightWaterMark时,会返回false
}
}
executeWrite();
ws.on(\'drain\',()=>{ // 缓存中的数据已经被消费完了,才触发
console.log(\'drain 执行了\');
flag = true;
executeWrite();
})
四、背压机制
让数据在的生产者与消费者平滑流动的机制
1.问题发现
看一段代码发现问题:
数据从磁盘读取出来的速度是远远大于写入磁盘的速度的(消费者的速度跟不到生产者的速度的),WriteAble内部维护了一个队列,不能即使的消费数据导致的产能过剩,就会放入该队列中,但队列长度是有上限的,所以在当读写的过程中,如果没有实现被压机制的化,就可能会导致
内存溢出
其它进程运行变慢
GC频繁调用
了解读写机制:
背压机制基本原理代码:
let fs = require(\'fs\');
let rs = fs.createReadStream(\'test.txt\', {
highWaterMark: 4 // Readable默认是16,fs中createReadStream默认为64
})
let ws = fs.createWriteStream(\'test1.txt\', {
highWaterMark: 1
})
let flag = true;
rs.on(\'data\',(chunk)=>{
flag = ws.write(chunk,()=>{
console.log(\'写完了\');
})
if(!flag){
rs.pause();
}
})
ws.on(\'drain\',()=>{
rs.resume();
})
// 可以直接使用pipe
// rs.pipe(ws);
2.模拟可读流
代码实现:
const fs = require(\'fs\');
const EventEmitter = require(\'events\');
class MyFileReadStream extends EventEmitter {
constructor(path, options = {}) {
super();
this.path = path;
this.flags = options.flags || \'r\';
this.mode = options.mode || 438;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.end = options.end;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.readOffset = 0;
this.open();
// 当监听新的事件时,会被触发
this.on(\'newListener\', (type) => {
if (type == \'data\') {
this.read();
}
})
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
// 触发自生error事件,这里是回调函数,执行在同步代之后
this.emit(\'error\', err);
}
this.fd = fd;
this.emit(\'open\', this.fd);
});
}
read() {
if (typeof this.fd != \'number\') {
return this.once(\'open\', this.read);
}
let buf = Buffer.alloc(this.highWaterMark);
// let howMuchToRead; // 每次读多少
// if (this.end) {
// // 判断end是否有存在
// howMuchToRead = Math.min(this.end - this.readOffset + 1, this.highWaterMark); // 使用剩余未读的字节数与highWaterMark中较小的一个
// } else {
// howMuchToRead = this.highWaterMark; // 使用剩余未读的字节数与highWaterMark中较小的一个
// }
// 可以取到末尾end下标的值,所以这里要加一
let howMuchToRead = this.end?Math.min(this.end - this.readOffset + 1, this.highWaterMark):this.highWaterMark
fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
if (readBytes) {
this.readOffset += readBytes;
this.emit(\'data\', buf.slice(0, readBytes))
this.read()
} else {
this.emit(\'end\')
this.close()
}
})
}
close() {
fs.close(this.fd, () => {
this.emit(\'close\')
});
}
}
let rs = new MyFileReadStream(\'test.txt\', {
end: 7, // 结束位置的下标,可以取到
highWaterMark: 3
});
rs.on(\'open\', (fd) => { // 这里是同步代码,监听该事件在触发事件之前
console.log(\'open\', fd);
})
rs.on(\'error\', (err) => {
console.log(err);
})
rs.on(\'data\', (chunk) => {
console.log(chunk)
})
rs.on(\'end\', () => {
console.log(\'end\')
})
rs.on(\'close\', () => {
console.log(\'close\')
})
五、链表
使用wirte时,有些被写入的内容需要放入缓存中被排队等待,而且要遵循先进先出的规则,这里使用链表的数据结构来保存这些数据
为什么不使用数组:
数组存储数据的长度具有上限
数组存在塌陷问题
模拟链表实现队列:
class Node {
constructor(element, next = null) {
this.element = element;
this.next = next;
}
}
class LinkedList {
constructor() {
this.head = null;
this.size = 0
}
// 获取指定位置节点
_getNode(index) {
if (index < 0 || index >= this.size) {
throw new Error(\'getNode --> index error\')
}
let currentNode = this.head;
while (index--) {
currentNode = currentNode.next;
}
return currentNode;
}
// 确保该下标的位置合法
_checkIndex(index) {
if (index < 0 || index >= this.size) {
throw new Error(\'index 参数错误\')
}
}
add(index, element) {
if (arguments.length == 1) {
element = index;
index = this.size;
}
if (index < 0 || index > this.size) {
throw new Error(\'index 参数错误\')
}
let newNode = new Node(element);
// index == 1 与 index != 1处理方式不同
if (index == 0) {
newNode.next = this.head;
this.head = newNode;
} else {
// 获取指定位置的前一个节点
let prevNode = this._getNode(--index);
newNode.next = prevNode.next;
prevNode.next = newNode;
}
this.size++;
}
remove(index) {
if (this.size == 0) return undefined;
this._checkIndex(index);
let currentNode = this._getNode(index);
if (index == 0) {
this.head = currentNode.next;
} else {
let prevNode = this._getNode(index - 1);
prevNode.next = currentNode.next;
}
this.size--;
currentNode.next = null;
return currentNode;
}
set(index, element) {
this._checkIndex(index);
this._getNode(index).element = element;
}
get(index) {
this._checkIndex(index);
let currentNode = this._getNode(index);
currentNode.next = null;
return currentNode;
}
clear() {
this.head = null;
this.size = 0;
}
}
class Queue {
constructor() {
this.linkedList = new LinkedList();
}
enQueue(data) {
this.linkedList.add(data);
}
deQueue() {
return this.linkedList.remove(0);
}
}
const q = new Queue();
q.enQueue(\'node1\');
q.enQueue(\'node2\');
console.log(q.deQueue());
console.log(q.deQueue());
console.log(q.deQueue());
console.log(q)
模拟可写流:
const fs = require(\'fs\');
const EventsEmitter = require(\'events\');
const Queue = require(\'./linkedlist\');
class MyWriteStream extends EventsEmitter {
constructor(path, options = {}) {
super();
this.path = path;
this.flags = options.flags || \'w\';
this.mode = options.mode || 438;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.end = options.end
this.encoding = options.encoding || \'utf8\';
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.writeOffset = this.start;
this.writing = false;
this.writeLen = 0;
this.needDrain = false;
this.cache = new Queue();
this.open();
}
open() {
// 原生 fs.open
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit(\'error\', err);
return;
}
this.fd = fd;
this.emit(\'open\', fd);
})
}
write(chunk, encoding, cb) {
// 统一成buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
this.writeLen += chunk.length;
let flag = this.writeLen < this.highWaterMark;
this.needDrain = !flag;
if (this.writing) {
// 当前 是 正在写入状态,所以在这里将数据存入队列
this.cache.enQueue({
chunk,
encoding,
cb
})
} else {
// 当前 不是 正在写入状态,所以在这里执行写入
this.writing = true;
this._write(chunk, encoding, cb);
// this.writing = false;
}
return flag;
}
_write(chunk, encoding, cb) {
if (typeof this.fd != \'number\') {
return this.once(\'open\', () => {
return this._write(chunk, encoding, () => {
cb()
// 清空排队的内容
this._clearBuffer();
});
})
}
fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => {
this.writeOffset += written;
this.writeLen -= written;
cb && cb();
})
}
_clearBuffer() {
let data = this.cache.deQueue();
if(data){
this._write(data.element.chunk,data.element.encoding,()=>{
data.element.cb();
this._clearBuffer();
})
}else{
if(this.needDrain){
this.needDrain = false;
this.emit(\'drain\')
}
}
}
}
let mws = new MyWriteStream(\'f04.txt\', {
highWaterMark: 4
});
mws.on(\'open\', (fd) => {
console.log(\'open--->\', fd)
})
pipe方法的使用:
const fs = require(\'fs\');
const rs = fs.createReadStream(\'./f04.txt\', {
highWaterMark: 4 // 默认64kb
});
const ws = fs.createWriteStream(\'./f04_copy.txt\', {
highWaterMark: 1 // 默认16kb
})
rs.pipe(ws);
// data 需要查看数据,可监听rs data事件
自定义的pipe方法(有问题,没找出来):
const fs = require(\'fs\');
const EventEmitter = require(\'events\');
class MyFileReadStream extends EventEmitter {
constructor(path, options = {}) {...}
open() {...}
read() {...}
close() {...}
pipe(ws){
this.on(\'data\',(data)=>{
let flag = ws.write(data);
if(!flag){
// 读数据的缓存满了。开启暂停
this.pause();
// 找不到该方法
}
});
this.on(\'drain\',()=>{
// 缓存中的数据被消费完了,继续开启数据读入缓存
this.resume();
})
}
}