mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-17 08:50:15 +01:00
- Reduced the timeout duration from 5000ms to 2500ms in the startGeneration function to improve responsiveness when waiting for subscriber readiness. This change aims to enhance the efficiency of the agent's background generation process.
656 lines
21 KiB
JavaScript
656 lines
21 KiB
JavaScript
const { logger } = require('@librechat/data-schemas');
|
|
const { Constants } = require('librechat-data-provider');
|
|
const {
|
|
sendEvent,
|
|
GenerationJobManager,
|
|
sanitizeFileForTransmit,
|
|
sanitizeMessageForTransmit,
|
|
} = require('@librechat/api');
|
|
const {
|
|
handleAbortError,
|
|
createAbortController,
|
|
cleanupAbortController,
|
|
} = require('~/server/middleware');
|
|
const { disposeClient, clientRegistry, requestDataMap } = require('~/server/cleanup');
|
|
const { saveMessage } = require('~/models');
|
|
|
|
/**
 * Builds a handler for a response `close` event that aborts the supplied
 * controller if the request has not already been aborted or completed.
 *
 * @param {AbortController & { requestCompleted?: boolean }} [abortController] - Controller to
 *   abort. When falsy, the returned handler does nothing beyond optional logging.
 * @returns {(manual?: boolean) => void} Handler. Pass a truthy `manual` when calling it
 *   programmatically to suppress the "Request closed" debug log; the abort logic still runs.
 */
function createCloseHandler(abortController) {
  return function (manual) {
    if (!manual) {
      logger.debug('[AgentController] Request closed');
    }

    // Nothing to abort: no controller, already aborted, or the request finished normally.
    if (
      !abortController ||
      abortController.signal.aborted ||
      abortController.requestCompleted
    ) {
      return;
    }

    abortController.abort();
    logger.debug('[AgentController] Request aborted on close');
  };
}
|
|
|
|
/**
 * Resumable Agent Controller - Generation runs independently of HTTP connection.
 * Returns streamId immediately, client subscribes separately via SSE.
 *
 * Steps performed here:
 * 1. Create a generation job keyed by `streamId` (the request's conversationId, or a
 *    generated `stream_*` id when none was supplied).
 * 2. Initialize the client; if the HTTP connection closed meanwhile, complete the job,
 *    dispose the client and answer 400.
 * 3. Respond with `{ streamId, status: 'started' }` and run the generation in the background,
 *    publishing progress and the final event through `GenerationJobManager`.
 *
 * @param {object} req - Request; reads `req.body` and `req.user.id`, sets `req._resumableStreamId`.
 * @param {object} res - Response used only for the initial JSON reply (or the error reply).
 * @param {Function} next - Unused; kept for signature parity with `AgentController`.
 * @param {Function} initializeClient - Async factory called with
 *   `{ req, res, endpointOption, signal }`; expected to resolve to `{ client, userMCPAuthMap }`.
 * @param {Function} [addTitle] - Optional async title generator called with
 *   `(req, { text, response, client })` for new conversations.
 * @returns {Promise<unknown>} Resolves once the initial HTTP reply has been issued; the
 *   generation itself continues in the background.
 */
const ResumableAgentController = async (req, res, next, initializeClient, addTitle) => {
  /** Maximum time (ms) to wait for a subscriber before starting generation anyway. */
  const SUBSCRIBER_READY_TIMEOUT_MS = 2500;

  const {
    text,
    isRegenerate,
    endpointOption,
    conversationId: reqConversationId,
    isContinued = false,
    editedContent = null,
    parentMessageId = null,
    overrideParentMessageId = null,
    responseMessageId: editedResponseMessageId = null,
  } = req.body;

  const userId = req.user.id;
  const streamId =
    reqConversationId || `stream_${Date.now()}_${Math.random().toString(36).slice(2)}`;

  let client = null;

  try {
    // Only guards the initialization phase: aborts if the HTTP connection closes.
    const prelimAbortController = new AbortController();
    res.on('close', () => {
      if (!prelimAbortController.signal.aborted) {
        prelimAbortController.abort();
      }
    });

    const job = GenerationJobManager.createJob(streamId, userId, reqConversationId);
    req._resumableStreamId = streamId;

    // Track if partial response was already saved to avoid duplicates
    let partialResponseSaved = false;

    /**
     * Listen for all subscribers leaving to save partial response.
     * This ensures the response is saved to DB even if all clients disconnect
     * while generation continues.
     *
     * Note: The messageId used here falls back to `${userMessage.messageId}_` if the
     * actual response messageId isn't available yet. The final response save will
     * overwrite this with the complete response using the same messageId pattern.
     */
    job.emitter.on('allSubscribersLeft', async (aggregatedContent) => {
      if (partialResponseSaved || !aggregatedContent || aggregatedContent.length === 0) {
        return;
      }

      const resumeState = GenerationJobManager.getResumeState(streamId);
      if (!resumeState?.userMessage) {
        logger.debug('[ResumableAgentController] No user message to save partial response for');
        return;
      }

      // Set before the await so a second event during the save cannot start a duplicate save.
      partialResponseSaved = true;
      const responseConversationId = resumeState.conversationId || reqConversationId;

      try {
        const partialMessage = {
          messageId: resumeState.responseMessageId || `${resumeState.userMessage.messageId}_`,
          conversationId: responseConversationId,
          parentMessageId: resumeState.userMessage.messageId,
          sender: client?.sender ?? 'AI',
          content: aggregatedContent,
          unfinished: true,
          error: false,
          isCreatedByUser: false,
          user: userId,
          endpoint: endpointOption.endpoint,
          model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
        };

        if (req.body?.agent_id) {
          partialMessage.agent_id = req.body.agent_id;
        }

        await saveMessage(req, partialMessage, {
          context: 'api/server/controllers/agents/request.js - partial response on disconnect',
        });

        logger.debug(
          `[ResumableAgentController] Saved partial response for ${streamId}, content parts: ${aggregatedContent.length}`,
        );
      } catch (error) {
        logger.error('[ResumableAgentController] Error saving partial response:', error);
        // Reset flag so we can try again if subscribers reconnect and leave again
        partialResponseSaved = false;
      }
    });

    /** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
    const result = await initializeClient({
      req,
      res,
      endpointOption,
      signal: prelimAbortController.signal,
    });

    if (prelimAbortController.signal.aborted) {
      GenerationJobManager.completeJob(streamId, 'Request aborted during initialization');
      // The client was already built when the abort was noticed and `client` has not been
      // assigned yet, so release it explicitly; otherwise nothing would ever dispose it.
      if (result?.client) {
        disposeClient(result.client);
      }
      return res.status(400).json({ error: 'Request aborted during initialization' });
    }

    client = result.client;

    if (client?.sender) {
      GenerationJobManager.updateMetadata(streamId, { sender: client.sender });
    }

    // Store reference to client's contentParts - graph will be set when run is created
    if (client?.contentParts) {
      GenerationJobManager.setContentParts(streamId, client.contentParts);
    }

    res.json({ streamId, status: 'started' });

    let conversationId = reqConversationId;
    let userMessage;

    /** Receives request data reported through `messageOptions.getReqData`. */
    const getReqData = (data = {}) => {
      if (data.userMessage) {
        userMessage = data.userMessage;
      }
      if (!conversationId && data.conversationId) {
        conversationId = data.conversationId;
      }
    };

    // Start background generation - wait for subscriber with timeout fallback
    const startGeneration = async () => {
      let readyTimer;
      try {
        await Promise.race([
          job.readyPromise,
          new Promise((resolve) => {
            readyTimer = setTimeout(resolve, SUBSCRIBER_READY_TIMEOUT_MS);
          }),
        ]);
      } catch (waitError) {
        logger.warn(
          `[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`,
        );
      } finally {
        // Don't leave the fallback timer pending once the race has settled.
        clearTimeout(readyTimer);
      }

      try {
        const onStart = (userMsg, respMsgId, _isNewConvo) => {
          userMessage = userMsg;

          // Store userMessage and responseMessageId upfront for resume capability
          GenerationJobManager.updateMetadata(streamId, {
            responseMessageId: respMsgId,
            userMessage: {
              messageId: userMsg.messageId,
              parentMessageId: userMsg.parentMessageId,
              conversationId: userMsg.conversationId,
              text: userMsg.text,
            },
          });

          GenerationJobManager.emitChunk(streamId, {
            created: true,
            message: userMessage,
            streamId,
          });
        };

        const messageOptions = {
          user: userId,
          onStart,
          getReqData,
          isContinued,
          isRegenerate,
          editedContent,
          conversationId,
          parentMessageId,
          abortController: job.abortController,
          overrideParentMessageId,
          isEdited: !!editedContent,
          userMCPAuthMap: result.userMCPAuthMap,
          responseMessageId: editedResponseMessageId,
          progressOptions: {
            // No-op stand-in for the HTTP response: the real response has already been sent.
            res: {
              write: () => true,
              end: () => {},
              headersSent: false,
              writableEnded: false,
            },
          },
        };

        const response = await client.sendMessage(text, messageOptions);

        const messageId = response.messageId;
        const endpoint = endpointOption.endpoint;
        response.endpoint = endpoint;

        // Detach the promise so it is not copied into the events/messages built below.
        const databasePromise = response.databasePromise;
        delete response.databasePromise;

        const { conversation: convoData = {} } = await databasePromise;
        const conversation = { ...convoData };
        // A falsy/missing title is normalised to null; a real title is kept as-is.
        conversation.title = conversation.title || null;

        // `userMessage` is only set via onStart/getReqData, so guard before mutating it.
        if (userMessage && req.body.files && client.options?.attachments) {
          userMessage.files = [];
          const messageFiles = new Set(req.body.files.map((file) => file.file_id));
          for (const attachment of client.options.attachments) {
            if (messageFiles.has(attachment.file_id)) {
              userMessage.files.push(sanitizeFileForTransmit(attachment));
            }
          }
          delete userMessage.image_urls;
        }

        const baseFinalEvent = {
          final: true,
          conversation,
          title: conversation.title,
          requestMessage: sanitizeMessageForTransmit(userMessage),
        };

        if (!job.abortController.signal.aborted) {
          GenerationJobManager.emitDone(streamId, {
            ...baseFinalEvent,
            responseMessage: { ...response },
          });
          GenerationJobManager.completeJob(streamId);

          if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
            await saveMessage(
              req,
              { ...response, user: userId },
              { context: 'api/server/controllers/agents/request.js - resumable response end' },
            );
          }
        } else {
          GenerationJobManager.emitDone(streamId, {
            ...baseFinalEvent,
            responseMessage: { ...response, error: true },
            error: { message: 'Request was aborted' },
          });
          GenerationJobManager.completeJob(streamId, 'Request aborted');
        }

        if (!client.skipSaveUserMessage && userMessage) {
          await saveMessage(req, userMessage, {
            context: 'api/server/controllers/agents/request.js - resumable user message',
          });
        }

        // Skip title generation if job was aborted
        const newConvo = !reqConversationId;
        const shouldGenerateTitle =
          addTitle &&
          parentMessageId === Constants.NO_PARENT &&
          newConvo &&
          !job.abortController.signal.aborted;

        if (shouldGenerateTitle) {
          // Title generation still needs the client, so dispose only after it settles.
          addTitle(req, {
            text,
            response: { ...response },
            client,
          })
            .catch((err) => {
              logger.error('[ResumableAgentController] Error in title generation', err);
            })
            .finally(() => {
              if (client) {
                disposeClient(client);
              }
            });
        } else if (client) {
          disposeClient(client);
        }
      } catch (error) {
        // Check if this was an abort (not a real error)
        const wasAborted =
          job.abortController.signal.aborted || error?.message?.includes('abort');

        if (wasAborted) {
          logger.debug(`[ResumableAgentController] Generation aborted for ${streamId}`);
          // abortJob already handled emitDone and completeJob
        } else {
          logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error);
          GenerationJobManager.emitError(streamId, error?.message || 'Generation failed');
          GenerationJobManager.completeJob(streamId, error?.message);
        }

        if (client) {
          disposeClient(client);
        }
      }
    };

    // Start generation and handle any unhandled errors
    startGeneration().catch((err) => {
      logger.error(
        `[ResumableAgentController] Unhandled error in background generation: ${err.message}`,
      );
      GenerationJobManager.completeJob(streamId, err.message);
    });
  } catch (error) {
    logger.error('[ResumableAgentController] Initialization error:', error);
    if (!res.headersSent) {
      res.status(500).json({ error: error.message || 'Failed to start generation' });
    }
    GenerationJobManager.completeJob(streamId, error.message);
    if (client) {
      disposeClient(client);
    }
  }
};
|
|
|
|
/**
 * Agent request controller.
 *
 * When `req.query.resumable === 'true'` the request is delegated to
 * `ResumableAgentController`. Otherwise the client is initialized, the message is sent with the
 * live `res` passed as `progressOptions.res`, a final event is written and the response ended,
 * messages are saved, an optional title is generated, and all per-request references are
 * released via `performCleanup`.
 *
 * @param {object} req - Request; reads `req.query.resumable`, `req.body` and `req.user.id`.
 * @param {object} res - Response the final event is written to.
 * @param {Function} next - Forwarded to `ResumableAgentController`; otherwise unused.
 * @param {Function} initializeClient - Async factory called with
 *   `{ req, res, endpointOption, signal }`; expected to resolve to `{ client, userMCPAuthMap }`.
 * @param {Function} [addTitle] - Optional async title generator called with
 *   `(req, { text, response, client })` for new conversations.
 * @returns {Promise<unknown>} Resolves when the main flow is done; title generation and error
 *   handling may still be settling in the background.
 */
const AgentController = async (req, res, next, initializeClient, addTitle) => {
  const isResumable = req.query.resumable === 'true';
  if (isResumable) {
    return ResumableAgentController(req, res, next, initializeClient, addTitle);
  }

  let {
    text,
    isRegenerate,
    endpointOption,
    conversationId,
    isContinued = false,
    editedContent = null,
    parentMessageId = null,
    overrideParentMessageId = null,
    responseMessageId: editedResponseMessageId = null,
  } = req.body;

  let sender;
  let abortKey;
  let userMessage;
  let promptTokens;
  let userMessageId;
  let responseMessageId;
  let userMessagePromise;
  let getAbortData;
  let client = null;
  let cleanupHandlers = [];

  const newConvo = !conversationId;
  const userId = req.user.id;

  // Create handler to avoid capturing the entire parent scope
  let getReqData = (data = {}) => {
    for (const key in data) {
      if (key === 'userMessage') {
        userMessage = data[key];
        userMessageId = data[key].messageId;
      } else if (key === 'userMessagePromise') {
        userMessagePromise = data[key];
      } else if (key === 'responseMessageId') {
        responseMessageId = data[key];
      } else if (key === 'promptTokens') {
        promptTokens = data[key];
      } else if (key === 'sender') {
        sender = data[key];
      } else if (key === 'abortKey') {
        abortKey = data[key];
      } else if (!conversationId && key === 'conversationId') {
        // Only adopt a reported conversationId when the request did not supply one.
        conversationId = data[key];
      }
    }
  };

  // Create a function to handle final cleanup
  const performCleanup = () => {
    logger.debug('[AgentController] Performing cleanup');
    if (Array.isArray(cleanupHandlers)) {
      for (const handler of cleanupHandlers) {
        try {
          if (typeof handler === 'function') {
            handler();
          }
        } catch (e) {
          logger.error('[AgentController] Error in cleanup handler', e);
        }
      }
    }

    // Clean up abort controller
    if (abortKey) {
      logger.debug('[AgentController] Cleaning up abort controller');
      cleanupAbortController(abortKey);
    }

    // Dispose client properly
    if (client) {
      disposeClient(client);
    }

    // Clear all references
    client = null;
    getReqData = null;
    userMessage = null;
    getAbortData = null;
    // `endpointOption` may be absent from the request body, or already cleared by an earlier
    // cleanup pass. This runs inside un-awaited `.finally()` callbacks, so throwing here would
    // surface as an unhandled rejection and skip the requestDataMap cleanup below.
    if (endpointOption) {
      endpointOption.agent = null;
    }
    endpointOption = null;
    cleanupHandlers = null;
    userMessagePromise = null;

    // Clear request data map
    if (requestDataMap.has(req)) {
      requestDataMap.delete(req);
    }
    logger.debug('[AgentController] Cleanup completed');
  };

  try {
    let prelimAbortController = new AbortController();
    const prelimCloseHandler = createCloseHandler(prelimAbortController);
    res.on('close', prelimCloseHandler);
    // Runs the close handler (with `manual` controlling only its logging) and detaches it.
    const removePrelimHandler = (manual) => {
      try {
        prelimCloseHandler(manual);
        res.removeListener('close', prelimCloseHandler);
      } catch (e) {
        logger.error('[AgentController] Error removing close listener', e);
      }
    };
    cleanupHandlers.push(removePrelimHandler);
    /** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
    const result = await initializeClient({
      req,
      res,
      endpointOption,
      signal: prelimAbortController.signal,
    });
    if (prelimAbortController.signal?.aborted) {
      prelimAbortController = null;
      throw new Error('Request was aborted before initialization could complete');
    } else {
      prelimAbortController = null;
      removePrelimHandler(true);
      // Already executed above, so drop it from the pending cleanup list.
      cleanupHandlers.pop();
    }
    client = result.client;

    // Register client with finalization registry if available
    if (clientRegistry) {
      clientRegistry.register(client, { userId }, client);
    }

    // Store request data in WeakMap keyed by req object
    requestDataMap.set(req, { client });

    // Use WeakRef to allow GC but still access content if it exists
    const contentRef = new WeakRef(client.contentParts || []);

    // Minimize closure scope - only capture small primitives and WeakRef
    getAbortData = () => {
      // Dereference WeakRef each time
      const content = contentRef.deref();

      return {
        sender,
        content: content || [],
        userMessage,
        promptTokens,
        conversationId,
        userMessagePromise,
        messageId: responseMessageId,
        parentMessageId: overrideParentMessageId ?? userMessageId,
      };
    };

    const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);
    const closeHandler = createCloseHandler(abortController);
    res.on('close', closeHandler);
    cleanupHandlers.push(() => {
      try {
        res.removeListener('close', closeHandler);
      } catch (e) {
        logger.error('[AgentController] Error removing close listener', e);
      }
    });

    const messageOptions = {
      user: userId,
      onStart,
      getReqData,
      isContinued,
      isRegenerate,
      editedContent,
      conversationId,
      parentMessageId,
      abortController,
      overrideParentMessageId,
      isEdited: !!editedContent,
      userMCPAuthMap: result.userMCPAuthMap,
      responseMessageId: editedResponseMessageId,
      progressOptions: {
        res,
      },
    };

    let response = await client.sendMessage(text, messageOptions);

    // Extract what we need and immediately break reference
    const messageId = response.messageId;
    const endpoint = endpointOption.endpoint;
    response.endpoint = endpoint;

    // Store database promise locally
    const databasePromise = response.databasePromise;
    delete response.databasePromise;

    // Resolve database-related data
    const { conversation: convoData = {} } = await databasePromise;
    const conversation = { ...convoData };
    // A falsy/missing title is normalised to null; a real title is kept as-is.
    conversation.title = conversation.title || null;

    // Process files if needed (sanitize to remove large text fields before transmission)
    if (req.body.files && client.options?.attachments) {
      userMessage.files = [];
      const messageFiles = new Set(req.body.files.map((file) => file.file_id));
      for (const attachment of client.options.attachments) {
        if (messageFiles.has(attachment.file_id)) {
          userMessage.files.push(sanitizeFileForTransmit(attachment));
        }
      }
      delete userMessage.image_urls;
    }

    // Only send if not aborted
    if (!abortController.signal.aborted) {
      // Create a new response object with minimal copies
      const finalResponse = { ...response };

      sendEvent(res, {
        final: true,
        conversation,
        title: conversation.title,
        requestMessage: sanitizeMessageForTransmit(userMessage),
        responseMessage: finalResponse,
      });
      res.end();

      // Save the message if needed
      if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
        await saveMessage(
          req,
          { ...finalResponse, user: userId },
          { context: 'api/server/controllers/agents/request.js - response end' },
        );
      }
    } else if (!res.headersSent && !res.finished) {
      // Edge case: sendMessage completed but abort happened during sendCompletion
      // We need to ensure a final event is sent
      logger.debug(
        '[AgentController] Handling edge case: `sendMessage` completed but aborted during `sendCompletion`',
      );

      const finalResponse = { ...response };
      finalResponse.error = true;

      sendEvent(res, {
        final: true,
        conversation,
        title: conversation.title,
        requestMessage: sanitizeMessageForTransmit(userMessage),
        responseMessage: finalResponse,
        error: { message: 'Request was aborted during completion' },
      });
      res.end();
    }

    // Save user message if needed
    if (!client.skipSaveUserMessage) {
      await saveMessage(req, userMessage, {
        context: "api/server/controllers/agents/request.js - don't skip saving user message",
      });
    }

    // Add title if needed - extract minimal data
    if (addTitle && parentMessageId === Constants.NO_PARENT && newConvo) {
      // Title generation still needs the client, so cleanup waits until it settles.
      addTitle(req, {
        text,
        response: { ...response },
        client,
      })
        .then(() => {
          logger.debug('[AgentController] Title generation started');
        })
        .catch((err) => {
          logger.error('[AgentController] Error in title generation', err);
        })
        .finally(() => {
          logger.debug('[AgentController] Title generation completed');
          performCleanup();
        });
    } else {
      performCleanup();
    }
  } catch (error) {
    // Handle error without capturing much scope
    handleAbortError(res, req, error, {
      conversationId,
      sender,
      messageId: responseMessageId,
      parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId,
      userMessageId,
    })
      .catch((err) => {
        logger.error('[api/server/controllers/agents/request] Error in `handleAbortError`', err);
      })
      .finally(() => {
        performCleanup();
      });
  }
};
|
|
|
|
// Sole export: AgentController, which delegates to ResumableAgentController
// when the request carries `?resumable=true`.
module.exports = AgentController;
|