index.js

import PushPull from './lib/types/push-pull.js'
import PubSub from './lib/types/pub-sub.js'
import sendHandler from './lib/send-handler.js'

/**
 * Plugin factory
 *
 * @param {string} pkgName - NPM package name
 * @returns {class}
 */
async function factory (pkgName) {
  const me = this

  /**
   * Masohi class
   *
   * @class
   */
  class Masohi extends this.app.pluginClass.base {
    static alias = 'masohi'
    static dependencies = ['bajo-queue']

    constructor () {
      super(pkgName, me.app)
      this.config = {
        connections: [],
        pipelines: [],
        localPubSub: {
          host: '127.0.0.1',
          port: 17782
        },
        waibu: {
          prefix: 'messaging',
          title: 'messaging'
        },
        waibuMpa: {
          logo: 'wifi',
          icon: 'wifi'
        },
        waibuAdmin: {
          modelDisabled: 'all'
        },
        saveStream: {
          ingest: true,
          ingestHistory: false
        },
        dumpPipelineError: false
      }
      this.types = ['pushPull', 'pubSub']
      this.sourceMsg = {}
    }

    init = async () => {
      const { buildCollections, importPkg } = this.app.bajo

      const connSanitizer = async ({ item }) => {
        const { pick, has } = this.app.lib._
        if (!this.types.includes(item.type)) throw this.error('invalidConnType%s', item.type)
        item.options = item.options ?? {}
        if (!has(item.options, 'host')) throw this.error('isRequired%s', 'options.host')
        if (!has(item.options, 'port')) throw this.error('isRequired%s', 'options.port')
        return pick(item, ['type', 'name', 'options'])
      }

      const pipeSanitizer = async ({ item }) => {
        const { has, isString, map } = this.app.lib._
        const { breakNsPath } = this.app.bajo
        for (const key of ['source', 'handlers']) {
          if (!has(item, key)) throw this.error('isRequired%s', key)
        }
        breakNsPath(item.source)
        item.sourceType = item.sourceType ?? 'connection'
        if (!['connection', 'hook'].includes(item.sourceType)) throw this.error('invalid%s%s', this.t('sourceType'), item.sourceType)
        if (isString(item.handlers)) item.handlers = [item.handlers]
        item.handlers = map(item.handlers, h => {
          if (isString(h)) h = { handler: h }
          return h
        })
        return item
      }

      if (this.config.localPubSub) {
        this.config.connections.unshift({
          name: 'default',
          type: 'pubSub',
          options: {
            host: this.config.localPubSub.host,
            port: this.config.localPubSub.port,
            publisherAutoStart: true,
            subscriberAutoStart: true,
            publisherOpts: {
              sendTimeout: 0
            }
          }
        })
      }
      this.zmq = await importPkg('bajoQueue:zeromq')
      this.connections = await buildCollections({
        ns: this.ns,
        useDefaultName: false,
        handler: connSanitizer,
        container: 'connections'
      })
      this.pipelines = await buildCollections({
        ns: this.ns,
        container: 'pipelines',
        handler: pipeSanitizer
      })
    }

    start = async () => {
      const { eachPlugins, breakNsPath } = this.app.bajo
      const { get, filter, map, isPlainObject, has, merge } = this.app.lib._
      const { outmatchNs } = this.app.lib

      for (const conn of this.connections) {
        conn.options = conn.options ?? {}
        switch (conn.type) {
          case 'pushPull': conn.instance = new PushPull(this, conn); break
          case 'pubSub': conn.instance = new PubSub(this, conn); break
        }
        if (conn.instance) await conn.instance.init()
        this.log.debug('instanceCreatedOnConn%s%s', conn.name, this.t(conn.type))
      }
      // get all pipeline capable connections
      const connPipes = []
      const me = this
      await eachPlugins(async function () {
        const { ns } = this
        const conns = map(filter(get(me, `app.${ns}.connections`, []), c => {
          return c.masohiPipeline
        }), c => `${ns}.${c.name}`)
        connPipes.push(...conns)
      })
      for (const ns of connPipes) {
        const mod = { ns, path: 'data', src: this.ns, level: 1000 }
        mod.handler = async function (params) {
          if (this.config.saveStream.ingest) await this.saveStream(merge({}, params, { type: 'INGEST' }))
          if (this.config.saveStream.ingestHistory) await this.saveStreamHistory(merge({}, params, { type: 'INGEST' }))
          const pipes = filter(this.pipelines, p => {
            return p.sourceType === 'connection' && outmatchNs(params.source, p.source)
          })
          for (const p of pipes) {
            await this.pushToPipeline(p.name, params)
          }
        }
        this.app.bajo.hooks.push(mod)
      }
      // tap hook
      for (const p of this.pipelines) {
        if (p.sourceType !== 'hook') continue
        const { fullNs, path } = breakNsPath(p.source)
        const mod = { ns: fullNs, path, src: this.ns, level: 1000 }
        mod.handler = async function (params) {
          let newParams = {}
          if (isPlainObject(params) && has(params, 'payload') && has(params, 'source')) newParams = params
          else newParams = { payload: params }
          newParams.source = p.source
          await this.pushToPipeline(p.name, newParams)
        }
        this.app.bajo.hooks.push(mod)
      }
      // pubsub
      const conn = this.getConn('default')
      if (!conn) return
      conn.instance.subscriber.subscribe('')
      this._catchAllHandler(conn)
    }

    _catchAllHandler = async (conn) => {
      const { runHook } = this.app.bajo
      const { camelCase } = this.app.lib._
      for await (const [topic, msg] of conn.instance.subscriber) {
        try {
          const [ns, ...args] = topic.toString().split(':')
          const data = JSON.parse(msg.toString())
          await runHook(`${this.ns}.subscriber.${ns}:${camelCase(args.join(':'))}`, data)
        } catch (err) {
          this.log.error('error%s', err.message)
        }
      }
    }

    saveStreamHistory = async (body) => {
      if (!this.app.dobo) return
      const { recordCreate } = this.app.dobo
      await recordCreate('MasohiStreamHistory', body, { noResult: true, noValidation: true })
    }

    saveStream = async (body) => {
      if (!this.app.dobo) return
      const { recordUpsert } = this.app.dobo
      const query = { source: body.source }
      await recordUpsert('MasohiStream', body, { query, noResult: true, noValidation: true })
    }

    // send message
    send = async (params = {}) => {
      const { noQueue = false } = params
      if (noQueue) {
        await sendHandler.call(this, params)
        return
      }
      const { push } = this.app.bajoQueue
      params.worker = 'masohi:workerSend'
      await push(params)
    }

    workerSend = async (params) => {
      await sendHandler.call(this, params)
    }

    getConn = name => {
      const { find } = this.app.lib._
      const conn = find(this.connections, { name })
      if (!conn) {
        this.log.error('notFound%s%s', this.t('Connection'), name)
        return
      }
      return conn
    }

    publish = async (topicName, { payload, source, connection = 'default' }) => {
      if (!topicName || !payload) return
      const { getPlugin, breakNsPath } = this.app.bajo
      const { ns } = breakNsPath(source)
      const conn = this.getConn(connection)
      if (!conn) throw this.error('notFound%s%s', this.t('Connection'), connection)
      const plugin = getPlugin(ns, true)
      if (!plugin) throw this.error('pluginWithNameAliasNotLoaded%s', ns)
      const params = { payload, source, connection }
      await conn.instance.publisher.send([`${plugin.ns}:${topicName}`, JSON.stringify(params)])
    }

    subscribe = async ({ topic, options = {} }) => {
      const { conn = 'default' } = options
      const connection = this.getConn(conn)
      if (!connection) return
      connection.instance.subscriber.subscribe(topic)
    }

    lodashTransform = (method = '', params = {}) => {
      for (const item of method.split('.')) {
        const fn = this.app.lib._[item]
        if (!fn) continue
        params.payload = fn(params.payload)
      }
    }

    preventRepeatedMsg = (params = {}) => {
      const { isEqual } = this.app.lib._
      const { payload, source } = params
      if (isEqual(this.sourceMsg[source], payload)) throw this.error('repeatedMsg')
      this.sourceMsg[source] = payload
    }

    pushToPipeline = async (name, options = {}) => {
      const { find, isString, camelCase } = this.app.lib._
      const { callHandler, runHook } = this.app.bajo
      const { push } = this.app.bajoQueue
      const { source } = options
      const pipe = find(this.pipelines, { name })
      try {
        // handlers/transformers
        for (const item of pipe.handlers) {
          if (isString(item.handler)) await runHook(`${this.ns}.${camelCase(item.handler)}:beforePipe`, options)
          if (item.queue) {
            options.worker = item.handler
            await push(options)
          } else {
            await callHandler(item.handler, options)
            if (isString(item.handler)) await runHook(`${this.ns}.${camelCase(item.handler)}:afterPipe`, options)
          }
        }
      } catch (err) {
        if (this.app.bajo.config.log.level === 'trace') {
          this.log.error('error%s%s%s', this.t('pipeline%s', pipe.name),
            this.t('source%s', source), err.message)
        }
        if (this.config.dumpPipelineError) console.error(err)
      }
    }
  }

  return Masohi
}

export default factory