const stream = require('streamx') const b4a = require('b4a') const defaultEncoding = 'utf8' exports.pipeline = stream.pipeline exports.isStream = stream.isStream exports.isEnded = stream.isEnded exports.isFinished = stream.isFinished exports.getStreamError = stream.getStreamError exports.Readable = class Readable extends stream.Readable { constructor (opts = {}) { super(opts) if (this._construct) this._open = this._construct if (this._read !== stream.Readable.prototype._read) { this._read = read.bind(this, this._read) } if (this._destroy !== stream.Stream.prototype._destroy) { this._destroy = destroy.bind(this, this._destroy) } } push (chunk, encoding) { if (typeof chunk === 'string') { chunk = b4a.from(chunk, encoding || defaultEncoding) } return super.push(chunk) } unshift (chunk, encoding) { if (typeof chunk === 'string') { chunk = b4a.from(chunk, encoding || defaultEncoding) } super.unshift(chunk) } } exports.Writable = class Writable extends stream.Writable { constructor (opts = {}) { super({ ...opts, byteLengthWritable }) if (this._construct) this._open = this._construct if (this._write !== stream.Writable.prototype._write) { this._write = write.bind(this, this._write) } if (this._destroy !== stream.Stream.prototype._destroy) { this._destroy = destroy.bind(this, this._destroy) } } write (chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding encoding = null } if (typeof chunk === 'string') { encoding = encoding || defaultEncoding chunk = b4a.from(chunk, encoding) } else { encoding = 'buffer' } const result = super.write({ chunk, encoding }) if (cb) stream.Writable.drained(this).then(cb, cb) return result } end (chunk, encoding, cb) { if (typeof chunk === 'function') { cb = chunk chunk = null } else if (typeof encoding === 'function') { cb = encoding encoding = null } if (typeof chunk === 'string') { encoding = encoding || defaultEncoding chunk = b4a.from(chunk, encoding || defaultEncoding) } else { encoding = 'buffer' } const result = chunk !== undefined && chunk !== null ? super.end({ chunk, encoding }) : super.end() if (cb) this.once('end', cb) return result } } exports.Duplex = class Duplex extends stream.Duplex { constructor (opts = {}) { super({ ...opts, byteLengthWritable }) if (this._construct) this._open = this._construct if (this._read !== stream.Readable.prototype._read) { this._read = read.bind(this, this._read) } if (this._write !== stream.Duplex.prototype._write) { this._write = write.bind(this, this._write) } if (this._destroy !== stream.Stream.prototype._destroy) { this._destroy = destroy.bind(this, this._destroy) } } push (chunk, encoding) { if (typeof chunk === 'string') { chunk = b4a.from(chunk, encoding || defaultEncoding) } return super.push(chunk) } unshift (chunk, encoding) { if (typeof chunk === 'string') { chunk = b4a.from(chunk, encoding || defaultEncoding) } super.unshift(chunk) } write (chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding encoding = null } if (typeof chunk === 'string') { encoding = encoding || defaultEncoding chunk = b4a.from(chunk, encoding) } else { encoding = 'buffer' } const result = super.write({ chunk, encoding }) if (cb) stream.Writable.drained(this).then(cb, cb) return result } end (chunk, encoding, cb) { if (typeof chunk === 'function') { cb = chunk chunk = null } else if (typeof encoding === 'function') { cb = encoding encoding = null } if (typeof chunk === 'string') { encoding = encoding || defaultEncoding chunk = b4a.from(chunk, encoding) } else { encoding = 'buffer' } const result = chunk !== undefined && chunk !== null ? super.end({ chunk, encoding }) : super.end() if (cb) this.once('end', cb) return result } } exports.Transform = class Transform extends stream.Transform { constructor (opts = {}) { super({ ...opts, byteLengthWritable }) if (this._transform !== stream.Transform.prototype._transform) { this._transform = transform.bind(this, this._transform) } else { this._transform = passthrough } } push (chunk, encoding) { if (typeof chunk === 'string') { chunk = b4a.from(chunk, encoding || defaultEncoding) } return super.push(chunk) } unshift (chunk, encoding) { if (typeof chunk === 'string') { chunk = b4a.from(chunk, encoding || defaultEncoding) } super.unshift(chunk) } write (chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding encoding = null } if (typeof chunk === 'string') { encoding = encoding || defaultEncoding chunk = b4a.from(chunk, encoding) } else { encoding = 'buffer' } const result = super.write({ chunk, encoding }) if (cb) stream.Writable.drained(this).then(cb, cb) return result } end (chunk, encoding, cb) { if (typeof chunk === 'function') { cb = chunk chunk = null } else if (typeof encoding === 'function') { cb = encoding encoding = null } if (typeof chunk === 'string') { encoding = encoding || defaultEncoding chunk = b4a.from(chunk, encoding) } else { encoding = 'buffer' } const result = chunk !== undefined && chunk !== null ? super.end({ chunk, encoding }) : super.end() if (cb) this.once('end', cb) return result } } exports.PassThrough = class PassThrough extends exports.Transform {} exports.finished = function finished (stream, opts, cb) { if (typeof opts === 'function') { cb = opts opts = {} } if (!opts) opts = {} const { cleanup = false } = opts const done = () => { cb(exports.getStreamError(stream, { all: true })) if (cleanup) detach() } const detach = () => { stream.off('close', done) stream.off('error', noop) } if (stream.destroyed) { done() } else { stream.on('close', done) stream.on('error', noop) } return detach } function read (read, cb) { read.call(this, 65536) cb(null) } function write (write, data, cb) { write.call(this, data.chunk, data.encoding, cb) } function transform (transform, data, cb) { transform.call(this, data.chunk, data.encoding, cb) } function destroy (destroy, cb) { destroy.call(this, exports.getStreamError(this), cb) } function passthrough (data, cb) { cb(null, data.chunk) } function byteLengthWritable (data) { return data.chunk.byteLength } function noop () {}