LibreChat/api/server/controllers/agents/request.js
Danny Avila 696961df6c
chore: Adjust timeout for subscriber readiness in ResumableAgentController
- Reduced the timeout duration from 5000ms to 2500ms in the startGeneration function to improve responsiveness when waiting for subscriber readiness. This change aims to enhance the efficiency of the agent's background generation process.
2025-12-15 18:48:57 -05:00

656 lines
21 KiB
JavaScript

const { logger } = require('@librechat/data-schemas');
const { Constants } = require('librechat-data-provider');
const {
sendEvent,
GenerationJobManager,
sanitizeFileForTransmit,
sanitizeMessageForTransmit,
} = require('@librechat/api');
const {
handleAbortError,
createAbortController,
cleanupAbortController,
} = require('~/server/middleware');
const { disposeClient, clientRegistry, requestDataMap } = require('~/server/cleanup');
const { saveMessage } = require('~/models');
/**
 * Builds a listener for the response `close` event that aborts the given controller.
 *
 * The listener logs the close unless it is invoked manually, and only calls `abort()`
 * when a controller exists, its signal has not already been aborted, and it has not
 * been flagged with `requestCompleted`.
 *
 * @param {AbortController & { requestCompleted?: boolean }} [abortController] - Controller to abort on close.
 * @returns {(manual?: boolean) => void} Listener; pass a truthy `manual` to suppress the "closed" log line.
 */
function createCloseHandler(abortController) {
  /** True when there is nothing left to abort. */
  const hasNothingToAbort = () =>
    !abortController || abortController.signal.aborted || abortController.requestCompleted;

  const onClose = (manual) => {
    if (!manual) {
      logger.debug('[AgentController] Request closed');
    }

    if (hasNothingToAbort()) {
      return;
    }

    abortController.abort();
    logger.debug('[AgentController] Request aborted on close');
  };

  return onClose;
}
/**
 * Resumable Agent Controller - Generation runs independently of HTTP connection.
 * Returns streamId immediately, client subscribes separately via SSE.
 *
 * The HTTP response is answered with `{ streamId, status: 'started' }` once the client
 * has been initialized. Everything after that (created/final/error events, job
 * completion) is published through `GenerationJobManager` under `streamId`.
 *
 * @param {object} req - Request; reads `req.body` and `req.user.id`, sets `req._resumableStreamId`.
 * @param {object} res - Response; used for its `close` event and the JSON acknowledgement.
 * @param {Function} next - Not used by this controller.
 * @param {Function} initializeClient - Async factory called with `{ req, res, endpointOption, signal }`;
 *   expected to resolve to `{ client, userMCPAuthMap? }`.
 * @param {Function} [addTitle] - Optional async title generator called with `(req, { text, response, client })`.
 * @returns {Promise<unknown>} Resolves once the acknowledgement (or an error response) has been sent;
 *   generation itself continues in the background.
 */
const ResumableAgentController = async (req, res, next, initializeClient, addTitle) => {
  const {
    text,
    isRegenerate,
    endpointOption,
    conversationId: reqConversationId,
    isContinued = false,
    editedContent = null,
    parentMessageId = null,
    overrideParentMessageId = null,
    responseMessageId: editedResponseMessageId = null,
  } = req.body;

  const userId = req.user.id;
  // An existing conversation id doubles as the stream id; otherwise mint a temporary one.
  const streamId =
    reqConversationId || `stream_${Date.now()}_${Math.random().toString(36).slice(2)}`;

  let client = null;

  try {
    // Lets `initializeClient` observe a disconnect that happens before generation starts.
    const prelimAbortController = new AbortController();
    res.on('close', () => {
      if (!prelimAbortController.signal.aborted) {
        prelimAbortController.abort();
      }
    });

    const job = GenerationJobManager.createJob(streamId, userId, reqConversationId);
    req._resumableStreamId = streamId;

    // Track if partial response was already saved to avoid duplicates
    let partialResponseSaved = false;

    /**
     * Listen for all subscribers leaving to save partial response.
     * This ensures the response is saved to DB even if all clients disconnect
     * while generation continues.
     *
     * Note: The messageId used here falls back to `${userMessage.messageId}_` if the
     * actual response messageId isn't available yet. The final response save will
     * overwrite this with the complete response using the same messageId pattern.
     */
    job.emitter.on('allSubscribersLeft', async (aggregatedContent) => {
      if (partialResponseSaved || !aggregatedContent || aggregatedContent.length === 0) {
        return;
      }

      const resumeState = GenerationJobManager.getResumeState(streamId);
      if (!resumeState?.userMessage) {
        logger.debug('[ResumableAgentController] No user message to save partial response for');
        return;
      }

      // Set before the async save so a second event arriving meanwhile is ignored.
      partialResponseSaved = true;
      const responseConversationId = resumeState.conversationId || reqConversationId;

      try {
        const partialMessage = {
          messageId: resumeState.responseMessageId || `${resumeState.userMessage.messageId}_`,
          conversationId: responseConversationId,
          parentMessageId: resumeState.userMessage.messageId,
          sender: client?.sender ?? 'AI',
          content: aggregatedContent,
          unfinished: true,
          error: false,
          isCreatedByUser: false,
          user: userId,
          endpoint: endpointOption.endpoint,
          model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
        };

        if (req.body?.agent_id) {
          partialMessage.agent_id = req.body.agent_id;
        }

        await saveMessage(req, partialMessage, {
          context: 'api/server/controllers/agents/request.js - partial response on disconnect',
        });

        logger.debug(
          `[ResumableAgentController] Saved partial response for ${streamId}, content parts: ${aggregatedContent.length}`,
        );
      } catch (error) {
        logger.error('[ResumableAgentController] Error saving partial response:', error);
        // Reset flag so we can try again if subscribers reconnect and leave again
        partialResponseSaved = false;
      }
    });

    /** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
    const result = await initializeClient({
      req,
      res,
      endpointOption,
      signal: prelimAbortController.signal,
    });

    if (prelimAbortController.signal.aborted) {
      GenerationJobManager.completeJob(streamId, 'Request aborted during initialization');
      // The client may already exist even though the request went away; release it
      // here because `client` is not assigned yet and no later path would dispose it.
      if (result?.client) {
        disposeClient(result.client);
      }
      return res.status(400).json({ error: 'Request aborted during initialization' });
    }

    client = result.client;

    if (client?.sender) {
      GenerationJobManager.updateMetadata(streamId, { sender: client.sender });
    }

    // Store reference to client's contentParts - graph will be set when run is created
    if (client?.contentParts) {
      GenerationJobManager.setContentParts(streamId, client.contentParts);
    }

    // Acknowledge immediately; the rest of the work is detached from this response.
    res.json({ streamId, status: 'started' });

    let conversationId = reqConversationId;
    let userMessage;

    /** Receives request data reported by the client while the message is being sent. */
    const getReqData = (data = {}) => {
      if (data.userMessage) {
        userMessage = data.userMessage;
      }
      if (!conversationId && data.conversationId) {
        conversationId = data.conversationId;
      }
    };

    // Start background generation - wait for subscriber with timeout fallback
    const startGeneration = async () => {
      let readinessTimer;
      try {
        await Promise.race([
          job.readyPromise,
          new Promise((resolve) => {
            readinessTimer = setTimeout(resolve, 2500);
          }),
        ]);
      } catch (waitError) {
        logger.warn(
          `[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`,
        );
      } finally {
        // Do not leave the fallback timer pending once the wait is over.
        clearTimeout(readinessTimer);
      }

      try {
        const onStart = (userMsg, respMsgId, _isNewConvo) => {
          userMessage = userMsg;

          // Store userMessage and responseMessageId upfront for resume capability
          GenerationJobManager.updateMetadata(streamId, {
            responseMessageId: respMsgId,
            userMessage: {
              messageId: userMsg.messageId,
              parentMessageId: userMsg.parentMessageId,
              conversationId: userMsg.conversationId,
              text: userMsg.text,
            },
          });

          GenerationJobManager.emitChunk(streamId, {
            created: true,
            message: userMessage,
            streamId,
          });
        };

        const messageOptions = {
          user: userId,
          onStart,
          getReqData,
          isContinued,
          isRegenerate,
          editedContent,
          conversationId,
          parentMessageId,
          abortController: job.abortController,
          overrideParentMessageId,
          isEdited: !!editedContent,
          userMCPAuthMap: result.userMCPAuthMap,
          responseMessageId: editedResponseMessageId,
          progressOptions: {
            // No-op stand-in for the HTTP response, which has already been answered above.
            res: {
              write: () => true,
              end: () => {},
              headersSent: false,
              writableEnded: false,
            },
          },
        };

        const response = await client.sendMessage(text, messageOptions);

        const messageId = response.messageId;
        const endpoint = endpointOption.endpoint;
        response.endpoint = endpoint;

        // Detach the promise so it is not copied into the emitted/saved response.
        const databasePromise = response.databasePromise;
        delete response.databasePromise;

        const { conversation: convoData = {} } = await databasePromise;
        const conversation = { ...convoData };
        conversation.title =
          conversation && !conversation.title ? null : conversation?.title || 'New Chat';

        // Attach only the attachments that were part of this request, in sanitized form.
        if (req.body.files && client.options?.attachments) {
          userMessage.files = [];
          const messageFiles = new Set(req.body.files.map((file) => file.file_id));
          for (const attachment of client.options.attachments) {
            if (messageFiles.has(attachment.file_id)) {
              userMessage.files.push(sanitizeFileForTransmit(attachment));
            }
          }
          delete userMessage.image_urls;
        }

        if (!job.abortController.signal.aborted) {
          const finalEvent = {
            final: true,
            conversation,
            title: conversation.title,
            requestMessage: sanitizeMessageForTransmit(userMessage),
            responseMessage: { ...response },
          };

          GenerationJobManager.emitDone(streamId, finalEvent);
          GenerationJobManager.completeJob(streamId);

          if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
            await saveMessage(
              req,
              { ...response, user: userId },
              { context: 'api/server/controllers/agents/request.js - resumable response end' },
            );
          }
        } else {
          const finalEvent = {
            final: true,
            conversation,
            title: conversation.title,
            requestMessage: sanitizeMessageForTransmit(userMessage),
            responseMessage: { ...response, error: true },
            error: { message: 'Request was aborted' },
          };

          GenerationJobManager.emitDone(streamId, finalEvent);
          GenerationJobManager.completeJob(streamId, 'Request aborted');
        }

        if (!client.skipSaveUserMessage && userMessage) {
          await saveMessage(req, userMessage, {
            context: 'api/server/controllers/agents/request.js - resumable user message',
          });
        }

        // Skip title generation if job was aborted
        const newConvo = !reqConversationId;
        const shouldGenerateTitle =
          addTitle &&
          parentMessageId === Constants.NO_PARENT &&
          newConvo &&
          !job.abortController.signal.aborted;

        if (shouldGenerateTitle) {
          // The client is handed to title generation, so dispose it only once that settles.
          addTitle(req, {
            text,
            response: { ...response },
            client,
          })
            .catch((err) => {
              logger.error('[ResumableAgentController] Error in title generation', err);
            })
            .finally(() => {
              if (client) {
                disposeClient(client);
              }
            });
        } else {
          if (client) {
            disposeClient(client);
          }
        }
      } catch (error) {
        // Check if this was an abort (not a real error)
        const wasAborted =
          job.abortController.signal.aborted || error?.message?.includes('abort');

        if (wasAborted) {
          logger.debug(`[ResumableAgentController] Generation aborted for ${streamId}`);
          // abortJob already handled emitDone and completeJob
        } else {
          logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error);
          GenerationJobManager.emitError(streamId, error?.message || 'Generation failed');
          GenerationJobManager.completeJob(streamId, error?.message);
        }

        if (client) {
          disposeClient(client);
        }

        // Don't continue to title generation after error/abort
        return;
      }
    };

    // Start generation and handle any unhandled errors
    startGeneration().catch((err) => {
      logger.error(
        `[ResumableAgentController] Unhandled error in background generation: ${err.message}`,
      );
      GenerationJobManager.completeJob(streamId, err.message);
    });
  } catch (error) {
    logger.error('[ResumableAgentController] Initialization error:', error);
    // The acknowledgement may already have been sent; only answer if it was not.
    if (!res.headersSent) {
      res.status(500).json({ error: error.message || 'Failed to start generation' });
    }
    GenerationJobManager.completeJob(streamId, error.message);
    if (client) {
      disposeClient(client);
    }
  }
};
/**
 * Agent chat controller.
 *
 * When `req.query.resumable === 'true'` the request is delegated to
 * `ResumableAgentController`. Otherwise generation is tied to this HTTP response:
 * the client is initialized, the message is sent with `res` as the progress target,
 * a final event is sent with `sendEvent`, messages are saved, a title is optionally
 * generated, and all per-request references are released.
 *
 * @param {object} req - Request; reads `req.query.resumable`, `req.body` and `req.user.id`.
 * @param {object} res - Response used for streaming events and the `close` event.
 * @param {Function} next - Only forwarded to the resumable controller.
 * @param {Function} initializeClient - Async factory called with `{ req, res, endpointOption, signal }`;
 *   expected to resolve to `{ client, userMCPAuthMap? }`.
 * @param {Function} [addTitle] - Optional async title generator called with `(req, { text, response, client })`.
 * @returns {Promise<unknown>} Settles when the request has been handled; errors are routed
 *   to `handleAbortError` rather than rejected.
 */
const AgentController = async (req, res, next, initializeClient, addTitle) => {
  const isResumable = req.query.resumable === 'true';
  if (isResumable) {
    return ResumableAgentController(req, res, next, initializeClient, addTitle);
  }

  let {
    text,
    isRegenerate,
    endpointOption,
    conversationId,
    isContinued = false,
    editedContent = null,
    parentMessageId = null,
    overrideParentMessageId = null,
    responseMessageId: editedResponseMessageId = null,
  } = req.body;

  let sender;
  let abortKey;
  let userMessage;
  let promptTokens;
  let userMessageId;
  let responseMessageId;
  let userMessagePromise;
  let getAbortData;
  let client = null;
  let cleanupHandlers = [];

  const newConvo = !conversationId;
  const userId = req.user.id;

  // Create handler to avoid capturing the entire parent scope
  let getReqData = (data = {}) => {
    for (const key in data) {
      if (key === 'userMessage') {
        userMessage = data[key];
        userMessageId = data[key].messageId;
      } else if (key === 'userMessagePromise') {
        userMessagePromise = data[key];
      } else if (key === 'responseMessageId') {
        responseMessageId = data[key];
      } else if (key === 'promptTokens') {
        promptTokens = data[key];
      } else if (key === 'sender') {
        sender = data[key];
      } else if (key === 'abortKey') {
        abortKey = data[key];
      } else if (!conversationId && key === 'conversationId') {
        conversationId = data[key];
      }
    }
  };

  // Create a function to handle final cleanup.
  // It runs from `.finally()` callbacks, so it must never throw and must tolerate
  // being invoked more than once.
  const performCleanup = () => {
    logger.debug('[AgentController] Performing cleanup');

    if (Array.isArray(cleanupHandlers)) {
      for (const handler of cleanupHandlers) {
        try {
          if (typeof handler === 'function') {
            handler();
          }
        } catch (e) {
          logger.error('[AgentController] Error in cleanup handler', e);
        }
      }
    }

    // Clean up abort controller
    if (abortKey) {
      logger.debug('[AgentController] Cleaning up abort controller');
      cleanupAbortController(abortKey);
      abortKey = null;
    }

    // Dispose client properly
    if (client) {
      disposeClient(client);
    }

    // Clear all references
    client = null;
    getReqData = null;
    userMessage = null;
    getAbortData = null;
    // `endpointOption` is absent when the request body lacked it, and null after a previous run.
    if (endpointOption) {
      endpointOption.agent = null;
    }
    endpointOption = null;
    cleanupHandlers = null;
    userMessagePromise = null;

    // Clear request data map
    if (requestDataMap.has(req)) {
      requestDataMap.delete(req);
    }

    logger.debug('[AgentController] Cleanup completed');
  };

  try {
    // Covers the window before the real abort controller exists: a closed connection
    // aborts the signal handed to `initializeClient`.
    let prelimAbortController = new AbortController();
    const prelimCloseHandler = createCloseHandler(prelimAbortController);
    res.on('close', prelimCloseHandler);

    const removePrelimHandler = (manual) => {
      try {
        prelimCloseHandler(manual);
        res.removeListener('close', prelimCloseHandler);
      } catch (e) {
        logger.error('[AgentController] Error removing close listener', e);
      }
    };
    cleanupHandlers.push(removePrelimHandler);

    /** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
    const result = await initializeClient({
      req,
      res,
      endpointOption,
      signal: prelimAbortController.signal,
    });

    if (prelimAbortController.signal?.aborted) {
      prelimAbortController = null;
      throw new Error('Request was aborted before initialization could complete');
    } else {
      prelimAbortController = null;
      // Initialization finished: retire the preliminary handler and its cleanup entry.
      removePrelimHandler(true);
      cleanupHandlers.pop();
    }

    client = result.client;

    // Register client with finalization registry if available
    if (clientRegistry) {
      clientRegistry.register(client, { userId }, client);
    }

    // Store request data in WeakMap keyed by req object
    requestDataMap.set(req, { client });

    // Use WeakRef to allow GC but still access content if it exists
    const contentRef = new WeakRef(client.contentParts || []);

    // Minimize closure scope - only capture small primitives and WeakRef
    getAbortData = () => {
      // Dereference WeakRef each time
      const content = contentRef.deref();
      return {
        sender,
        content: content || [],
        userMessage,
        promptTokens,
        conversationId,
        userMessagePromise,
        messageId: responseMessageId,
        parentMessageId: overrideParentMessageId ?? userMessageId,
      };
    };

    const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);

    const closeHandler = createCloseHandler(abortController);
    res.on('close', closeHandler);
    cleanupHandlers.push(() => {
      try {
        res.removeListener('close', closeHandler);
      } catch (e) {
        logger.error('[AgentController] Error removing close listener', e);
      }
    });

    const messageOptions = {
      user: userId,
      onStart,
      getReqData,
      isContinued,
      isRegenerate,
      editedContent,
      conversationId,
      parentMessageId,
      abortController,
      overrideParentMessageId,
      isEdited: !!editedContent,
      userMCPAuthMap: result.userMCPAuthMap,
      responseMessageId: editedResponseMessageId,
      progressOptions: {
        res,
      },
    };

    let response = await client.sendMessage(text, messageOptions);

    // Extract what we need and immediately break reference
    const messageId = response.messageId;
    const endpoint = endpointOption.endpoint;
    response.endpoint = endpoint;

    // Store database promise locally
    const databasePromise = response.databasePromise;
    delete response.databasePromise;

    // Resolve database-related data
    const { conversation: convoData = {} } = await databasePromise;
    const conversation = { ...convoData };
    conversation.title =
      conversation && !conversation.title ? null : conversation?.title || 'New Chat';

    // Process files if needed (sanitize to remove large text fields before transmission)
    if (req.body.files && client.options?.attachments) {
      userMessage.files = [];
      const messageFiles = new Set(req.body.files.map((file) => file.file_id));
      for (const attachment of client.options.attachments) {
        if (messageFiles.has(attachment.file_id)) {
          userMessage.files.push(sanitizeFileForTransmit(attachment));
        }
      }
      delete userMessage.image_urls;
    }

    // Only send if not aborted
    if (!abortController.signal.aborted) {
      // Create a new response object with minimal copies
      const finalResponse = { ...response };

      sendEvent(res, {
        final: true,
        conversation,
        title: conversation.title,
        requestMessage: sanitizeMessageForTransmit(userMessage),
        responseMessage: finalResponse,
      });
      res.end();

      // Save the message if needed
      if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
        await saveMessage(
          req,
          { ...finalResponse, user: userId },
          { context: 'api/server/controllers/agents/request.js - response end' },
        );
      }
    }
    // Edge case: sendMessage completed but abort happened during sendCompletion
    // We need to ensure a final event is sent
    else if (!res.headersSent && !res.finished) {
      logger.debug(
        '[AgentController] Handling edge case: `sendMessage` completed but aborted during `sendCompletion`',
      );

      const finalResponse = { ...response };
      finalResponse.error = true;

      sendEvent(res, {
        final: true,
        conversation,
        title: conversation.title,
        requestMessage: sanitizeMessageForTransmit(userMessage),
        responseMessage: finalResponse,
        error: { message: 'Request was aborted during completion' },
      });
      res.end();
    }

    // Save user message if needed (skipped when none was captured, matching the
    // guard used by the resumable controller)
    if (!client.skipSaveUserMessage && userMessage) {
      await saveMessage(req, userMessage, {
        context: "api/server/controllers/agents/request.js - don't skip saving user message",
      });
    }

    // Add title if needed - extract minimal data
    if (addTitle && parentMessageId === Constants.NO_PARENT && newConvo) {
      // Cleanup is deferred until title generation settles because it receives the client.
      addTitle(req, {
        text,
        response: { ...response },
        client,
      })
        .then(() => {
          logger.debug('[AgentController] Title generation started');
        })
        .catch((err) => {
          logger.error('[AgentController] Error in title generation', err);
        })
        .finally(() => {
          logger.debug('[AgentController] Title generation completed');
          performCleanup();
        });
    } else {
      performCleanup();
    }
  } catch (error) {
    // Handle error without capturing much scope
    handleAbortError(res, req, error, {
      conversationId,
      sender,
      messageId: responseMessageId,
      parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId,
      userMessageId,
    })
      .catch((err) => {
        logger.error('[api/server/controllers/agents/request] Error in `handleAbortError`', err);
      })
      .finally(() => {
        performCleanup();
      });
  }
};
module.exports = AgentController;