feat: Implement Resumable Generation Jobs with SSE Support

- Introduced GenerationJobManager to handle resumable LLM generation jobs independently of HTTP connections (see the job-manager sketch after this list).
- Added support for subscribing to ongoing generation jobs via SSE, allowing clients to reconnect and receive updates without losing progress (see the subscription route sketch below).
- Enhanced existing agent controllers and routes to integrate resumable functionality, including job creation, completion, and error handling.
- Updated client-side hooks to manage adaptive SSE streams, switching between standard and resumable modes based on user settings (see the client hook sketch below).
- Added UI components and settings for enabling/disabling resumable streams, improving user experience during unstable connections.
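
The diff below only shows call sites of the new manager, so as orientation: a resumable job manager can be pictured as an in-memory registry keyed by stream ID, where each job owns an event emitter plus a replay buffer so late subscribers can catch up. The sketch below is illustrative only; the names and the actual GenerationJobManager implementation in @librechat/api may differ.

// Minimal sketch of the idea behind GenerationJobManager (illustrative only).
const { EventEmitter } = require('events');

const jobs = new Map(); // streamId -> { emitter, buffer, done }

function createJob(streamId) {
  const job = { emitter: new EventEmitter(), buffer: [], done: false };
  jobs.set(streamId, job);
  return job;
}

function emitChunk(streamId, eventData) {
  const job = jobs.get(streamId);
  if (!job) return;
  job.buffer.push(eventData); // retained so reconnecting clients can replay
  job.emitter.emit('chunk', eventData);
}

function completeJob(streamId, finalData) {
  const job = jobs.get(streamId);
  if (!job) return;
  job.done = true;
  job.emitter.emit('done', finalData);
}

function subscribe(streamId, onChunk, onDone) {
  const job = jobs.get(streamId);
  if (!job) return null;
  // Replay everything emitted so far, then stream live chunks.
  // (A fuller version would also handle jobs that already completed.)
  job.buffer.forEach(onChunk);
  job.emitter.on('chunk', onChunk);
  job.emitter.once('done', onDone);
  return () => job.emitter.off('chunk', onChunk);
}

module.exports = { createJob, emitChunk, completeJob, subscribe };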
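
Reconnecting clients need something to subscribe to. A hypothetical Express route (endpoint path and names are assumptions, not necessarily the route added in this commit) could replay the job's buffered chunks and then stream live ones:

// Hypothetical SSE subscription route; relies on the manager sketch above.
const express = require('express');
const jobManager = require('./generationJobManager');

const router = express.Router();

router.get('/chat/:streamId/subscribe', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  const send = (eventData) => {
    res.write(`event: message\ndata: ${JSON.stringify(eventData)}\n\n`);
  };

  const unsubscribe = jobManager.subscribe(req.params.streamId, send, (finalData) => {
    res.write(`event: final\ndata: ${JSON.stringify(finalData)}\n\n`);
    res.end();
  });

  if (!unsubscribe) {
    res.write('event: error\ndata: {"message":"unknown stream"}\n\n');
    return res.end();
  }

  // Closing the SSE connection only detaches this subscriber; generation
  // keeps running server-side, so the client can resubscribe later.
  req.on('close', unsubscribe);
});

module.exports = router;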
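
On the client, the adaptive behavior can be pictured as a hook that keeps the existing request-scoped stream in standard mode and attaches to the server-side job in resumable mode. The hook name, endpoint, and event names below are illustrative assumptions; the hook shipped in this commit is more involved (auth, retries, event typing).

// Illustrative React hook for switching into resumable mode.
import { useEffect, useState } from 'react';

export function useResumableStream({ streamId, resumableEnabled }) {
  const [events, setEvents] = useState([]);

  useEffect(() => {
    if (!resumableEnabled || !streamId) {
      // Standard mode: keep using the existing request-scoped SSE stream.
      return;
    }

    // Resumable mode: attach to the server-side job by stream ID. If the tab
    // reconnects, EventSource reopens the subscription and the server replays
    // whatever was buffered while the client was away.
    const source = new EventSource(`/api/agents/chat/${streamId}/subscribe`);
    source.onmessage = (e) => {
      setEvents((prev) => [...prev, JSON.parse(e.data)]);
    };
    source.addEventListener('final', (e) => {
      setEvents((prev) => [...prev, JSON.parse(e.data)]);
      source.close();
    });

    return () => source.close();
  }, [streamId, resumableEnabled]);

  return events;
}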
Danny Avila 2025-12-03 21:48:04 -05:00
parent 5bfebc7c9d
commit 0e850a5d5f
17 changed files with 1212 additions and 37 deletions

@@ -1,5 +1,5 @@
const { nanoid } = require('nanoid');
const { sendEvent } = require('@librechat/api');
const { sendEvent, GenerationJobManager } = require('@librechat/api');
const { logger } = require('@librechat/data-schemas');
const { Tools, StepTypes, FileContext, ErrorTypes } = require('librechat-data-provider');
const {
@@ -144,17 +144,38 @@ function checkIfLastAgent(last_agent_id, langgraph_node) {
return langgraph_node?.endsWith(last_agent_id);
}
/**
* Helper to emit events either to res (standard mode) or to job emitter (resumable mode).
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} eventData - The event data to send
*/
function emitEvent(res, streamId, eventData) {
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
}
/**
* Get default handlers for stream events.
* @param {Object} options - The options object.
* @param {ServerResponse} options.res - The options object.
* @param {ContentAggregator} options.aggregateContent - The options object.
* @param {ServerResponse} options.res - The server response object.
* @param {ContentAggregator} options.aggregateContent - Content aggregator function.
* @param {ToolEndCallback} options.toolEndCallback - Callback to use when tool ends.
* @param {Array<UsageMetadata>} options.collectedUsage - The list of collected usage metadata.
* @param {string | null} [options.streamId] - The stream ID for resumable mode, or null for standard mode.
* @returns {Record<string, t.EventHandler>} The default handlers.
* @throws {Error} If the request is not found.
*/
function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedUsage }) {
function getDefaultHandlers({
res,
aggregateContent,
toolEndCallback,
collectedUsage,
streamId = null,
}) {
if (!res || !aggregateContent) {
throw new Error(
`[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`,
@@ -173,16 +194,16 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (data?.stepDetails.type === StepTypes.TOOL_CALLS) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else {
const agentName = metadata?.name ?? 'Agent';
const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS;
const action = isToolCall ? 'performing a task...' : 'thinking...';
sendEvent(res, {
emitEvent(res, streamId, {
event: 'on_agent_update',
data: {
runId: metadata?.run_id,
@@ -202,11 +223,11 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (data?.delta.type === StepTypes.TOOL_CALLS) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@@ -220,11 +241,11 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (data?.result != null) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@@ -238,9 +259,9 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@@ -254,9 +275,9 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@@ -266,15 +287,30 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
return handlers;
}
/**
* Helper to write attachment events either to res or to job emitter.
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} attachment - The attachment data
*/
function writeAttachment(res, streamId, attachment) {
if (streamId) {
GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
} else {
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
}
}
/**
*
* @param {Object} params
* @param {ServerRequest} params.req
* @param {ServerResponse} params.res
* @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
* @param {string | null} [params.streamId] - The stream ID for resumable mode, or null for standard mode.
* @returns {ToolEndCallback} The tool end callback.
*/
function createToolEndCallback({ req, res, artifactPromises }) {
function createToolEndCallback({ req, res, artifactPromises, streamId = null }) {
/**
* @type {ToolEndCallback}
*/
@@ -302,10 +338,10 @@ function createToolEndCallback({ req, res, artifactPromises }) {
if (!attachment) {
return null;
}
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return attachment;
}
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing file citations:', error);
@@ -314,8 +350,6 @@ function createToolEndCallback({ req, res, artifactPromises }) {
);
}
// TODO: a lot of duplicated code in createToolEndCallback
// we should refactor this to use a helper function in a follow-up PR
if (output.artifact[Tools.ui_resources]) {
artifactPromises.push(
(async () => {
@@ -326,10 +360,10 @@ function createToolEndCallback({ req, res, artifactPromises }) {
conversationId: metadata.thread_id,
[Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
};
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return attachment;
}
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
@@ -348,10 +382,10 @@ function createToolEndCallback({ req, res, artifactPromises }) {
conversationId: metadata.thread_id,
[Tools.web_search]: { ...output.artifact[Tools.web_search] },
};
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return attachment;
}
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
@@ -388,7 +422,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
});
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return fileMetadata;
}
@@ -396,7 +430,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
return null;
}
res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`);
writeAttachment(res, streamId, fileMetadata);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
@@ -435,7 +469,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
conversationId: metadata.thread_id,
session_id: output.artifact.session_id,
});
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return fileMetadata;
}
@@ -443,7 +477,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
return null;
}
res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`);
writeAttachment(res, streamId, fileMetadata);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing code output:', error);