feat: Implement Resumable Generation Jobs with SSE Support

- Introduced GenerationJobManager to handle resumable LLM generation jobs independently of HTTP connections.
- Added support for subscribing to ongoing generation jobs via SSE, allowing clients to reconnect and receive updates without losing progress.
- Enhanced existing agent controllers and routes to integrate resumable functionality, including job creation, completion, and error handling.
- Updated client-side hooks to manage adaptive SSE streams, switching between standard and resumable modes based on user settings.
- Added UI components and settings for enabling/disabling resumable streams, improving user experience during unstable connections.
This commit is contained in:
Danny Avila 2025-12-03 21:48:04 -05:00
parent 3213f574c6
commit 6bb2fac0ec
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
17 changed files with 1212 additions and 37 deletions

View file

@ -1,5 +1,5 @@
const { nanoid } = require('nanoid');
const { sendEvent } = require('@librechat/api');
const { sendEvent, GenerationJobManager } = require('@librechat/api');
const { logger } = require('@librechat/data-schemas');
const { Tools, StepTypes, FileContext, ErrorTypes } = require('librechat-data-provider');
const {
@ -144,17 +144,38 @@ function checkIfLastAgent(last_agent_id, langgraph_node) {
return langgraph_node?.endsWith(last_agent_id);
}
/**
* Helper to emit events either to res (standard mode) or to job emitter (resumable mode).
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} eventData - The event data to send
*/
function emitEvent(res, streamId, eventData) {
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
}
/**
* Get default handlers for stream events.
* @param {Object} options - The options object.
* @param {ServerResponse} options.res - The options object.
* @param {ContentAggregator} options.aggregateContent - The options object.
* @param {ServerResponse} options.res - The server response object.
* @param {ContentAggregator} options.aggregateContent - Content aggregator function.
* @param {ToolEndCallback} options.toolEndCallback - Callback to use when tool ends.
* @param {Array<UsageMetadata>} options.collectedUsage - The list of collected usage metadata.
* @param {string | null} [options.streamId] - The stream ID for resumable mode, or null for standard mode.
* @returns {Record<string, t.EventHandler>} The default handlers.
* @throws {Error} If the request is not found.
*/
function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedUsage }) {
function getDefaultHandlers({
res,
aggregateContent,
toolEndCallback,
collectedUsage,
streamId = null,
}) {
if (!res || !aggregateContent) {
throw new Error(
`[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`,
@ -173,16 +194,16 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (data?.stepDetails.type === StepTypes.TOOL_CALLS) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else {
const agentName = metadata?.name ?? 'Agent';
const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS;
const action = isToolCall ? 'performing a task...' : 'thinking...';
sendEvent(res, {
emitEvent(res, streamId, {
event: 'on_agent_update',
data: {
runId: metadata?.run_id,
@ -202,11 +223,11 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (data?.delta.type === StepTypes.TOOL_CALLS) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -220,11 +241,11 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (data?.result != null) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -238,9 +259,9 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -254,9 +275,9 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
*/
handle: (event, data, metadata) => {
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -266,15 +287,30 @@ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedU
return handlers;
}
/**
* Helper to write attachment events either to res or to job emitter.
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} attachment - The attachment data
*/
function writeAttachment(res, streamId, attachment) {
if (streamId) {
GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
} else {
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
}
}
/**
*
* @param {Object} params
* @param {ServerRequest} params.req
* @param {ServerResponse} params.res
* @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
* @param {string | null} [params.streamId] - The stream ID for resumable mode, or null for standard mode.
* @returns {ToolEndCallback} The tool end callback.
*/
function createToolEndCallback({ req, res, artifactPromises }) {
function createToolEndCallback({ req, res, artifactPromises, streamId = null }) {
/**
* @type {ToolEndCallback}
*/
@ -302,10 +338,10 @@ function createToolEndCallback({ req, res, artifactPromises }) {
if (!attachment) {
return null;
}
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return attachment;
}
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing file citations:', error);
@ -314,8 +350,6 @@ function createToolEndCallback({ req, res, artifactPromises }) {
);
}
// TODO: a lot of duplicated code in createToolEndCallback
// we should refactor this to use a helper function in a follow-up PR
if (output.artifact[Tools.ui_resources]) {
artifactPromises.push(
(async () => {
@ -326,10 +360,10 @@ function createToolEndCallback({ req, res, artifactPromises }) {
conversationId: metadata.thread_id,
[Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
};
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return attachment;
}
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
@ -348,10 +382,10 @@ function createToolEndCallback({ req, res, artifactPromises }) {
conversationId: metadata.thread_id,
[Tools.web_search]: { ...output.artifact[Tools.web_search] },
};
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return attachment;
}
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
return attachment;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
@ -388,7 +422,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
});
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return fileMetadata;
}
@ -396,7 +430,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
return null;
}
res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`);
writeAttachment(res, streamId, fileMetadata);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
@ -435,7 +469,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
conversationId: metadata.thread_id,
session_id: output.artifact.session_id,
});
if (!res.headersSent) {
if (!streamId && !res.headersSent) {
return fileMetadata;
}
@ -443,7 +477,7 @@ function createToolEndCallback({ req, res, artifactPromises }) {
return null;
}
res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`);
writeAttachment(res, streamId, fileMetadata);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing code output:', error);

View file

@ -2,6 +2,7 @@ const { logger } = require('@librechat/data-schemas');
const { Constants } = require('librechat-data-provider');
const {
sendEvent,
GenerationJobManager,
sanitizeFileForTransmit,
sanitizeMessageForTransmit,
} = require('@librechat/api');
@ -31,7 +32,232 @@ function createCloseHandler(abortController) {
};
}
/**
* Resumable Agent Controller - Generation runs independently of HTTP connection.
* Returns streamId immediately, client subscribes separately via SSE.
*/
const ResumableAgentController = async (req, res, next, initializeClient, addTitle) => {
const {
text,
isRegenerate,
endpointOption,
conversationId: reqConversationId,
isContinued = false,
editedContent = null,
parentMessageId = null,
overrideParentMessageId = null,
responseMessageId: editedResponseMessageId = null,
} = req.body;
const userId = req.user.id;
const streamId =
reqConversationId || `stream_${Date.now()}_${Math.random().toString(36).slice(2)}`;
let client = null;
try {
const prelimAbortController = new AbortController();
res.on('close', () => {
if (!prelimAbortController.signal.aborted) {
prelimAbortController.abort();
}
});
const job = GenerationJobManager.createJob(streamId, userId, reqConversationId);
req._resumableStreamId = streamId;
/** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
const result = await initializeClient({
req,
res,
endpointOption,
signal: prelimAbortController.signal,
});
if (prelimAbortController.signal.aborted) {
GenerationJobManager.completeJob(streamId, 'Request aborted during initialization');
return res.status(400).json({ error: 'Request aborted during initialization' });
}
client = result.client;
res.json({ streamId, status: 'started' });
let conversationId = reqConversationId;
let userMessage;
const getReqData = (data = {}) => {
if (data.userMessage) {
userMessage = data.userMessage;
}
if (!conversationId && data.conversationId) {
conversationId = data.conversationId;
}
};
// Start background generation - wait for subscriber with timeout fallback
const startGeneration = async () => {
try {
await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 5000))]);
} catch (waitError) {
logger.warn(
`[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`,
);
}
try {
const onStart = (userMsg, _respMsgId, _isNewConvo) => {
userMessage = userMsg;
GenerationJobManager.emitChunk(streamId, {
created: true,
message: userMessage,
streamId,
});
};
const messageOptions = {
user: userId,
onStart,
getReqData,
isContinued,
isRegenerate,
editedContent,
conversationId,
parentMessageId,
abortController: job.abortController,
overrideParentMessageId,
isEdited: !!editedContent,
userMCPAuthMap: result.userMCPAuthMap,
responseMessageId: editedResponseMessageId,
progressOptions: {
res: {
write: () => true,
end: () => {},
headersSent: false,
writableEnded: false,
},
},
};
const response = await client.sendMessage(text, messageOptions);
const messageId = response.messageId;
const endpoint = endpointOption.endpoint;
response.endpoint = endpoint;
const databasePromise = response.databasePromise;
delete response.databasePromise;
const { conversation: convoData = {} } = await databasePromise;
const conversation = { ...convoData };
conversation.title =
conversation && !conversation.title ? null : conversation?.title || 'New Chat';
if (req.body.files && client.options?.attachments) {
userMessage.files = [];
const messageFiles = new Set(req.body.files.map((file) => file.file_id));
for (const attachment of client.options.attachments) {
if (messageFiles.has(attachment.file_id)) {
userMessage.files.push(sanitizeFileForTransmit(attachment));
}
}
delete userMessage.image_urls;
}
if (!job.abortController.signal.aborted) {
const finalEvent = {
final: true,
conversation,
title: conversation.title,
requestMessage: sanitizeMessageForTransmit(userMessage),
responseMessage: { ...response },
};
GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId);
if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
await saveMessage(
req,
{ ...response, user: userId },
{ context: 'api/server/controllers/agents/request.js - resumable response end' },
);
}
} else {
const finalEvent = {
final: true,
conversation,
title: conversation.title,
requestMessage: sanitizeMessageForTransmit(userMessage),
responseMessage: { ...response, error: true },
error: { message: 'Request was aborted' },
};
GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId, 'Request aborted');
}
if (!client.skipSaveUserMessage && userMessage) {
await saveMessage(req, userMessage, {
context: 'api/server/controllers/agents/request.js - resumable user message',
});
}
const newConvo = !reqConversationId;
if (addTitle && parentMessageId === Constants.NO_PARENT && newConvo) {
addTitle(req, {
text,
response: { ...response },
client,
})
.catch((err) => {
logger.error('[ResumableAgentController] Error in title generation', err);
})
.finally(() => {
if (client) {
disposeClient(client);
}
});
} else {
if (client) {
disposeClient(client);
}
}
} catch (error) {
logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error);
GenerationJobManager.emitError(streamId, error.message || 'Generation failed');
GenerationJobManager.completeJob(streamId, error.message);
if (client) {
disposeClient(client);
}
}
};
// Start generation and handle any unhandled errors
startGeneration().catch((err) => {
logger.error(
`[ResumableAgentController] Unhandled error in background generation: ${err.message}`,
);
GenerationJobManager.completeJob(streamId, err.message);
});
} catch (error) {
logger.error('[ResumableAgentController] Initialization error:', error);
if (!res.headersSent) {
res.status(500).json({ error: error.message || 'Failed to start generation' });
}
GenerationJobManager.completeJob(streamId, error.message);
if (client) {
disposeClient(client);
}
}
};
const AgentController = async (req, res, next, initializeClient, addTitle) => {
const isResumable = req.query.resumable === 'true';
if (isResumable) {
return ResumableAgentController(req, res, next, initializeClient, addTitle);
}
let {
text,
isRegenerate,