Newer
Older
vue-indexer / node_modules / tar-stream / extract.js
const { Writable, Readable, getStreamError } = require('streamx')
const FIFO = require('fast-fifo')
const b4a = require('b4a')
const headers = require('./headers')

const EMPTY = b4a.alloc(0)

class BufferList {
  constructor () {
    this.buffered = 0
    this.shifted = 0
    this.queue = new FIFO()

    this._offset = 0
  }

  push (buffer) {
    this.buffered += buffer.byteLength
    this.queue.push(buffer)
  }

  shiftFirst (size) {
    return this._buffered === 0 ? null : this._next(size)
  }

  shift (size) {
    if (size > this.buffered) return null
    if (size === 0) return EMPTY

    let chunk = this._next(size)

    if (size === chunk.byteLength) return chunk // likely case

    const chunks = [chunk]

    while ((size -= chunk.byteLength) > 0) {
      chunk = this._next(size)
      chunks.push(chunk)
    }

    return b4a.concat(chunks)
  }

  _next (size) {
    const buf = this.queue.peek()
    const rem = buf.byteLength - this._offset

    if (size >= rem) {
      const sub = this._offset ? buf.subarray(this._offset, buf.byteLength) : buf
      this.queue.shift()
      this._offset = 0
      this.buffered -= rem
      this.shifted += rem
      return sub
    }

    this.buffered -= size
    this.shifted += size

    return buf.subarray(this._offset, (this._offset += size))
  }
}

class Source extends Readable {
  constructor (self, header, offset) {
    super()

    this.header = header
    this.offset = offset

    this._parent = self
  }

  _read (cb) {
    if (this.header.size === 0) {
      this.push(null)
    }
    if (this._parent._stream === this) {
      this._parent._update()
    }
    cb(null)
  }

  _predestroy () {
    this._parent.destroy(getStreamError(this))
  }

  _detach () {
    if (this._parent._stream === this) {
      this._parent._stream = null
      this._parent._missing = overflow(this.header.size)
      this._parent._update()
    }
  }

  _destroy (cb) {
    this._detach()
    cb(null)
  }
}

class Extract extends Writable {
  constructor (opts) {
    super(opts)

    if (!opts) opts = {}

    this._buffer = new BufferList()
    this._offset = 0
    this._header = null
    this._stream = null
    this._missing = 0
    this._longHeader = false
    this._callback = noop
    this._locked = false
    this._finished = false
    this._pax = null
    this._paxGlobal = null
    this._gnuLongPath = null
    this._gnuLongLinkPath = null
    this._filenameEncoding = opts.filenameEncoding || 'utf-8'
    this._allowUnknownFormat = !!opts.allowUnknownFormat
    this._unlockBound = this._unlock.bind(this)
  }

  _unlock (err) {
    this._locked = false

    if (err) {
      this.destroy(err)
      this._continueWrite(err)
      return
    }

    this._update()
  }

  _consumeHeader () {
    if (this._locked) return false

    this._offset = this._buffer.shifted

    try {
      this._header = headers.decode(this._buffer.shift(512), this._filenameEncoding, this._allowUnknownFormat)
    } catch (err) {
      this._continueWrite(err)
      return false
    }

    if (!this._header) return true

    switch (this._header.type) {
      case 'gnu-long-path':
      case 'gnu-long-link-path':
      case 'pax-global-header':
      case 'pax-header':
        this._longHeader = true
        this._missing = this._header.size
        return true
    }

    this._locked = true
    this._applyLongHeaders()

    if (this._header.size === 0 || this._header.type === 'directory') {
      this.emit('entry', this._header, this._createStream(), this._unlockBound)
      return true
    }

    this._stream = this._createStream()
    this._missing = this._header.size

    this.emit('entry', this._header, this._stream, this._unlockBound)
    return true
  }

  _applyLongHeaders () {
    if (this._gnuLongPath) {
      this._header.name = this._gnuLongPath
      this._gnuLongPath = null
    }

    if (this._gnuLongLinkPath) {
      this._header.linkname = this._gnuLongLinkPath
      this._gnuLongLinkPath = null
    }

    if (this._pax) {
      if (this._pax.path) this._header.name = this._pax.path
      if (this._pax.linkpath) this._header.linkname = this._pax.linkpath
      if (this._pax.size) this._header.size = parseInt(this._pax.size, 10)
      this._header.pax = this._pax
      this._pax = null
    }
  }

  _decodeLongHeader (buf) {
    switch (this._header.type) {
      case 'gnu-long-path':
        this._gnuLongPath = headers.decodeLongPath(buf, this._filenameEncoding)
        break
      case 'gnu-long-link-path':
        this._gnuLongLinkPath = headers.decodeLongPath(buf, this._filenameEncoding)
        break
      case 'pax-global-header':
        this._paxGlobal = headers.decodePax(buf)
        break
      case 'pax-header':
        this._pax = this._paxGlobal === null
          ? headers.decodePax(buf)
          : Object.assign({}, this._paxGlobal, headers.decodePax(buf))
        break
    }
  }

  _consumeLongHeader () {
    this._longHeader = false
    this._missing = overflow(this._header.size)

    const buf = this._buffer.shift(this._header.size)

    try {
      this._decodeLongHeader(buf)
    } catch (err) {
      this._continueWrite(err)
      return false
    }

    return true
  }

  _consumeStream () {
    const buf = this._buffer.shiftFirst(this._missing)
    if (buf === null) return false

    this._missing -= buf.byteLength
    const drained = this._stream.push(buf)

    if (this._missing === 0) {
      this._stream.push(null)
      if (drained) this._stream._detach()
      return drained && this._locked === false
    }

    return drained
  }

  _createStream () {
    return new Source(this, this._header, this._offset)
  }

  _update () {
    while (this._buffer.buffered > 0 && !this.destroying) {
      if (this._missing > 0) {
        if (this._stream !== null) {
          if (this._consumeStream() === false) return
          continue
        }

        if (this._longHeader === true) {
          if (this._missing > this._buffer.buffered) break
          if (this._consumeLongHeader() === false) return false
          continue
        }

        const ignore = this._buffer.shiftFirst(this._missing)
        if (ignore !== null) this._missing -= ignore.byteLength
        continue
      }

      if (this._buffer.buffered < 512) break
      if (this._stream !== null || this._consumeHeader() === false) return
    }

    this._continueWrite(null)
  }

  _continueWrite (err) {
    const cb = this._callback
    this._callback = noop
    cb(err)
  }

  _write (data, cb) {
    this._callback = cb
    this._buffer.push(data)
    this._update()
  }

  _final (cb) {
    this._finished = this._missing === 0 && this._buffer.buffered === 0
    cb(this._finished ? null : new Error('Unexpected end of data'))
  }

  _predestroy () {
    this._continueWrite(null)
  }

  _destroy (cb) {
    if (this._stream) this._stream.destroy(getStreamError(this))
    cb(null)
  }

  [Symbol.asyncIterator] () {
    let error = null

    let promiseResolve = null
    let promiseReject = null

    let entryStream = null
    let entryCallback = null

    const extract = this

    this.on('entry', onentry)
    this.on('error', (err) => { error = err })
    this.on('close', onclose)

    return {
      [Symbol.asyncIterator] () {
        return this
      },
      next () {
        return new Promise(onnext)
      },
      return () {
        return destroy(null)
      },
      throw (err) {
        return destroy(err)
      }
    }

    function consumeCallback (err) {
      if (!entryCallback) return
      const cb = entryCallback
      entryCallback = null
      cb(err)
    }

    function onnext (resolve, reject) {
      if (error) {
        return reject(error)
      }

      if (entryStream) {
        resolve({ value: entryStream, done: false })
        entryStream = null
        return
      }

      promiseResolve = resolve
      promiseReject = reject

      consumeCallback(null)

      if (extract._finished && promiseResolve) {
        promiseResolve({ value: undefined, done: true })
        promiseResolve = promiseReject = null
      }
    }

    function onentry (header, stream, callback) {
      entryCallback = callback
      stream.on('error', noop) // no way around this due to tick sillyness

      if (promiseResolve) {
        promiseResolve({ value: stream, done: false })
        promiseResolve = promiseReject = null
      } else {
        entryStream = stream
      }
    }

    function onclose () {
      consumeCallback(error)
      if (!promiseResolve) return
      if (error) promiseReject(error)
      else promiseResolve({ value: undefined, done: true })
      promiseResolve = promiseReject = null
    }

    function destroy (err) {
      extract.destroy(err)
      consumeCallback(err)
      return new Promise((resolve, reject) => {
        if (extract.destroyed) return resolve({ value: undefined, done: true })
        extract.once('close', function () {
          if (err) reject(err)
          else resolve({ value: undefined, done: true })
        })
      })
    }
  }
}

module.exports = function extract (opts) {
  return new Extract(opts)
}

function noop () {}

function overflow (size) {
  size &= 511
  return size && 512 - size
}