index.js

/*
* Data published to queue should follow this format
* { worker, payload, source }
* worker: a bajo.callHandler callable string/function
* payload:
* - type: data type ('string', 'error', 'object', etc)
* - data: data value
* source: <ns>.<subNs>:<path>
*/

import zmq from 'zeromq'

function replacer (key, value) {
  if (value instanceof RegExp) return ('__REGEXP ' + value.toString())
  return value
}

function reviver (key, value) {
  if (value && value.toString && value.toString().indexOf('__REGEXP ') === 0) {
    const m = value.split('__REGEXP ')[1].match(/\/(.*)\/(.*)?/)
    return new RegExp(m[1], m[2] || '')
  }
  return value
}

/**
 * Plugin factory
 *
 * @param {string} pkgName - NPM package name
 * @returns {class}
 */
async function factory (pkgName) {
  const me = this

  /**
   * BajoQueue class
   *
   * @class
   */
  class BajoQueue extends this.app.pluginClass.base {
    static alias = 'q'
    static dependencies = ['dobo']

    constructor () {
      super(pkgName, me.app)
      this.config = {
        worker: true,
        manager: true,
        host: '127.0.0.1',
        port: 27781,
        jobMaxAgeDur: '5min'
      }
      if (this.app.bajo.config.applet) this.config.worker = false

      this.jobs = []
    }

    jobRunner = async () => {
      const { callHandler } = this.app.bajo
      const { omit } = this.app.lib._
      for await (const [msg] of this.puller) {
        const options = JSON.parse(msg.toString(), reviver)
        try {
          await callHandler(options.worker, omit(options, ['worker']))
        } catch (err) {
          if (this.app.bajo.config.log.level === 'trace') console.error(err)
          this.log.error('jobQueueError%s', err.message)
        }
      }
    }

    setupManager = async () => {
      this.pusher = new zmq.Push({ sendTimeout: 0 })
      await this.pusher.bind(`tcp://${this.config.host}:${this.config.port}`)
      this.log.debug('pusherStarted%s%s%d', this.ns, this.config.host, this.config.port)
    }

    setupWorker = async () => {
      this.puller = new zmq.Pull()
      this.puller.connect(`tcp://${this.config.host}:${this.config.port}`)
      this.log.debug('pullerStarted%s%s%d', this.ns, this.config.host, this.config.port)
      this.jobRunner()
    }

    start = async () => {
      if (this.config.manager) await this.setupManager()
      if (this.config.worker) await this.setupWorker()
    }

    push = async (options = {}) => {
      if (!this.config.manager) {
        this.log.error('disabled%s', this.t('manager'))
        return
      }
      try {
        if (!options.worker) throw this.error('isRequired%s', this.t('worker'))
        if (!options.payload) throw this.error('isRequired%s', this.t('payload'))
        if (options.payload.type === 'error') options.payload.data = options.payload.data.message
        await this.pusher.send(JSON.stringify(options, replacer))
      } catch (err) {
        this.log.error('queueError%s', err.message)
      }
    }
  }

  return BajoQueue
}

export default factory