transform.js

/*
 * node-qtdatastream
 * https://github.com/magne4000/node-qtdatastream
 *
 * Copyright (c) 2017 Joël Charles
 * Licensed under the MIT license.
 */

/** @module qtdatastream/transform */

const { Transform } = require('stream');
const debuglib = require('debug');

const { ReadBuffer } = require('./buffer');
const types = require('./types');

const loggerr = debuglib('qtdatastream:transform:read');
const loggerw = debuglib('qtdatastream:transform:write');
const debug = debuglib.enabled('qtdatastream:*');

/**
 * ReadTransform data event.
 *
 * @event ReadTransform#data
 * @property {*} data JS object or type
 */
/**
 * WriteTransform data event.
 *
 * @event WriteTransform#data
 * @property {Buffer} buffer Qt Buffer
 */

/**
 * Transform Qt buffers into JS objects
 * @static
 * @extends stream.Transform
 * @fires ReadTransform#data
 * @example
 * const { ReadTransform } = require('qtdatastream').transform;
 *
 * const reader = new ReadTransform();
 *
 * reader.on('data', (data) => {
 *   // data === 1
 * });
 *
 * // Write a QDataStream
 * reader.write(Buffer.from([ 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x01 ]));
 */
class ReadTransform extends Transform {
  constructor() {
    super(Object.assign({ readableObjectMode: true, writableObjectMode: false }));
    this.packet_no = 0;
    this.data_state = { size: Infinity, data: [], recvd: 0 };
  }

  static getsize(bufferlist) {
    // get 4 bytes
    let final_buffer;
    if (bufferlist[0] && bufferlist[0].length >= 4) {
      [ final_buffer ] = bufferlist;
    } else {
      let totallength = 0, i = 0;
      while (totallength < 4 && i < bufferlist.length) {
        totallength += bufferlist[i++].length;
      }
      if (totallength < 4) return Infinity;
      final_buffer = Buffer.concat(bufferlist.slice(0, i), totallength);
    }
    return final_buffer.readUInt32BE(0);
  }

  chunkify() {
    const out = [];
    while (true) {
      const ds = this.data_state;
      if (ds.size === Infinity && ds.recvd >= 4) {
        ds.size = ReadTransform.getsize(ds.data);
      }
      if (ds.size < Infinity && ds.size > 67108864) { // 64MB
        loggerr('Packets should not exceed 64MB (received %d bytes)', ds.size);
        throw new Error('oversized packet detected');
      }
      this.emit('progress', ds.recvd, ds.size);
      if (ds.size + 4 > ds.recvd) {
        if (debug) {
          loggerr('(%d/%d) Waiting for end of buffer', ds.recvd, ds.size + 4);
        }
        return out;
      }
      const buffer = new ReadBuffer(Buffer.concat(ds.data, ds.recvd));
      let size;
      if (debug) {
        loggerr('(%d/%d) Received full buffer', ds.recvd, ds.size + 4);
      }
      try {
        size = types.QUInt.read(buffer);
      } catch (e) {
        throw new Error('Error while fetching buffer size');
      }
      if (debug) {
        loggerr('buffer size : %d', size);
      }
      const parsed = types.QVariant.read(buffer);
      if (debug) {
        loggerr('Received result: %O', parsed);
      }
      out.push(parsed);
      const remaining = buffer.remaining();
      if (remaining !== null) {
        this.data_state = {
          data: [ remaining ],
          recvd: remaining.length,
          size: Infinity
        };
      } else {
        this.data_state = { size: Infinity, data: [], recvd: 0 };
        return out;
      }
    }
  }

  _transform(data, encoding, callback) {
    if (data !== null) {
      this.data_state.data.push(data);
      this.data_state.recvd += data.length;
    }
    let out;
    try {
      out = this.chunkify();
    } catch (e) {
      callback(e);
      return;
    }
    for (let i = 0; i < out.length; i++) {
      this.push(out[i]);
    }
    callback();
  }

  _flush(callback) {
    if (this.data_state && this.data_state.recvd > 0) {
      callback('stream ended in the middle of a packet');
    }
  }
}

/**
 * Transform JS types/objects into Qt buffers
 * @static
 * @extends stream.Transform
 * @fires WriteTransform#data
 * @example
 * const { ReadTransform } = require('qtdatastream').transform;
 *
 * const writer = new WriteTransform();
 *
 * writer.on('data', (data) => {
 *   // data === Buffer <00 00 00 09 00 00 00 03 00 00 00 00 01>
 * });
 *
 * // Write an int
 * writer.write(1);
 */
class WriteTransform extends Transform {
  constructor() {
    super(Object.assign({ readableObjectMode: false, writableObjectMode: true }));
  }

  /**
   * Get the buffer representation of the object.
   * It is prefixed by the packet size as defined in Qt framework.
   * @static
   * @returns {Buffer} A Qt styled buffer ready to be sent through a socket
   */
  static getBuffer(data){
    const buffer = types.QVariant.from(data).toBuffer();
    // Calculate size
    const totalSizeBuffer = types.QUInt.from(buffer.length).toBuffer();
    return Buffer.concat([ totalSizeBuffer, buffer ]);
  }

  _transform(data, encoding, callback) {
    loggerw(data);
    const buffer = WriteTransform.getBuffer(data);
    callback(null, buffer);
  }
}

module.exports = {
  ReadTransform,
  WriteTransform
};