wekan/packages/wekan-cfs-power-queue/power-queue.js
2021-04-29 13:26:49 +03:00

727 lines
25 KiB
JavaScript
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Rig weak dependencies
if (typeof MicroQueue === 'undefined' && Package['micro-queue']) {
MicroQueue = Package['micro-queue'].MicroQueue;
}
if (typeof ReactiveList === 'undefined' && Package['reactive-list']) {
ReactiveList = Package['reactive-list'].ReactiveList;
}
// Rig weak dependencies in +0.9.1
if (typeof MicroQueue === 'undefined' && Package['wekan-cfs-micro-queue']) {
MicroQueue = Package['wekan-cfs-micro-queue'].MicroQueue;
}
if (typeof ReactiveList === 'undefined' && Package['wekan-cfs-reactive-list']) {
ReactiveList = Package['wekan-cfs-reactive-list'].ReactiveList;
}
/**
* Creates an instance of a power queue // Testing inline comment
* [Check out demo](http://power-queue-test.meteor.com/)
*
* @constructor
* @self powerqueue
* @param {object} [options] Settings
* @param {boolean} [options.filo=false] Make it a first in last out queue
* @param {boolean} [options.isPaused=false] Set queue paused
* @param {boolean} [options.autostart=true] May adding a task start the queue
* @param {string} [options.name="Queue"] Name of the queue
* @param {number} [options.maxProcessing=1] Limit of simultanous running tasks
* @param {number} [options.maxFailures = 5] Limit retries of failed tasks, if 0 or below we allow infinite failures
* @param {number} [options.jumpOnFailure = true] Jump to next task and retry failed task later
* @param {boolean} [options.debug=false] Log verbose messages to the console
* @param {boolean} [options.reactive=true] Set whether or not this queue should be reactive
* @param {boolean} [options.onAutostart] Callback for the queue autostart event
* @param {boolean} [options.onPaused] Callback for the queue paused event
* @param {boolean} [options.onReleased] Callback for the queue release event
* @param {boolean} [options.onEnded] Callback for the queue end event
* @param {[SpinalQueue](spinal-queue.spec.md)} [options.spinalQueue] Set spinal queue uses pr. default `MicroQueue` or `ReactiveList` if added to the project
*/
PowerQueue = function(options) {
var self = this;
var test = 5;
self.reactive = (options && options.reactive === false) ? false : true;
// Allow user to use another micro-queue #3
// We try setting the ActiveQueue to MicroQueue if installed in the app
var ActiveQueue = (typeof MicroQueue !== 'undefined') && MicroQueue || undefined;
// If ReactiveList is added to the project we use this over MicroQueue
ActiveQueue = (typeof ReactiveList !== 'undefined') && ReactiveList || ActiveQueue;
// We allow user to overrule and set a custom spinal-queue spec complient queue
if (options && typeof options.spinalQueue !== 'undefined') {
ActiveQueue = options.spinalQueue;
}
if (typeof ActiveQueue === 'undefined') {
console.log('Error: You need to add a spinal queue to the project');
console.log('Please add "micro-queue", "reactive-list" to the project');
throw new Error('Please add "micro-queue", "reactive-list" or other spinalQueue compatible packages');
}
// Default is fifo lilo
self.invocations = new ActiveQueue({
//
sort: (options && (options.filo || options.lifo)),
reactive: self.reactive
});
//var self.invocations = new ReactiveList(queueOrder);
// List of current tasks being processed
self._processList = new ActiveQueue({
reactive: self.reactive
}); //ReactiveList();
// Max number of simultanious tasks being processed
self._maxProcessing = new ReactiveProperty(options && options.maxProcessing || 1, self.reactive);
// Reactive number of tasks being processed
self._isProcessing = new ReactiveProperty(0, self.reactive);
// Boolean indicating if queue is paused or not
self._paused = new ReactiveProperty((options && options.isPaused || false), self.reactive);
// Boolean indicator for queue status active / running (can still be paused)
self._running = new ReactiveProperty(false, self.reactive);
// Counter for errors, errors are triggered if maxFailures is exeeded
self._errors = new ReactiveProperty(0, self.reactive);
// Counter for task failures, contains error count
self._failures = new ReactiveProperty(0, self.reactive);
// On failure jump to new task - if false the current task is rerun until error
self._jumpOnFailure = (options && options.jumpOnFailure === false) ? false : true;
// Count of all added tasks
self._maxLength = new ReactiveProperty(0, self.reactive);
// Boolean indicate whether or not a "add" task is allowed to start the queue
self._autostart = new ReactiveProperty( ((options && options.autostart === false) ? false : true), self.reactive);
// Limit times a task is allowed to fail and be rerun later before triggering an error
self._maxFailures = new ReactiveProperty( (options && options.maxFailures || 5), self.reactive);
// Name / title of this queue - Not used - should deprecate
self.title = options && options.name || 'Queue';
// debug - will print error / failures passed to next
self.debug = !!(options && options.debug);
/** @method PowerQueue.total
* @reactive
* @returns {number} The total number of tasks added to this queue
*/
self.total = self._maxLength.get;
/** @method PowerQueue.isPaused
* @reactive
* @returns {boolean} Status of the paused state of the queue
*/
self.isPaused = self._paused.get;
/** @method PowerQueue.processing
* @reactive
* @returns {number} Number of tasks currently being processed
*/
self.processing = self._isProcessing.get;
/** @method PowerQueue.errors
* @reactive
* @returns {number} The total number of errors
* Errors are triggered when [maxFailures](PowerQueue.maxFailures) are exeeded
*/
self.errors = self._errors.get;
/** @method PowerQueue.failures
* @reactive
* @returns {number} The total number of failed tasks
*/
self.failures = self._failures.get;
/** @method PowerQueue.isRunning
* @reactive
* @returns {boolean} True if the queue is running
* > NOTE: The task can be paused but marked as running
*/
self.isRunning = self._running.get;
/** @method PowerQueue.maxProcessing Get setter for maxProcessing
* @param {number} [max] If not used this function works as a getter
* @reactive
* @returns {number} Maximum number of simultaneous processing tasks
*
* Example:
* ```js
* foo.maxProcessing(); // Works as a getter and returns the current value
* foo.maxProcessing(20); // This sets the value to 20
* ```
*/
self.maxProcessing = self._maxProcessing.getset;
self._maxProcessing.onChange = function() {
// The user can change the max allowed processing tasks up or down here...
// Update the throttle up
self.updateThrottleUp();
// Update the throttle down
self.updateThrottleDown();
};
/** @method PowerQueue.autostart Get setter for autostart
* @param {boolean} [autorun] If not used this function works as a getter
* @reactive
* @returns {boolean} If adding a task may trigger the queue to start
*
* Example:
* ```js
* foo.autostart(); // Works as a getter and returns the current value
* foo.autostart(true); // This sets the value to true
* ```
*/
self.autostart = self._autostart.getset;
/** @method PowerQueue.maxFailures Get setter for maxFailures
* @param {number} [max] If not used this function works as a getter
* @reactive
* @returns {number} The maximum for failures pr. task before triggering an error
*
* Example:
* ```js
* foo.maxFailures(); // Works as a getter and returns the current value
* foo.maxFailures(10); // This sets the value to 10
* ```
*/
self.maxFailures = self._maxFailures.getset;
/** @callback PowerQueue.onPaused
* Is called when queue is ended
*/
self.onPaused = options && options.onPaused || function() {
self.debug && console.log(self.title + ' ENDED');
};
/** @callback PowerQueue.onEnded
* Is called when queue is ended
*/
self.onEnded = options && options.onEnded || function() {
self.debug && console.log(self.title + ' ENDED');
};
/** @callback PowerQueue.onRelease
* Is called when queue is released
*/
self.onRelease = options && options.onRelease || function() {
self.debug && console.log(self.title + ' RELEASED');
};
/** @callback PowerQueue.onAutostart
* Is called when queue is auto started
*/
self.onAutostart = options && options.onAutostart || function() {
self.debug && console.log(self.title + ' Autostart');
};
};
/** @method PowerQueue.prototype.processList
* @reactive
* @returns {array} List of tasks currently being processed
*/
PowerQueue.prototype.processingList = function() {
var self = this;
return self._processList.fetch();
};
/** @method PowerQueue.prototype.isHalted
* @reactive
* @returns {boolean} True if the queue is not running or paused
*/
PowerQueue.prototype.isHalted = function() {
var self = this;
return (!self._running.get() || self._paused.get());
};
/** @method PowerQueue.prototype.length
* @reactive
* @returns {number} Number of tasks left in queue to be processed
*/
PowerQueue.prototype.length = function() {
var self = this;
return self.invocations.length();
};
/** @method PowerQueue.prototype.progress
* @reactive
* @returns {number} 0 .. 100 % Indicates the status of the queue
*/
PowerQueue.prototype.progress = function() {
var self = this;
var progress = self._maxLength.get() - self.invocations.length() - self._isProcessing.get();
if (self._maxLength.value > 0) {
return Math.round(progress / self._maxLength.value * 100);
}
return 0;
};
/** @method PowerQueue.prototype.usage
* @reactive
* @returns {number} 0 .. 100 % Indicates resource usage of the queue
*/
PowerQueue.prototype.usage = function() {
var self = this;
return Math.round(self._isProcessing.get() / self._maxProcessing.get() * 100);
};
/** @method PowerQueue.prototype.reset Reset the queue
* Calling this will:
* * stop the queue
* * paused to false
* * Discart all queue data
*
* > NOTE: At the moment if the queue has processing tasks they can change
* > the `errors` and `failures` counters. This could change in the future or
* > be prevented by creating a whole new instance of the `PowerQueue`
*/
PowerQueue.prototype.reset = function() {
var self = this;
self.debug && console.log(self.title + ' RESET');
self._running.set(false);
self._paused.set(false);
self.invocations.reset();
self._processList.reset();
// // Loop through the processing tasks and reset these
// self._processList.forEach(function(data) {
// if (data.queue instanceof PowerQueue) {
// data.queue.reset();
// }
// }, true);
self._maxLength.set(0);
self._failures.set(0);
self._errors.set(0);
};
/** @method PowerQueue._autoStartTasks
* @private
*
* This method defines the autostart algorithm that allows add task to trigger
* a start of the queue if queue is not paused.
*/
PowerQueue.prototype._autoStartTasks = function() {
var self = this;
// We dont start anything by ourselfs if queue is paused
if (!self._paused.value) {
// Queue is not running and we are set to autostart so we start the queue
if (!self._running.value && self._autostart.value) {
// Trigger callback / event
self.onAutostart();
// Set queue as running
self._running.set(true);
}
// Make sure that we use all available resources
if (self._running.value) {
// Call next to start up the queue
self.next(null);
}
}
};
/** @method PowerQueue.prototype.add
* @param {any} data The task to be handled
* @param {number} [failures] Used internally to Pass on number of failures.
*/
PowerQueue.prototype.add = function(data, failures, id) {
var self = this;
// Assign new id to task
var assignNewId = self._jumpOnFailure || typeof id === 'undefined';
// Set the task id
var taskId = (assignNewId) ? self._maxLength.value + 1 : id;
// self.invocations.add({ _id: currentId, data: data, failures: failures || 0 }, reversed);
self.invocations.insert(taskId, { _id: taskId, data: data, failures: failures || 0 });
// If we assigned new id then increase length
if (assignNewId) self._maxLength.inc();
self._autoStartTasks();
};
/** @method PowerQueue.prototype.updateThrottleUp
* @private
*
* Calling this method will update the throttle on the queue adding tasks.
*
* > Note: Currently we only support the PowerQueue - but we could support
* > a more general interface for pauseable tasks or other usecases.
*/
PowerQueue.prototype.updateThrottleUp = function() {
var self = this;
// How many additional tasks can we handle?
var availableSlots = self._maxProcessing.value - self._isProcessing.value;
// If we can handle more, we have more, we're running, and we're not paused
if (!self._paused.value && self._running.value && availableSlots > 0 && self.invocations._length > 0) {
// Increase counter of current number of tasks being processed
self._isProcessing.inc();
// Run task
self.runTask(self.invocations.getFirstItem());
// Repeat recursively; this is better than a for loop to avoid blocking the UI
self.updateThrottleUp();
}
};
/** @method PowerQueue.prototype.updateThrottleDown
* @private
*
* Calling this method will update the throttle on the queue pause tasks.
*
* > Note: Currently we only support the PowerQueue - but we could support
* > a more general interface for pauseable tasks or other usecases.
*/
PowerQueue.prototype.updateThrottleDown = function() {
var self = this;
// Calculate the differece between acutuall processing tasks and target
var diff = self._isProcessing.value - self._maxProcessing.value;
// If the diff is more than 0 then we have many tasks processing.
if (diff > 0) {
// We pause the latest added tasks
self._processList.forEachReverse(function(data) {
if (diff > 0 && data.queue instanceof PowerQueue) {
diff--;
// We dont mind calling pause on multiple times on each task
// theres a simple check going on preventing any duplicate actions
data.queue.pause();
}
}, true);
}
};
/** @method PowerQueue.prototype.next
* @param {string} [err] Error message if task failed
* > * Can pass in `null` to start the queue
* > * Passing in a string to `next` will trigger a failure
* > * Passing nothing will simply let the next task run
* `next` is handed into the [taskHandler](PowerQueue.taskHandler) as a
* callback to mark an error or end of current task
*/
PowerQueue.prototype.next = function(err) {
var self = this;
// Primary concern is to throttle up because we are either:
// 1. Starting the queue
// 2. Starting next task
//
// This function does not shut down running tasks
self.updateThrottleUp();
// We are running, no tasks are being processed even we just updated the
// throttle up and we got no errors.
// 1. We are paused and releasing tasks
// 2. We are done
if (self._running.value && self._isProcessing.value === 0 && err !== null) {
// We have no tasks processing so this queue is now releasing resources
// this could be that the queue is paused or stopped, in that case the
// self.invocations._length would be > 0
// If on the other hand the self.invocations._length is 0 then we have no more
// tasks in the queue so the queue has ended
self.onRelease(self.invocations._length);
if (!self.invocations._length) { // !self._paused.value &&
// Check if queue is done working
// Stop the queue
self._running.set(false);
// self.invocations.reset(); // This should be implicit
self.onEnded();
}
}
};
/** @callback done
* @param {Meteor.Error | Error | String | null} [feedback] This allows the task to communicate with the queue
*
* Explaination of `feedback`
* * `Meteor.Error` This means that the task failed in a controlled manner and is allowed to rerun
* * `Error` This will throw the passed error - as its an unitended error
* * `null` The task is not done yet, rerun later
* * `String` The task can perform certain commands on the queue
* * "pause" - pause the queue
* * "stop" - stop the queue
* * "reset" - reset the queue
* * "cancel" - cancel the queue
*
*/
/** @method PowerQueue.prototype.runTaskDone
* @private
* @param {Meteor.Error | Error | String | null} [feedback] This allows the task to communicate with the queue
* @param {object} invocation
*
* > Note: `feedback` is explained in [Done callback](#done)
*
*/
// Rig the callback function
PowerQueue.prototype.runTaskDone = function(feedback, invocation) {
var self = this;
// If the task handler throws an error then add it to the queue again
// we allow this for a max of self._maxFailures
// If the error is null then we add the task silently back into the
// microQueue in reverse... This could be due to pause or throttling
if (feedback instanceof Meteor.Error) {
// We only count failures if maxFailures are above 0
if (self._maxFailures.value > 0) invocation.failures++;
self._failures.inc();
// If the user has set the debug flag we print out failures/errors
self.debug && console.error('Error: "' + self.title + '" ' + feedback.message + ', ' + feedback.stack);
if (invocation.failures < self._maxFailures.value) {
// Add the task again with the increased failures
self.add(invocation.data, invocation.failures, invocation._id);
} else {
self._errors.inc();
self.errorHandler(invocation.data, self.add, invocation.failures);
}
// If a error is thrown we assume its not intended
} else if (feedback instanceof Error) throw feedback;
if (feedback)
// We use null to throttle pauseable tasks
if (feedback === null) {
// We add this task into the queue, no questions asked
self.invocations.insert(invocation._id, { data: invocation.data, failures: invocation.failures, _id: invocation._id });
}
// If the user returns a string we got a command
if (feedback === ''+feedback) {
var command = {
'pause': function() { self.pause(); },
'stop': function() { self.stop(); },
'reset': function() { self.reset(); },
'cancel': function() { self.cancel(); },
};
if (typeof command[feedback] === 'function') {
// Run the command on this queue
command[feedback]();
} else {
// We dont recognize this command, throw an error
throw new Error('Unknown queue command "' + feedback + '"');
}
}
// Decrease the number of tasks being processed
// make sure we dont go below 0
if (self._isProcessing.value > 0) self._isProcessing.dec();
// Task has ended we remove the task from the process list
self._processList.remove(invocation._id);
invocation.data = null;
invocation.failures = null;
invocation._id = null;
invocation = null;
delete invocation;
// Next task
Meteor.setTimeout(function() {
self.next();
}, 0);
};
/** @method PowerQueue.prototype.runTask
* @private // This is not part of the open api
* @param {object} invocation The object stored in the micro-queue
*/
PowerQueue.prototype.runTask = function(invocation) {
var self = this;
// We start the fitting task handler
// Currently we only support the PowerQueue but we could have a more general
// interface for tasks that allow throttling
try {
if (invocation.data instanceof PowerQueue) {
// Insert PowerQueue into process list
self._processList.insert(invocation._id, { id: invocation._id, queue: invocation.data });
// Handle task
self.queueTaskHandler(invocation.data, function subQueueCallbackDone(feedback) {
self.runTaskDone(feedback, invocation);
}, invocation.failures);
} else {
// Insert task into process list
self._processList.insert(invocation._id, invocation.data);
// Handle task
self.taskHandler(invocation.data, function taskCallbackDone(feedback) {
self.runTaskDone(feedback, invocation);
}, invocation.failures);
}
} catch(err) {
throw new Error('Error while running taskHandler for queue, Error: ' + err.message);
}
};
/** @method PowerQueue.prototype.queueTaskHandler
* This method handles tasks that are sub queues
*/
PowerQueue.prototype.queueTaskHandler = function(subQueue, next, failures) {
var self = this;
// Monitor sub queue task releases
subQueue.onRelease = function(remaining) {
// Ok, we were paused - this could be throttling so we respect this
// So when the queue is halted we add it back into the main queue
if (remaining > 0) {
// We get out of the queue but dont repport error and add to run later
next(null);
} else {
// Queue has ended
// We simply trigger next task when the sub queue is complete
next();
// When running subqueues it doesnt make sense to track failures and retry
// the sub queue - this is sub queue domain
}
};
// Start the queue
subQueue.run();
};
/** @callback PowerQueue.prototype.taskHandler
* @param {any} data This can be data or functions
* @param {function} next Function `next` call this to end task
* @param {number} failures Number of failures on this task
*
* Default task handler expects functions as data:
* ```js
* self.taskHandler = function(data, next, failures) {
* // This default task handler expects invocation to be a function to run
* if (typeof data !== 'function') {
* throw new Error('Default task handler expects a function');
* }
* try {
* // Have the function call next
* data(next, failures);
* } catch(err) {
* // Throw to fail this task
* next(err);
* }
* };
* ```
*/
// Can be overwrittin by the user
PowerQueue.prototype.taskHandler = function(data, next, failures) {
var self = this;
// This default task handler expects invocation to be a function to run
if (typeof data !== 'function') {
throw new Error('Default task handler expects a function');
}
try {
// Have the function call next
data(next, failures);
} catch(err) {
// Throw to fail this task
next(err);
}
};
/** @callback PowerQueue.prototype.errorHandler
* @param {any} data This can be data or functions
* @param {function} addTask Use this function to insert the data into the queue again
* @param {number} failures Number of failures on this task
*
* The default callback:
* ```js
* var foo = new PowerQueue();
*
* // Overwrite the default action
* foo.errorHandler = function(data, addTask, failures) {
* // This could be overwritten the data contains the task data and addTask
* // is a helper for adding the task to the queue
* // try again: addTask(data);
* // console.log('Terminate at ' + failures + ' failures');
* };
* ```
*/
PowerQueue.prototype.errorHandler = function(data, addTask, failures) {
var self = this;
// This could be overwritten the data contains the task data and addTask
// is a helper for adding the task to the queue
// try again: addTask(data);
self.debug && console.log('Terminate at ' + failures + ' failures');
};
/** @method PowerQueue.prototype.pause Pause the queue
* @todo We should have it pause all processing tasks
*/
PowerQueue.prototype.pause = function() {
var self = this;
if (!self._paused.value) {
self._paused.set(true);
// Loop through the processing tasks and pause these
self._processList.forEach(function(data) {
if (data.queue instanceof PowerQueue) {
// Pause the sub queue
data.queue.pause();
}
}, true);
// Trigger callback
self.onPaused();
}
};
/** @method PowerQueue.prototype.resume Start a paused queue
* @todo We should have it resume all processing tasks
*
* > This will not start a stopped queue
*/
PowerQueue.prototype.resume = function() {
var self = this;
self.run();
};
/** @method PowerQueue.prototype.run Starts the queue
* > Using this command will resume a paused queue and will
* > start a stopped queue.
*/
PowerQueue.prototype.run = function() {
var self = this;
//not paused and already running or queue empty or paused subqueues
if (!self._paused.value && self._running.value || !self.invocations._length) {
return;
}
self._paused.set(false);
self._running.set(true);
self.next(null);
};
/** @method PowerQueue.prototype.stop Stops the queue
*/
PowerQueue.prototype.stop = function() {
var self = this;
self._running.set(false);
};
/** @method PowerQueue.prototype.cancel Cancel the queue
*/
PowerQueue.prototype.cancel = function() {
var self = this;
self.reset();
};