LibreChat/api/server/utils/jobScheduler.js

100 lines
3 KiB
JavaScript
Raw Normal View History

📥 feat: Import Conversations from LibreChat, ChatGPT, Chatbot UI (#2355) * Basic implementation of ChatGPT conversation import * remove debug code * Handle citations * Fix updatedAt in import * update default model * Use job scheduler to handle import requests * import job status endpoint * Add wrapper around Agenda * Rate limits for import endpoint * rename import api path * Batch save import to mongo * Improve naming * Add documenting comments * Test for importers * Change button for importing conversations * Frontend changes * Import job status endpoint * Import endpoint response * Add translations to new phrases * Fix conversations refreshing * cleanup unused functions * set timeout for import job status polling * Add documentation * get extra spaces back * Improve error message * Fix translation files after merge * fix translation files 2 * Add zh translation for import functionality * Sync mailisearch index after import * chore: add dummy uri for jest tests, as MONGO_URI should only be real for E2E tests * docs: fix links * docs: fix conversationsImport section * fix: user role issue for librechat imports * refactor: import conversations from json - organize imports - add additional jsdocs - use multer with diskStorage to avoid loading file into memory outside of job - use filepath instead of loading data string for imports - replace console logs and some logger.info() with logger.debug - only use multer for import route * fix: undefined metadata edge case and replace ChatGtp -> ChatGpt * Refactor importChatGptConvo function to handle undefined metadata edge case and replace ChatGtp with ChatGpt * fix: chatgpt importer * feat: maintain tree relationship for librechat messages * chore: use enum * refactor: saveMessage to use single object arg, replace console logs, add userId to log message * chore: additional comment * chore: multer edge case * feat: first pass, maintain tree relationship * chore: organize * chore: remove log * ci: add heirarchy test for chatgpt * ci: test maintaining of heirarchy for librechat * wip: allow non-text content type messages * refactor: import content part object json string * refactor: more content types to format * chore: consolidate messageText formatting * docs: update on changes, bump data-provider/config versions, update readme * refactor(indexSync): singleton pattern for MeiliSearchClient * refactor: debug log after batch is done * chore: add back indexSync error handling --------- Co-authored-by: jakubmieszczak <jakub.mieszczak@zendesk.com> Co-authored-by: Danny Avila <danny@librechat.ai>
2024-05-02 08:48:26 +02:00
const Agenda = require('agenda');
const { logger } = require('~/config');
const mongodb = require('mongodb');
/**
* Class for scheduling and running jobs.
* The workflow is as follows: start the job scheduler, define a job, and then schedule the job using defined job name.
*/
class JobScheduler {
constructor() {
this.agenda = new Agenda({ db: { address: process.env.MONGO_URI } });
}
/**
* Starts the job scheduler.
*/
async start() {
try {
logger.info('Starting Agenda...');
await this.agenda.start();
logger.info('Agenda successfully started and connected to MongoDB.');
} catch (error) {
logger.error('Failed to start Agenda:', error);
}
}
/**
* Schedules a job to start immediately.
* @param {string} jobName - The name of the job to schedule.
* @param {string} filepath - The filepath to pass to the job.
* @param {string} userId - The ID of the user requesting the job.
* @returns {Promise<{ id: string }>} - A promise that resolves with the ID of the scheduled job.
* @throws {Error} - If the job fails to schedule.
*/
async now(jobName, filepath, userId) {
try {
const job = await this.agenda.now(jobName, { filepath, requestUserId: userId });
logger.debug(`Job '${job.attrs.name}' scheduled successfully.`);
return { id: job.attrs._id.toString() };
} catch (error) {
throw new Error(`Failed to schedule job '${jobName}': ${error}`);
}
}
/**
* Gets the status of a job.
* @param {string} jobId - The ID of the job to get the status of.
* @returns {Promise<{ id: string, userId: string, name: string, failReason: string, status: string } | null>} - A promise that resolves with the job status or null if the job is not found.
* @throws {Error} - If multiple jobs are found.
*/
async getJobStatus(jobId) {
const job = await this.agenda.jobs({ _id: new mongodb.ObjectId(jobId) });
if (!job || job.length === 0) {
return null;
}
if (job.length > 1) {
// This should never happen
throw new Error('Multiple jobs found.');
}
const jobDetails = {
id: job[0]._id,
userId: job[0].attrs.data.requestUserId,
name: job[0].attrs.name,
failReason: job[0].attrs.failReason,
status: !job[0].attrs.lastRunAt
? 'scheduled'
: job[0].attrs.failedAt
? 'failed'
: job[0].attrs.lastFinishedAt
? 'completed'
: 'running',
};
return jobDetails;
}
/**
* Defines a new job.
* @param {string} name - The name of the job.
* @param {Function} jobFunction - The function to run when the job is executed.
*/
define(name, jobFunction) {
this.agenda.define(name, async (job, done) => {
try {
await jobFunction(job, done);
} catch (error) {
logger.error(`Failed to run job '${name}': ${error}`);
done(error);
}
});
}
}
const jobScheduler = new JobScheduler();
jobScheduler.start();
module.exports = jobScheduler;