Duplex等同于继承了Readable,和Writable。
一个Duplex实例duplex,拥有Readable和Writable原型上的所有方法,
而且内部同时包含了_readableState和_writableState。
因此,实现了duplex._read便可以将它当作可读流来用,
实现了duplex._write便可以将它当作可写流来用。
const util = require('util');
const Readable = require('_stream_readable');
const Writable = require('_stream_writable');
util.inherits(Duplex, Readable);
var keys = Object.keys(Writable.prototype);
for (var v = 0; v < keys.length; v++) {
var method = keys[v];
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
}
function Duplex(options) {
if (!(this instanceof Duplex))
return new Duplex(options);
Readable.call(this, options);
Writable.call(this, options);
if (options && options.readable === false)
this.readable = false;
if (options && options.writable === false)
this.writable = false;
this.allowHalfOpen = true;
if (options && options.allowHalfOpen === false)
this.allowHalfOpen = false;
this.once('end', onend);
}
// the no-half-open enforcer
function onend() {
// if we allow half-open state, or if the writable side ended,
// then we're ok.
if (this.allowHalfOpen || this._writableState.ended)
return;
// no more data can be written.
// But allow more writes to happen in this tick.
process.nextTick(onEndNT, this);
}
function onEndNT(self) {
self.end();
}从上面的实现中可以看出,Duplex的特性便是:既可以当作可读流来使用,又可以当作可写流来使用。
Transform继承自Duplex,但是将内部的两个缓存给关联起来了。
简单来说,就是调用write(data)后,经过_transform的处理,下游可读取到处理后的数据。
var Stream = require('stream')
var transform = Stream.Transform({
transform: function (buf, _, next) {
next(null, buf.toString().toUpperCase())
}
})
transform.pipe(process.stdout)
transform.write('a')
transform.write('b')
transform.end('c')输出:
⌘ node example/transform.js
ABC
write方法接收到数据后,引起_transform方法的调用,
在数据处理完时,需要调用next方法。
next会调用push方法,从而将转换后的数据放入可读缓存。
下游便能读取到。
所以,上面的例子还可以写成:
var Stream = require('stream')
var transform = Stream.Transform({
transform: function (buf, _, next) {
this.push(buf.toString().toUpperCase())
next()
}
})
transform.pipe(process.stdout)
transform.write('a')
transform.write('b')
transform.end('c')注意,next的调用是必须的,用来通知这次处理已经完成,可以开始下一次的处理。
此外,Transform还有一个_flush方法。
当prefinish事件发生时,便会调用它,表示上游已经没有数据要写入了,即“写端”已经结束。
var Stream = require('stream')
var transform = createTransform()
transform.pipe(process.stdout)
transform.on('finish', function () {
console.log('\nfinish')
})
transform.on('end', function () {
console.log('\nend')
})
transform.write('a')
transform.write('b')
transform.end('c')
function createTransform() {
var input = []
return Stream.Transform({
objectMode: true,
transform: function (buf, _, next) {
console.log('transform', buf.toString())
input.push(buf)
next()
},
flush: function (next) {
console.log('flush')
var buf
while (buf = input.pop()) {
this.push(buf)
}
setTimeout(() => {
this.push('extra')
next()
}, 10)
},
})
}输出:
⌘ node example/reverse.js
transform a
transform b
transform c
flush
cba
finish
extra
end
可以看出,实现的_flush方法在end()被调用后执行,然后是finish事件。
当_flush的next被执行时,等同于执行了可读端的push(null),
进而引起end事件触发。
所以,上面的flush方法中也可以不调用next,而是直接push(null):
function createTransform() {
var input = []
return Stream.Transform({
objectMode: true,
transform: function (buf, _, next) {
console.log('transform', buf.toString())
input.push(buf)
next()
},
flush: function () {
console.log('flush')
var buf
while (buf = input.pop()) {
this.push(buf)
}
setTimeout(() => {
this.push('extra')
this.push(null)
}, 10)
},
})
}NOTE
_transform() => end() => flush() => finish => end
因此,如果要等到Transform工作结束,无数据可读,就监听end事件。
如果只是等到无数据再写入,则监听finish事件。
Duplex可同时当作Readable(可读端)和Writable(可写端)来用,
Transform在此基础上,将可读端与可写端的底层打通,
写入的数据会被当作_read调用时获取数据的源,只是数据还经历了_transform的处理。
实际上,Transform的实现便是实现了_read与_write的Duplex,
但它还要求实现_transform来做数据转换。
效果上,对于duplex.write('a')后,duplex.read()并不能读到这个'a',
但transform.write('a')后,transform.read()却能读到_transform('a')。