import crypto from 'crypto'
import { createGzip, createGunzip } from 'zlib'
import path from 'path'
import { Readable } from 'stream'
import numbro from 'numbro'
async function fetching ({ url, opts, bulk, spin }) {
const { setImmediate, print } = this.app.bajo
const { isEmpty, isFunction, has } = this.app.lib._
const { validationErrorMessage } = this.app.bajoDb
const resp = await this.fetchUrl(url, opts ?? {})
if (isEmpty(resp)) {
spin.fatal('noServerResponse')
return -1
}
if (bulk.abort) {
const aborted = await bulk.abort.call(this, resp)
if (aborted) {
spin.fatal(aborted)
return -1
}
}
let count = 0
const stat = { created: 0, updated: 0, skipped: 0, error: 0 }
bulk.dataKey = bulk.dataKey ?? 'data'
if (bulk.printCount === true) bulk.printCount = 100
const data = isFunction(bulk.dataKey) ? await bulk.dataKey.call(this, resp) : resp[bulk.dataKey]
if (data.length === 0) {
print.warn('noRecordToProcess')
return 0
}
spin.setText('gotRecordsProcessing%d', data.length)
for (let r of data) {
await setImmediate()
if (bulk.converter) r = await bulk.converter.call(this, r, bulk)
if (isEmpty(r)) {
stat.skipped++
continue
}
try {
const result = await bulk.handler.call(this, r, bulk)
if (result && has(stat, result)) stat[result]++
if (bulk.printCount && bulk.printCount < count && (count % bulk.printCount === 0)) print.succeed('[%s] Processed %d/%d', spin.getElapsed(), count, data.length)
else if (!spin.opts.isLog) spin.setText('rec%d%d', count, data.length)
count++
} catch (err) {
console.log(err)
spin.setText(validationErrorMessage(err) + ', continue')
}
}
print.succeed('recProcessed%s%d%d', spin.getElapsed(), count, data.length)
if (!bulk.noStat) print.succeed('createdUpdatedSkipped%s%d%d%d', spin.getElapsed(), stat.created, stat.updated, stat.skipped)
return data.length
}
async function handler (rec, bulk) {
const { isFunction, set } = this.app.lib._
const { recordCreate, recordFind, recordUpdate } = this.app.bajoDb
const save = bulk.save ?? {}
const current = save.current ?? {}
let existing
let record
let method
save.checkUnique = save.checkUnique ?? 'id'
if (['unique', 'upsert'].includes(save.mode)) {
const query = isFunction(save.checkUnique) ? await save.checkUnique.call(this, rec, save) : set({}, save.checkUnique, rec[save.checkUnique])
const resp = await recordFind(save.coll, { query, limit: 1 }, { noCache: true })
if (resp.length > 0) existing = resp[0]
}
if (existing) {
if (save.mode === 'upsert') {
const body = save.updateConverter ? await save.updateConverter.call(this, rec, save) : rec
try {
record = await recordUpdate(save.coll, existing.id, body)
method = 'updated'
} catch (err) {
console.error(err)
method = 'error'
}
} else {
method = 'skipped'
}
} else {
try {
record = await recordCreate(save.coll, rec)
method = 'created'
} catch (err) {
console.error(err)
method = 'error'
}
}
if (record && current.coll && current.query) {
const query = await current.query.call(this, { body: rec, record, opts: save })
const recs = await recordFind(current.coll, { query }, { noCache: true })
const rc = current.converter ? await current.converter.call(this, { body: rec, record, opts: save }) : rec
if (rc) {
if (recs.length > 0) {
const id = recs[0].id
await recordUpdate(current.coll, id, rc)
} else {
await recordCreate(current.coll, rc)
}
}
}
return method
}
/**
* Plugin factory
*
* @param {string} pkgName - NPM package name
* @returns {class}
*/
async function factory (pkgName) {
const me = this
/**
* BajoExtra class
*
* @class
*/
class BajoExtra extends this.app.pluginClass.base {
static alias = 'extra'
constructor () {
super(pkgName, me.app)
this.config = {
secret: 'hxKY8Eh63Op9js6ovU25qmq2DmCE9dIB',
fetch: {
agent: {
autoSelectFamilyAttemptTimeout: 1000,
autoSelectFamily: true
}
}
}
}
formatByte = (value, opts = {}) => {
opts.output = 'byte'
opts.base = 'binary'
opts.mantissa = opts.mantissa ?? opts.scale ?? 2
opts.spaceSeparated = opts.spaceSeparated ?? true
return numbro(value).format(opts)
}
formatFloat = (value, opts = {}) => {
opts.mantissa = opts.mantissa ?? opts.scale ?? 2
opts.thousandSeparated = opts.thousandSeparated ?? true
return numbro(value).format(opts)
}
formatInteger = (value, opts = {}) => {
opts.mantissa = 0
opts.thousandSeparated = opts.thousandSeparated ?? true
return numbro(value).format(opts)
}
formatPercentage = (value, opts = {}) => {
opts.output = 'percent'
opts.mantissa = opts.mantissa ?? opts.scale ?? 2
opts.spaceSeparated = opts.spaceSeparated ?? true
return numbro(value).format(opts)
}
// taken from: https://stackoverflow.com/a/41439945
countFileLines = async (file) => {
const { fs } = this.app.lib
return new Promise((resolve, reject) => {
let lineCount = 0
fs.createReadStream(file)
.on('data', (buffer) => {
let idx = -1
lineCount--
do {
idx = buffer.indexOf(10, idx + 1)
lineCount++
} while (idx !== -1)
})
.on('end', () => {
resolve(lineCount)
})
.on('error', reject)
})
}
download = async (url, opts = {}, extra = {}) => {
const { getPluginDataDir, importPkg, generateId } = this.app.bajo
const { fetch } = await importPkg('bajoExtra:undici')
const { fs } = this.app.lib
const { isFunction, merge } = this.app.lib._
if (typeof opts === 'string') extra = { dir: opts }
const increment = await importPkg('bajo:add-filename-increment')
if (!extra.dir) {
extra.dir = `${getPluginDataDir('bajoExtra')}/download`
fs.ensureDirSync(extra.dir)
}
if (!fs.existsSync(extra.dir)) throw this.error('dlDirNotExists%s', extra.dir)
if (extra.randomFileName) {
const ext = path.extname(url)
extra.fileName = `${generateId()}${ext}`
}
if (!extra.fileName) extra.fileName = path.basename(url)
const file = path.resolve(increment(`${extra.dir}/${extra.fileName}`, { fs: true }))
const writer = fs.createWriteStream(file)
const { headers, body, ok, status } = await fetch(url, opts, merge({}, extra, { rawResponse: true }))
if (!ok) {
fs.removeSync(file)
throw this.error('gettingStatus%s', status)
}
const total = headers['content-length'] ?? 0
const data = Readable.fromWeb(body)
let length = 0
data.on('data', chunk => {
length += chunk.length
if (isFunction(extra.progressFn)) extra.progressFn.call(this, length, total)
else if (extra.spin) {
extra.spinText = extra.spinText ?? 'downloading'
if (total === 0) extra.spin.setText(`${extra.spinText} %s`, this.formatByte(length))
else extra.spin.setText(`${extra.spinText} %s of %s (%s)`, this.formatByte(length), this.formatByte(total), this.formatPercentage(length / total))
}
})
data.pipe(writer)
return new Promise((resolve, reject) => {
writer.on('error', reject)
writer.on('finish', () => {
resolve(file)
})
})
}
fetchAndSave = async ({ url, bulk, save = {}, opts = {} } = {}) => {
const { startPlugin } = this.bajo
const { merge } = this.bajo.lib._
merge(bulk, { handler, save })
await startPlugin('dobo')
await this.fetchBulk(url, bulk, opts)
}
fetchBulk = async (url, bulk = {}, opts = {}) => {
const { isFunction } = this.bajo.lib._
opts.params = opts.params ?? {}
bulk.maxStep = bulk.maxStep ?? 0
if (!isFunction(bulk.handler)) throw this.error('handlerMustBeProvided')
if (isFunction(bulk.paramsIncFn)) {
this.print.info('bulkFetchStarting')
const spin = this.print.spinner({ showCounter: true }).start('fetchingStarts')
let step = 1
for (;;) {
this.print.info('batch%s%d', spin.getElapsed(), step)
const newOpts = await bulk.paramsIncFn.call(this, { url, bulk, opts })
if (newOpts) opts = newOpts
const length = await fetching.call(this, { url, bulk, opts, spin })
if (length === 0 || (bulk.maxStep > 0 && step >= bulk.maxStep)) {
this.print.info('allDone')
break
}
step++
}
} else {
const spin = this.print.spinner({ showCounter: true }).start('fetchingStarts')
await fetching.call(this, { url, bulk, opts, spin })
}
}
fetchUrl = async (url, opts = {}, extra = {}) => {
const { importPkg } = this.app.bajo
const { fetch, Agent } = await importPkg('bajoExtra:undici')
const { isSet } = this.app.lib.aneka
const { fs } = this.app.lib
const { isEmpty, has, isArray, isPlainObject, isString, cloneDeep, merge } = this.app.lib._
if (isPlainObject(url)) {
extra = cloneDeep(opts)
opts = cloneDeep(url)
url = opts.url
delete opts.url
}
if (opts.method) opts.method = opts.method.toUpperCase()
if (opts.auth) {
opts.headers.Authorization = `Basic ${Buffer.from(`${opts.auth.username}:${opts.auth.password}`).toString('base64')}`
delete opts.auth
}
const query = merge({}, opts.query, opts.params ?? {})
for (const q in query) {
if (!isSet(query[q])) delete query[q]
}
if (!isEmpty(query)) opts.query = query
delete opts.params
if (!has(extra, 'cacheBuster')) extra.cacheBuster = true
if (extra.cacheBuster) opts.query[extra.cacheBusterKey ?? '_'] = Date.now()
if (this.config.fetch.agent || extra.agent) {
opts.dispatcher = new Agent(extra.agent ?? this.config.fetch.agent)
}
if (opts.body && extra.formData) {
const formData = new FormData()
for (const key in opts.body) {
let fname
let val = opts.body[key]
if (!isSet(val)) continue
if (isString(val) && val.startsWith('file:///')) {
fname = path.basename(val)
val = new Blob([fs.readFileSync(val.slice(8))])
} else if (isPlainObject(val) || isArray(val)) val = JSON.stringify(val)
if (fname) formData.append(key, val, fname)
else formData.append(key, val)
}
opts.body = formData
}
if (opts.query) {
// todo: what if url already contain query string?
const query = new URLSearchParams(opts.query)
url += '?' + query
}
opts.headers = opts.headers ?? {}
if (this.config.fetch.userAgent) opts.headers['User-Agent'] = this.config.fetch.userAgent
const resp = await fetch(url, opts)
if (extra.rawResponse) return resp
return await resp.json()
}
gunzip = async (file, deleteOld) => {
await this.gzip(file, deleteOld, true)
}
gzip = async (file, deleteOld, expand) => {
const { fs } = this.app.lib
return new Promise((resolve, reject) => {
const newFile = expand ? file.slice(0, file.length - 3) : (file + '.gz')
const reader = fs.createReadStream(file)
const writer = fs.createWriteStream(newFile)
const method = expand ? createGunzip() : createGzip()
reader.pipe(method).pipe(writer)
writer.on('error', reject)
writer.on('finish', err => {
if (err) return reject(err)
if (deleteOld) fs.unlinkSync(file)
resolve()
})
})
}
hash = async (text, type = 'md5', options = {}) => {
const { importPkg } = this.app.bajo
const bcrypt = await importPkg('bajoExtra:bcrypt')
options.digest = options.digest ?? 'hex'
options.salt = options.hash ?? 10
if (typeof text !== 'string') text = JSON.stringify(text)
if (type === 'bcrypt') return await bcrypt.hash(text, options.salt)
if (type === 'short') {
type = 'shake256'
options.outputLength = 6
}
return crypto.createHash(type, options).update(text).digest(options.digest)
}
isBcrypt = (text) => {
// return /^\$2[ayb]\$.{56}$/.test(text)
return /^\$2[aby]?\$\d{1,2}\$[./A-Za-z0-9]{53}$/.test(text)
}
isMd5 = (text) => {
return /^[a-f0-9]{32}$/i.test(text)
}
encrypt = async (text, { type = 'short', subType = 'qr' } = {}) => {
const { importPkg } = this.app.bajo
const { ShortCrypt } = await importPkg('bajoExtra:short-crypt')
const short = (item) => {
const sc = new ShortCrypt(this.config.secret)
const method = subType === 'qr' ? 'encryptToQRCodeAlphanumeric' : 'encryptToURLComponent'
return sc[method](item)
}
switch (type) {
case 'short': return short(text)
}
throw this.error('invalid%s%s', this.t('encryption type'), type)
}
decrypt = async (cipher, { type = 'short', subType = 'qr' } = {}) => {
const { importPkg } = this.app.bajo
const { ShortCrypt } = await importPkg('bajoExtra:short-crypt')
const short = (item) => {
const sc = new ShortCrypt(this.config.secret)
const method = subType === 'qr' ? 'decryptToQRCodeAlphanumeric' : 'decryptToURLComponent'
return sc[method](item)
}
switch (type) {
case 'short': return short(cipher)
}
throw this.error('invalid%s%s', this.t('decryption type'), type)
}
randomRange = (min, max, alpha) => {
const num = Math.floor(Math.random() * (max - min + 1) + min)
if (!alpha) return num
return String.fromCharCode(96 + num)
}
}
return BajoExtra
}
export default factory