const UB = require('@unitybase/ub')
const App = UB.App
const Session = require('@unitybase/ub').Session
/* global ubq_messages */
// eslint-disable-next-line camelcase
let me = ubq_messages
const os = require('os')
const HOST_NAME = os.hostname() || 'unknown'
me.entity.addMethod('executeSchedulerTask')
me.entity.addMethod('addqueue')
me.entity.addMethod('success')
/**
* Mark queue task as successfully executed
* @method success
* @param {ubMethodParams} ctxt
* @param {Number} ctxt.mParams.ID
* @published
* @memberOf ubq_messages_ns.prototype
* @memberOfModule @unitybase/ubq
*/
me.success = function (ctxt) {
ctxt.dataStore.execSQL('update ubq_messages set completeDate = :completeDate: where ID = :ID:', {completeDate: new Date(), ID: ctxt.mParams.ID})
return true
}
/**
* Add item to queue.
*
* Used by server FTS mixin - do not remove
* @method addqueue
* @param {ubMethodParams} ctxt
* @param {String} ctxt.mParams.queueCode Queue code to add a item to
* @param {String} ctxt.mParams.msgCmd Command
* @param {String} ctxt.mParams.msgData Additional command data
* @param {Number} [ctxt.mParams.msgPriority=0] Priority
* @published
* @memberOf ubq_messages_ns.prototype
* @memberOfModule @unitybase/ubq
* @return {Boolean}
*/
me.addqueue = function (ctxt) {
console.debug('Call JS method: ubq_messages.addqueue')
let mParams = ctxt.mParams
let fMethod = 'insert'
let inst = UB.DataStore('ubq_messages')
let fexecParams = {}
fexecParams.ID = inst.generateID()
fexecParams.queueCode = mParams.queueCode
fexecParams.msgCmd = mParams.msgCmd
fexecParams.msgData = mParams.msgData
if (!mParams.msgPriority) {
fexecParams.msgPriority = 0
}
let runobj = {
entity: 'ubq_messages',
method: fMethod,
fieldList: ['*'],
execParams: fexecParams
}
inst.run(fMethod, runobj)
return true
}
/**
* Take a `.` separated string and return a function it points to (starting from global)
* Think about it as about safe eval
* @private
* @param {String} path
* @return {Function|undefined}
*/
function getFnFromNS (path) {
let root = global
if (typeof path !== 'string') {
return undefined
}
let parts = path.split('.')
for (let j = 0, subLn = parts.length; j < subLn; j++) {
let part = parts[j]
if (root[part]) {
root = root[part]
} else {
return undefined
}
}
return typeof root === 'function' ? root : undefined
}
/**
* REST endpoint for executing a scheduler task.
* Queue worker will sent the tasks in async mode to this endpoint according to a schedulers.
* Endpoint wait a POST requests from a local IP with JSON in body:
*
* {
* schedulerName: cfg.name, command: cfg.command, module: cfg.module,
* singleton: cfg.singleton !== false, logSuccessful: cfg.logSuccessful
* }
*
* `command` must be a function name (may including namespace), for example `UB.UBQ.sendQueueMail` or `ubs_message_edit.notifyAllRecipients`
* in case `command` not passed `module` must be a module what export default a function, for example module: '@unitybase/myModule/schedTask'
* and in schedTask.js `module exports = function() {...}`
*
* In case `singleton` parameter is missing or === false scheduler can run a multiple instances of the same task,
* otherwise - if previous task with the same name not finished yet current task will not be executed
*
* - If command executed success, record with resultError===0 will be written to `ubq_runstat` entity.
* - If command executed **with exception**, record with resultError===1 will be written to `ubq_runstat` entity,
* Exception text will be written written to `ubq_runstat.resultErrorMsg`.
*
* @method executeSchedulerTask
* @param {null} nullCtxt
* @param {THTTPRequest} req Name of a scheduler item
* @param {THTTPResponse} resp Command to execute
* @memberOf ubq_messages_ns.prototype
* @memberOfModule @unitybase/ubq
* @published
* @return {Boolean}
*/
me.executeSchedulerTask = function executeSchedulerTask (nullCtxt, req, resp) {
let logText, err
let statParams
if (App.localIPs.indexOf(Session.callerIP) === -1) {
throw new Error('SCHEDULER: remote execution is not allowed')
}
let task = JSON.parse(req.read())
let taskName = task.schedulerName || 'unknownTask'
let isSingleton = (task.singleton !== false)
if (isSingleton && (App.globalCacheGet(taskName) === '1')) {
console.warn('SCHEDULER: task %s is already running', taskName)
return false
}
if (isSingleton) {
App.globalCachePut(taskName, '1')
}
err = ''
try {
console.debug('SCHEDULER: got a task %j', task)
let startTime = new Date()
let entryPoint
if (task.command) {
entryPoint = getFnFromNS(task.command)
} else if (task.module) {
entryPoint = require(task.module)
}
if (!entryPoint) {
err = `SCHEDULER: invalid command (function ${task.command || task.module} not found)`
} else {
try {
logText = entryPoint()
App.dbCommit()
} catch (e) {
err = e.toString()
App.dbRollback()
}
}
let endTime = new Date()
let statInst = UB.DataStore('ubq_runstat')
if (task.logSuccessful || err !== '') {
statParams = {
appName: HOST_NAME,
schedulerName: taskName,
startTime: startTime,
endTime: endTime,
resultError: err === '' ? 0 : 1
}
if (err !== '') {
statParams.resultErrorMsg = err
}
if (logText) {
statParams.logText = logText
}
statInst.run('insert', {
execParams: statParams
})
}
} finally {
if (isSingleton) {
App.globalCachePut(taskName, '0')
}
}
resp.statusCode = 200
console.debug('SCHEDULER: end a task %j with result %j', task, statParams)
App.logout()
}