🤖 refactor: Improve Agents Memory Usage, Bump Keyv, Grok 3 (#6850)

* chore: remove unused redis file

* chore: bump keyv dependencies, and update related imports

* refactor: Implement IoRedis client for rate limiting across middleware, as node-redis via keyv not compatible

* fix: Set max listeners to expected amount

* WIP: memory improvements

* refactor: Simplify getAbortData assignment in createAbortController

* refactor: Update getAbortData to use WeakRef for content management

* WIP: memory improvements in agent chat requests

* refactor: Enhance memory management with finalization registry and cleanup functions

* refactor: Simplify domainParser calls by removing unnecessary request parameter

* refactor: Update parameter types for action tools and agent loading functions to use minimal configs

* refactor: Simplify domainParser tests by removing unnecessary request parameter

* refactor: Simplify domainParser call by removing unnecessary request parameter

* refactor: Enhance client disposal by nullifying additional properties to improve memory management

* refactor: Improve title generation by adding abort controller and timeout handling, consolidate request cleanup

* refactor: Update checkIdleConnections to skip current user when checking for idle connections if passed

* refactor: Update createMCPTool to derive userId from config and handle abort signals

* refactor: Introduce createTokenCounter function and update tokenCounter usage; enhance disposeClient to reset Graph values

* refactor: Update getMCPManager to accept userId parameter for improved idle connection handling

* refactor: Extract logToolError function for improved error handling in AgentClient

* refactor: Update disposeClient to clear handlerRegistry and graphRunnable references in client.run

* refactor: Extract createHandleNewToken function to streamline token handling in initializeClient

* chore: bump @librechat/agents

* refactor: Improve timeout handling in addTitle function for better error management

* refactor: Introduce createFetch instead of using class method

* refactor: Enhance client disposal and request data handling in AskController and EditController

* refactor: Update import statements for AnthropicClient and OpenAIClient to use specific paths

* refactor: Use WeakRef for response handling in SplitStreamHandler to prevent memory leaks

* refactor: Simplify client disposal and rename getReqData to processReqData in AskController and EditController

* refactor: Improve logging structure and parameter handling in OpenAIClient

* refactor: Remove unused GraphEvents and improve stream event handling in AnthropicClient and OpenAIClient

* refactor: Simplify client initialization in AskController and EditController

* refactor: Remove unused mock functions and implement in-memory store for KeyvMongo

* chore: Update dependencies in package-lock.json to latest versions

* refactor: Await token usage recording in OpenAIClient to ensure proper async handling

* refactor: Remove handleAbort route from multiple endpoints and enhance client disposal logic

* refactor: Enhance abort controller logic by managing abortKey more effectively

* refactor: Add newConversation handling in useEventHandlers for improved conversation management

* fix: dropparams

* refactor: Use optional chaining for safer access to request properties in BaseClient

* refactor: Move client disposal and request data processing logic to cleanup module for better organization

* refactor: Remove aborted request check from addTitle function for cleaner logic

* feat: Add Grok 3 model pricing and update tests for new models

* chore: Remove trace warnings and inspect flags from backend start script used for debugging

* refactor: Replace user identifier handling with userId for consistency across controllers, use UserId in clientRegistry

* refactor: Enhance client disposal logic to prevent memory leaks by clearing additional references

* chore: Update @librechat/agents to version 2.4.14 in package.json and package-lock.json
This commit is contained in:
Danny Avila 2025-04-12 18:46:36 -04:00 committed by GitHub
parent 1e6b1b9554
commit 37964975c1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
68 changed files with 1796 additions and 623 deletions

View file

@ -1,5 +1,15 @@
const { getResponseSender, Constants } = require('librechat-data-provider');
const { createAbortController, handleAbortError } = require('~/server/middleware');
const {
handleAbortError,
createAbortController,
cleanupAbortController,
} = require('~/server/middleware');
const {
disposeClient,
processReqData,
clientRegistry,
requestDataMap,
} = require('~/server/cleanup');
const { sendMessage, createOnProgress } = require('~/server/utils');
const { saveMessage } = require('~/models');
const { logger } = require('~/config');
@ -14,90 +24,162 @@ const AskController = async (req, res, next, initializeClient, addTitle) => {
overrideParentMessageId = null,
} = req.body;
let client = null;
let abortKey = null;
let cleanupHandlers = [];
let clientRef = null;
logger.debug('[AskController]', {
text,
conversationId,
...endpointOption,
modelsConfig: endpointOption.modelsConfig ? 'exists' : '',
modelsConfig: endpointOption?.modelsConfig ? 'exists' : '',
});
let userMessage;
let userMessagePromise;
let promptTokens;
let userMessageId;
let responseMessageId;
let userMessage = null;
let userMessagePromise = null;
let promptTokens = null;
let userMessageId = null;
let responseMessageId = null;
let getAbortData = null;
const sender = getResponseSender({
...endpointOption,
model: endpointOption.modelOptions.model,
modelDisplayLabel,
});
const newConvo = !conversationId;
const user = req.user.id;
const initialConversationId = conversationId;
const newConvo = !initialConversationId;
const userId = req.user.id;
const getReqData = (data = {}) => {
for (let key in data) {
if (key === 'userMessage') {
userMessage = data[key];
userMessageId = data[key].messageId;
} else if (key === 'userMessagePromise') {
userMessagePromise = data[key];
} else if (key === 'responseMessageId') {
responseMessageId = data[key];
} else if (key === 'promptTokens') {
promptTokens = data[key];
} else if (!conversationId && key === 'conversationId') {
conversationId = data[key];
}
}
let reqDataContext = {
userMessage,
userMessagePromise,
responseMessageId,
promptTokens,
conversationId,
userMessageId,
};
let getText;
const updateReqData = (data = {}) => {
reqDataContext = processReqData(data, reqDataContext);
abortKey = reqDataContext.abortKey;
userMessage = reqDataContext.userMessage;
userMessagePromise = reqDataContext.userMessagePromise;
responseMessageId = reqDataContext.responseMessageId;
promptTokens = reqDataContext.promptTokens;
conversationId = reqDataContext.conversationId;
userMessageId = reqDataContext.userMessageId;
};
let { onProgress: progressCallback, getPartialText } = createOnProgress();
const performCleanup = () => {
logger.debug('[AskController] Performing cleanup');
if (Array.isArray(cleanupHandlers)) {
for (const handler of cleanupHandlers) {
try {
if (typeof handler === 'function') {
handler();
}
} catch (e) {
// Ignore
}
}
}
if (abortKey) {
logger.debug('[AskController] Cleaning up abort controller');
cleanupAbortController(abortKey);
abortKey = null;
}
if (client) {
disposeClient(client);
client = null;
}
reqDataContext = null;
userMessage = null;
userMessagePromise = null;
promptTokens = null;
getAbortData = null;
progressCallback = null;
endpointOption = null;
cleanupHandlers = null;
addTitle = null;
if (requestDataMap.has(req)) {
requestDataMap.delete(req);
}
logger.debug('[AskController] Cleanup completed');
};
try {
const { client } = await initializeClient({ req, res, endpointOption });
const { onProgress: progressCallback, getPartialText } = createOnProgress();
({ client } = await initializeClient({ req, res, endpointOption }));
if (clientRegistry && client) {
clientRegistry.register(client, { userId }, client);
}
getText = client.getStreamText != null ? client.getStreamText.bind(client) : getPartialText;
if (client) {
requestDataMap.set(req, { client });
}
const getAbortData = () => ({
sender,
conversationId,
userMessagePromise,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: getText(),
userMessage,
promptTokens,
});
clientRef = new WeakRef(client);
const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);
getAbortData = () => {
const currentClient = clientRef.deref();
const currentText =
currentClient?.getStreamText != null ? currentClient.getStreamText() : getPartialText();
res.on('close', () => {
return {
sender,
conversationId,
messageId: reqDataContext.responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: currentText,
userMessage: userMessage,
userMessagePromise: userMessagePromise,
promptTokens: reqDataContext.promptTokens,
};
};
const { onStart, abortController } = createAbortController(
req,
res,
getAbortData,
updateReqData,
);
const closeHandler = () => {
logger.debug('[AskController] Request closed');
if (!abortController) {
return;
} else if (abortController.signal.aborted) {
return;
} else if (abortController.requestCompleted) {
if (!abortController || abortController.signal.aborted || abortController.requestCompleted) {
return;
}
abortController.abort();
logger.debug('[AskController] Request aborted on close');
};
res.on('close', closeHandler);
cleanupHandlers.push(() => {
try {
res.removeListener('close', closeHandler);
} catch (e) {
// Ignore
}
});
const messageOptions = {
user,
user: userId,
parentMessageId,
conversationId,
conversationId: reqDataContext.conversationId,
overrideParentMessageId,
getReqData,
getReqData: updateReqData,
onStart,
abortController,
progressCallback,
progressOptions: {
res,
// parentMessageId: overrideParentMessageId || userMessageId,
},
};
@ -105,59 +187,94 @@ const AskController = async (req, res, next, initializeClient, addTitle) => {
let response = await client.sendMessage(text, messageOptions);
response.endpoint = endpointOption.endpoint;
const { conversation = {} } = await client.responsePromise;
const databasePromise = response.databasePromise;
delete response.databasePromise;
const { conversation: convoData = {} } = await databasePromise;
const conversation = { ...convoData };
conversation.title =
conversation && !conversation.title ? null : conversation?.title || 'New Chat';
if (client.options.attachments) {
userMessage.files = client.options.attachments;
conversation.model = endpointOption.modelOptions.model;
delete userMessage.image_urls;
const latestUserMessage = reqDataContext.userMessage;
if (client?.options?.attachments && latestUserMessage) {
latestUserMessage.files = client.options.attachments;
if (endpointOption?.modelOptions?.model) {
conversation.model = endpointOption.modelOptions.model;
}
delete latestUserMessage.image_urls;
}
if (!abortController.signal.aborted) {
const finalResponseMessage = { ...response };
sendMessage(res, {
final: true,
conversation,
title: conversation.title,
requestMessage: userMessage,
responseMessage: response,
requestMessage: latestUserMessage,
responseMessage: finalResponseMessage,
});
res.end();
if (!client.savedMessageIds.has(response.messageId)) {
if (client?.savedMessageIds && !client.savedMessageIds.has(response.messageId)) {
await saveMessage(
req,
{ ...response, user },
{ ...finalResponseMessage, user: userId },
{ context: 'api/server/controllers/AskController.js - response end' },
);
}
}
if (!client.skipSaveUserMessage) {
await saveMessage(req, userMessage, {
if (!client?.skipSaveUserMessage && latestUserMessage) {
await saveMessage(req, latestUserMessage, {
context: 'api/server/controllers/AskController.js - don\'t skip saving user message',
});
}
if (addTitle && parentMessageId === Constants.NO_PARENT && newConvo) {
if (typeof addTitle === 'function' && parentMessageId === Constants.NO_PARENT && newConvo) {
addTitle(req, {
text,
response,
response: { ...response },
client,
});
})
.then(() => {
logger.debug('[AskController] Title generation started');
})
.catch((err) => {
logger.error('[AskController] Error in title generation', err);
})
.finally(() => {
logger.debug('[AskController] Title generation completed');
performCleanup();
});
} else {
performCleanup();
}
} catch (error) {
const partialText = getText && getText();
logger.error('[AskController] Error handling request', error);
let partialText = '';
try {
const currentClient = clientRef.deref();
partialText =
currentClient?.getStreamText != null ? currentClient.getStreamText() : getPartialText();
} catch (getTextError) {
logger.error('[AskController] Error calling getText() during error handling', getTextError);
}
handleAbortError(res, req, error, {
sender,
partialText,
conversationId,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId,
}).catch((err) => {
logger.error('[AskController] Error in `handleAbortError`', err);
});
conversationId: reqDataContext.conversationId,
messageId: reqDataContext.responseMessageId,
parentMessageId: overrideParentMessageId ?? reqDataContext.userMessageId ?? parentMessageId,
})
.catch((err) => {
logger.error('[AskController] Error in `handleAbortError` during catch block', err);
})
.finally(() => {
performCleanup();
});
}
};

View file

@ -1,5 +1,15 @@
const { getResponseSender } = require('librechat-data-provider');
const { createAbortController, handleAbortError } = require('~/server/middleware');
const {
handleAbortError,
createAbortController,
cleanupAbortController,
} = require('~/server/middleware');
const {
disposeClient,
processReqData,
clientRegistry,
requestDataMap,
} = require('~/server/cleanup');
const { sendMessage, createOnProgress } = require('~/server/utils');
const { saveMessage } = require('~/models');
const { logger } = require('~/config');
@ -17,6 +27,11 @@ const EditController = async (req, res, next, initializeClient) => {
overrideParentMessageId = null,
} = req.body;
let client = null;
let abortKey = null;
let cleanupHandlers = [];
let clientRef = null; // Declare clientRef here
logger.debug('[EditController]', {
text,
generation,
@ -26,123 +41,204 @@ const EditController = async (req, res, next, initializeClient) => {
modelsConfig: endpointOption.modelsConfig ? 'exists' : '',
});
let userMessage;
let userMessagePromise;
let promptTokens;
let userMessage = null;
let userMessagePromise = null;
let promptTokens = null;
let getAbortData = null;
const sender = getResponseSender({
...endpointOption,
model: endpointOption.modelOptions.model,
modelDisplayLabel,
});
const userMessageId = parentMessageId;
const user = req.user.id;
const userId = req.user.id;
const getReqData = (data = {}) => {
for (let key in data) {
if (key === 'userMessage') {
userMessage = data[key];
} else if (key === 'userMessagePromise') {
userMessagePromise = data[key];
} else if (key === 'responseMessageId') {
responseMessageId = data[key];
} else if (key === 'promptTokens') {
promptTokens = data[key];
}
}
let reqDataContext = { userMessage, userMessagePromise, responseMessageId, promptTokens };
const updateReqData = (data = {}) => {
reqDataContext = processReqData(data, reqDataContext);
abortKey = reqDataContext.abortKey;
userMessage = reqDataContext.userMessage;
userMessagePromise = reqDataContext.userMessagePromise;
responseMessageId = reqDataContext.responseMessageId;
promptTokens = reqDataContext.promptTokens;
};
const { onProgress: progressCallback, getPartialText } = createOnProgress({
let { onProgress: progressCallback, getPartialText } = createOnProgress({
generation,
});
let getText;
const performCleanup = () => {
logger.debug('[EditController] Performing cleanup');
if (Array.isArray(cleanupHandlers)) {
for (const handler of cleanupHandlers) {
try {
if (typeof handler === 'function') {
handler();
}
} catch (e) {
// Ignore
}
}
}
if (abortKey) {
logger.debug('[AskController] Cleaning up abort controller');
cleanupAbortController(abortKey);
abortKey = null;
}
if (client) {
disposeClient(client);
client = null;
}
reqDataContext = null;
userMessage = null;
userMessagePromise = null;
promptTokens = null;
getAbortData = null;
progressCallback = null;
endpointOption = null;
cleanupHandlers = null;
if (requestDataMap.has(req)) {
requestDataMap.delete(req);
}
logger.debug('[EditController] Cleanup completed');
};
try {
const { client } = await initializeClient({ req, res, endpointOption });
({ client } = await initializeClient({ req, res, endpointOption }));
getText = client.getStreamText != null ? client.getStreamText.bind(client) : getPartialText;
if (clientRegistry && client) {
clientRegistry.register(client, { userId }, client);
}
const getAbortData = () => ({
conversationId,
userMessagePromise,
messageId: responseMessageId,
sender,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: getText(),
userMessage,
promptTokens,
});
if (client) {
requestDataMap.set(req, { client });
}
const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);
clientRef = new WeakRef(client);
res.on('close', () => {
getAbortData = () => {
const currentClient = clientRef.deref();
const currentText =
currentClient?.getStreamText != null ? currentClient.getStreamText() : getPartialText();
return {
sender,
conversationId,
messageId: reqDataContext.responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: currentText,
userMessage: userMessage,
userMessagePromise: userMessagePromise,
promptTokens: reqDataContext.promptTokens,
};
};
const { onStart, abortController } = createAbortController(
req,
res,
getAbortData,
updateReqData,
);
const closeHandler = () => {
logger.debug('[EditController] Request closed');
if (!abortController) {
return;
} else if (abortController.signal.aborted) {
return;
} else if (abortController.requestCompleted) {
if (!abortController || abortController.signal.aborted || abortController.requestCompleted) {
return;
}
abortController.abort();
logger.debug('[EditController] Request aborted on close');
};
res.on('close', closeHandler);
cleanupHandlers.push(() => {
try {
res.removeListener('close', closeHandler);
} catch (e) {
// Ignore
}
});
let response = await client.sendMessage(text, {
user,
user: userId,
generation,
isContinued,
isEdited: true,
conversationId,
parentMessageId,
responseMessageId,
responseMessageId: reqDataContext.responseMessageId,
overrideParentMessageId,
getReqData,
getReqData: updateReqData,
onStart,
abortController,
progressCallback,
progressOptions: {
res,
// parentMessageId: overrideParentMessageId || userMessageId,
},
});
const { conversation = {} } = await client.responsePromise;
const databasePromise = response.databasePromise;
delete response.databasePromise;
const { conversation: convoData = {} } = await databasePromise;
const conversation = { ...convoData };
conversation.title =
conversation && !conversation.title ? null : conversation?.title || 'New Chat';
if (client.options.attachments) {
if (client?.options?.attachments && endpointOption?.modelOptions?.model) {
conversation.model = endpointOption.modelOptions.model;
}
if (!abortController.signal.aborted) {
const finalUserMessage = reqDataContext.userMessage;
const finalResponseMessage = { ...response };
sendMessage(res, {
final: true,
conversation,
title: conversation.title,
requestMessage: userMessage,
responseMessage: response,
requestMessage: finalUserMessage,
responseMessage: finalResponseMessage,
});
res.end();
await saveMessage(
req,
{ ...response, user },
{ ...finalResponseMessage, user: userId },
{ context: 'api/server/controllers/EditController.js - response end' },
);
}
performCleanup();
} catch (error) {
const partialText = getText();
logger.error('[EditController] Error handling request', error);
let partialText = '';
try {
const currentClient = clientRef.deref();
partialText =
currentClient?.getStreamText != null ? currentClient.getStreamText() : getPartialText();
} catch (getTextError) {
logger.error('[EditController] Error calling getText() during error handling', getTextError);
}
handleAbortError(res, req, error, {
sender,
partialText,
conversationId,
messageId: responseMessageId,
messageId: reqDataContext.responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId,
}).catch((err) => {
logger.error('[EditController] Error in `handleAbortError`', err);
});
})
.catch((err) => {
logger.error('[EditController] Error in `handleAbortError` during catch block', err);
})
.finally(() => {
performCleanup();
});
}
};

View file

@ -63,6 +63,21 @@ const noSystemModelRegex = [/\bo1\b/gi];
// const { getFormattedMemories } = require('~/models/Memory');
// const { getCurrentDateTime } = require('~/utils');
function createTokenCounter(encoding) {
return (message) => {
const countTokens = (text) => Tokenizer.getTokenCount(text, encoding);
return getTokenCountForMessage(message, countTokens);
};
}
function logToolError(graph, error, toolId) {
logger.error(
'[api/server/controllers/agents/client.js #chatCompletion] Tool Error',
error,
toolId,
);
}
class AgentClient extends BaseClient {
constructor(options = {}) {
super(null, options);
@ -535,6 +550,10 @@ class AgentClient extends BaseClient {
}
async chatCompletion({ payload, abortController = null }) {
/** @type {Partial<RunnableConfig> & { version: 'v1' | 'v2'; run_id?: string; streamMode: string }} */
let config;
/** @type {ReturnType<createRun>} */
let run;
try {
if (!abortController) {
abortController = new AbortController();
@ -632,11 +651,11 @@ class AgentClient extends BaseClient {
/** @type {TCustomConfig['endpoints']['agents']} */
const agentsEConfig = this.options.req.app.locals[EModelEndpoint.agents];
/** @type {Partial<RunnableConfig> & { version: 'v1' | 'v2'; run_id?: string; streamMode: string }} */
const config = {
config = {
configurable: {
thread_id: this.conversationId,
last_agent_index: this.agentConfigs?.size ?? 0,
user_id: this.user ?? this.options.req.user?.id,
hide_sequential_outputs: this.options.agent.hide_sequential_outputs,
},
recursionLimit: agentsEConfig?.recursionLimit,
@ -655,15 +674,6 @@ class AgentClient extends BaseClient {
initialMessages = formatContentStrings(initialMessages);
}
/** @type {ReturnType<createRun>} */
let run;
const countTokens = ((text) => this.getTokenCount(text)).bind(this);
/** @type {(message: BaseMessage) => number} */
const tokenCounter = (message) => {
return getTokenCountForMessage(message, countTokens);
};
/**
*
* @param {Agent} agent
@ -767,19 +777,14 @@ class AgentClient extends BaseClient {
run.Graph.contentData = contentData;
}
const encoding = this.getEncoding();
await run.processStream({ messages }, config, {
keepContent: i !== 0,
tokenCounter,
tokenCounter: createTokenCounter(encoding),
indexTokenCountMap: currentIndexCountMap,
maxContextTokens: agent.maxContextTokens,
callbacks: {
[Callback.TOOL_ERROR]: (graph, error, toolId) => {
logger.error(
'[api/server/controllers/agents/client.js #chatCompletion] Tool Error',
error,
toolId,
);
},
[Callback.TOOL_ERROR]: logToolError,
},
});
};
@ -809,6 +814,8 @@ class AgentClient extends BaseClient {
break;
}
}
const encoding = this.getEncoding();
const tokenCounter = createTokenCounter(encoding);
for (const [agentId, agent] of this.agentConfigs) {
if (abortController.signal.aborted === true) {
break;
@ -917,7 +924,7 @@ class AgentClient extends BaseClient {
* @param {string} params.text
* @param {string} params.conversationId
*/
async titleConvo({ text }) {
async titleConvo({ text, abortController }) {
if (!this.run) {
throw new Error('Run not initialized');
}
@ -950,6 +957,7 @@ class AgentClient extends BaseClient {
contentParts: this.contentParts,
clientOptions,
chainOptions: {
signal: abortController.signal,
callbacks: [
{
handleLLMEnd,
@ -975,7 +983,7 @@ class AgentClient extends BaseClient {
};
});
this.recordCollectedUsage({
await this.recordCollectedUsage({
model: clientOptions.model,
context: 'title',
collectedUsage,

View file

@ -1,5 +1,10 @@
const { Constants } = require('librechat-data-provider');
const { createAbortController, handleAbortError } = require('~/server/middleware');
const {
handleAbortError,
createAbortController,
cleanupAbortController,
} = require('~/server/middleware');
const { disposeClient, clientRegistry, requestDataMap } = require('~/server/cleanup');
const { sendMessage } = require('~/server/utils');
const { saveMessage } = require('~/models');
const { logger } = require('~/config');
@ -14,16 +19,22 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
} = req.body;
let sender;
let abortKey;
let userMessage;
let promptTokens;
let userMessageId;
let responseMessageId;
let userMessagePromise;
let getAbortData;
let client = null;
// Initialize as an array
let cleanupHandlers = [];
const newConvo = !conversationId;
const user = req.user.id;
const userId = req.user.id;
const getReqData = (data = {}) => {
// Create handler to avoid capturing the entire parent scope
let getReqData = (data = {}) => {
for (let key in data) {
if (key === 'userMessage') {
userMessage = data[key];
@ -36,30 +47,96 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
promptTokens = data[key];
} else if (key === 'sender') {
sender = data[key];
} else if (key === 'abortKey') {
abortKey = data[key];
} else if (!conversationId && key === 'conversationId') {
conversationId = data[key];
}
}
};
// Create a function to handle final cleanup
const performCleanup = () => {
logger.debug('[AgentController] Performing cleanup');
// Make sure cleanupHandlers is an array before iterating
if (Array.isArray(cleanupHandlers)) {
// Execute all cleanup handlers
for (const handler of cleanupHandlers) {
try {
if (typeof handler === 'function') {
handler();
}
} catch (e) {
// Ignore cleanup errors
}
}
}
// Clean up abort controller
if (abortKey) {
logger.debug('[AgentController] Cleaning up abort controller');
cleanupAbortController(abortKey);
}
// Dispose client properly
if (client) {
disposeClient(client);
}
// Clear all references
client = null;
getReqData = null;
userMessage = null;
getAbortData = null;
endpointOption.agent = null;
endpointOption = null;
cleanupHandlers = null;
userMessagePromise = null;
// Clear request data map
if (requestDataMap.has(req)) {
requestDataMap.delete(req);
}
logger.debug('[AgentController] Cleanup completed');
};
try {
/** @type {{ client: TAgentClient }} */
const { client } = await initializeClient({ req, res, endpointOption });
const result = await initializeClient({ req, res, endpointOption });
client = result.client;
const getAbortData = () => ({
sender,
userMessage,
promptTokens,
conversationId,
userMessagePromise,
messageId: responseMessageId,
content: client.getContentParts(),
parentMessageId: overrideParentMessageId ?? userMessageId,
});
// Register client with finalization registry if available
if (clientRegistry) {
clientRegistry.register(client, { userId }, client);
}
// Store request data in WeakMap keyed by req object
requestDataMap.set(req, { client });
// Use WeakRef to allow GC but still access content if it exists
const contentRef = new WeakRef(client.contentParts || []);
// Minimize closure scope - only capture small primitives and WeakRef
getAbortData = () => {
// Dereference WeakRef each time
const content = contentRef.deref();
return {
sender,
content: content || [],
userMessage,
promptTokens,
conversationId,
userMessagePromise,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId,
};
};
const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);
res.on('close', () => {
// Simple handler to avoid capturing scope
const closeHandler = () => {
logger.debug('[AgentController] Request closed');
if (!abortController) {
return;
@ -71,10 +148,19 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
abortController.abort();
logger.debug('[AgentController] Request aborted on close');
};
res.on('close', closeHandler);
cleanupHandlers.push(() => {
try {
res.removeListener('close', closeHandler);
} catch (e) {
// Ignore
}
});
const messageOptions = {
user,
user: userId,
onStart,
getReqData,
conversationId,
@ -83,69 +169,103 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
overrideParentMessageId,
progressOptions: {
res,
// parentMessageId: overrideParentMessageId || userMessageId,
},
};
let response = await client.sendMessage(text, messageOptions);
response.endpoint = endpointOption.endpoint;
const { conversation = {} } = await client.responsePromise;
// Extract what we need and immediately break reference
const messageId = response.messageId;
const endpoint = endpointOption.endpoint;
response.endpoint = endpoint;
// Store database promise locally
const databasePromise = response.databasePromise;
delete response.databasePromise;
// Resolve database-related data
const { conversation: convoData = {} } = await databasePromise;
const conversation = { ...convoData };
conversation.title =
conversation && !conversation.title ? null : conversation?.title || 'New Chat';
if (req.body.files && client.options.attachments) {
// Process files if needed
if (req.body.files && client.options?.attachments) {
userMessage.files = [];
const messageFiles = new Set(req.body.files.map((file) => file.file_id));
for (let attachment of client.options.attachments) {
if (messageFiles.has(attachment.file_id)) {
userMessage.files.push(attachment);
userMessage.files.push({ ...attachment });
}
}
delete userMessage.image_urls;
}
// Only send if not aborted
if (!abortController.signal.aborted) {
// Create a new response object with minimal copies
const finalResponse = { ...response };
sendMessage(res, {
final: true,
conversation,
title: conversation.title,
requestMessage: userMessage,
responseMessage: response,
responseMessage: finalResponse,
});
res.end();
if (!client.savedMessageIds.has(response.messageId)) {
// Save the message if needed
if (client.savedMessageIds && !client.savedMessageIds.has(messageId)) {
await saveMessage(
req,
{ ...response, user },
{ ...finalResponse, user: userId },
{ context: 'api/server/controllers/agents/request.js - response end' },
);
}
}
// Save user message if needed
if (!client.skipSaveUserMessage) {
await saveMessage(req, userMessage, {
context: 'api/server/controllers/agents/request.js - don\'t skip saving user message',
});
}
// Add title if needed - extract minimal data
if (addTitle && parentMessageId === Constants.NO_PARENT && newConvo) {
addTitle(req, {
text,
response,
response: { ...response },
client,
});
})
.then(() => {
logger.debug('[AgentController] Title generation started');
})
.catch((err) => {
logger.error('[AgentController] Error in title generation', err);
})
.finally(() => {
logger.debug('[AgentController] Title generation completed');
performCleanup();
});
} else {
performCleanup();
}
} catch (error) {
// Handle error without capturing much scope
handleAbortError(res, req, error, {
conversationId,
sender,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId,
}).catch((err) => {
logger.error('[api/server/controllers/agents/request] Error in `handleAbortError`', err);
});
})
.catch((err) => {
logger.error('[api/server/controllers/agents/request] Error in `handleAbortError`', err);
})
.finally(() => {
performCleanup();
});
}
};