🚦 feat: Auto-reinitialize MCP Servers on Request (#9226)

This commit is contained in:
Danny Avila 2025-08-23 03:27:05 -04:00
parent ac608ded46
commit c827fdd10e
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
28 changed files with 871 additions and 312 deletions

View file

@ -1,7 +1,12 @@
const { z } = require('zod');
const { tool } = require('@langchain/core/tools');
const { logger } = require('@librechat/data-schemas');
const { Constants: AgentConstants, Providers, GraphEvents } = require('@librechat/agents');
const {
Providers,
StepTypes,
GraphEvents,
Constants: AgentConstants,
} = require('@librechat/agents');
const {
sendEvent,
MCPOAuthHandler,
@ -11,14 +16,14 @@ const {
const {
Time,
CacheKeys,
StepTypes,
Constants,
ContentTypes,
isAssistantsEndpoint,
} = require('librechat-data-provider');
const { getCachedTools, loadCustomConfig } = require('./Config');
const { findToken, createToken, updateToken } = require('~/models');
const { getMCPManager, getFlowStateManager } = require('~/config');
const { getCachedTools, loadCustomConfig } = require('./Config');
const { reinitMCPServer } = require('./Tools/mcp');
const { getLogStores } = require('~/cache');
/**
@ -26,16 +31,13 @@ const { getLogStores } = require('~/cache');
* @param {ServerResponse} params.res - The Express response object for sending events.
* @param {string} params.stepId - The ID of the step in the flow.
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
* @param {string} params.loginFlowId - The ID of the login flow.
* @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
*/
function createOAuthStart({ res, stepId, toolCall, loginFlowId, flowManager, signal }) {
function createRunStepDeltaEmitter({ res, stepId, toolCall }) {
/**
* Creates a function to handle OAuth login requests.
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
* @returns {Promise<boolean>} Returns true to indicate the event was sent successfully.
* @returns {void}
*/
return async function (authURL) {
return function (authURL) {
/** @type {{ id: string; delta: AgentToolCallDelta }} */
const data = {
id: stepId,
@ -46,17 +48,54 @@ function createOAuthStart({ res, stepId, toolCall, loginFlowId, flowManager, sig
expires_at: Date.now() + Time.TWO_MINUTES,
},
};
/** Used to ensure the handler (use of `sendEvent`) is only invoked once */
await flowManager.createFlowWithHandler(
loginFlowId,
'oauth_login',
async () => {
sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data });
logger.debug('Sent OAuth login request to client');
return true;
sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data });
};
}
/**
* @param {object} params
* @param {ServerResponse} params.res - The Express response object for sending events.
* @param {string} params.runId - The Run ID, i.e. message ID
* @param {string} params.stepId - The ID of the step in the flow.
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
* @param {number} [params.index]
*/
function createRunStepEmitter({ res, runId, stepId, toolCall, index }) {
return function () {
/** @type {import('@librechat/agents').RunStep} */
const data = {
runId: runId ?? Constants.USE_PRELIM_RESPONSE_MESSAGE_ID,
id: stepId,
type: StepTypes.TOOL_CALLS,
index: index ?? 0,
stepDetails: {
type: StepTypes.TOOL_CALLS,
tool_calls: [toolCall],
},
signal,
);
};
sendEvent(res, { event: GraphEvents.ON_RUN_STEP, data });
};
}
/**
* Creates a function used to ensure the flow handler is only invoked once
* @param {object} params
* @param {string} params.flowId - The ID of the login flow.
* @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
* @param {(authURL: string) => void} [params.callback]
*/
function createOAuthStart({ flowId, flowManager, callback }) {
/**
* Creates a function to handle OAuth login requests.
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
* @returns {Promise<boolean>} Returns true to indicate the event was sent successfully.
*/
return async function (authURL) {
await flowManager.createFlowWithHandler(flowId, 'oauth_login', async () => {
callback?.(authURL);
logger.debug('Sent OAuth login request to client');
return true;
});
};
}
@ -99,23 +138,166 @@ function createAbortHandler({ userId, serverName, toolName, flowManager }) {
}
/**
* Creates a general tool for an entire action set.
* @param {Object} params
* @param {() => void} params.runStepEmitter
* @param {(authURL: string) => void} params.runStepDeltaEmitter
* @returns {(authURL: string) => void}
*/
function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
return function (authURL) {
runStepEmitter();
runStepDeltaEmitter(authURL);
};
}
/**
* @param {Object} params
* @param {ServerRequest} params.req - The Express request object, containing user/request info.
* @param {ServerResponse} params.res - The Express response object for sending events.
* @param {string} params.serverName
* @param {AbortSignal} params.signal
* @param {string} params.model
* @param {number} [params.index]
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
*/
async function reconnectServer({ req, res, index, signal, serverName, userMCPAuthMap }) {
const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID;
const flowId = `${req.user?.id}:${serverName}:${Date.now()}`;
const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
const stepId = 'step_oauth_login_' + serverName;
const toolCall = {
id: flowId,
name: serverName,
type: 'tool_call_chunk',
};
const runStepEmitter = createRunStepEmitter({
res,
index,
runId,
stepId,
toolCall,
});
const runStepDeltaEmitter = createRunStepDeltaEmitter({
res,
stepId,
toolCall,
});
const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter });
const oauthStart = createOAuthStart({
res,
flowId,
callback,
flowManager,
});
return await reinitMCPServer({
req,
signal,
serverName,
oauthStart,
flowManager,
userMCPAuthMap,
forceNew: true,
returnOnOAuth: false,
connectionTimeout: Time.TWO_MINUTES,
});
}
/**
* Creates all tools from the specified MCP Server via `toolKey`.
*
* @param {Object} params - The parameters for loading action sets.
* This function assumes tools could not be aggregated from the cache of tool definitions,
* i.e. `availableTools`, and will reinitialize the MCP server to ensure all tools are generated.
*
* @param {Object} params
* @param {ServerRequest} params.req - The Express request object, containing user/request info.
* @param {ServerResponse} params.res - The Express response object for sending events.
* @param {string} params.serverName
* @param {string} params.model
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
* @param {number} [params.index]
* @param {AbortSignal} [params.signal]
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
*/
async function createMCPTools({ req, res, index, signal, serverName, provider, userMCPAuthMap }) {
const result = await reconnectServer({ req, res, index, signal, serverName, userMCPAuthMap });
if (!result || !result.tools) {
logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
return;
}
const serverTools = [];
for (const tool of result.tools) {
const toolInstance = await createMCPTool({
req,
res,
provider,
userMCPAuthMap,
availableTools: result.availableTools,
toolKey: `${tool.name}${Constants.mcp_delimiter}${serverName}`,
});
if (toolInstance) {
serverTools.push(toolInstance);
}
}
return serverTools;
}
/**
* Creates a single tool from the specified MCP Server via `toolKey`.
* @param {Object} params
* @param {ServerRequest} params.req - The Express request object, containing user/request info.
* @param {ServerResponse} params.res - The Express response object for sending events.
* @param {string} params.toolKey - The toolKey for the tool.
* @param {import('@librechat/agents').Providers | EModelEndpoint} params.provider - The provider for the tool.
* @param {string} params.model - The model for the tool.
* @param {number} [params.index]
* @param {AbortSignal} [params.signal]
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
* @param {LCAvailableTools} [params.availableTools]
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
* @returns { Promise<typeof tool | { _call: (toolInput: Object | string) => unknown}> } An object with `_call` method to execute the tool input.
*/
async function createMCPTool({ req, res, toolKey, provider: _provider }) {
const availableTools = await getCachedTools({ userId: req.user?.id, includeGlobal: true });
const toolDefinition = availableTools?.[toolKey]?.function;
async function createMCPTool({
req,
res,
index,
signal,
toolKey,
provider,
userMCPAuthMap,
availableTools: tools,
}) {
const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter);
const availableTools =
tools ?? (await getCachedTools({ userId: req.user?.id, includeGlobal: true }));
/** @type {LCTool | undefined} */
let toolDefinition = availableTools?.[toolKey]?.function;
if (!toolDefinition) {
logger.error(`Tool ${toolKey} not found in available tools`);
return null;
logger.warn(
`[MCP][${serverName}][${toolName}] Requested tool not found in available tools, re-initializing MCP server.`,
);
const result = await reconnectServer({ req, res, index, signal, serverName, userMCPAuthMap });
toolDefinition = result?.availableTools?.[toolKey]?.function;
}
if (!toolDefinition) {
logger.warn(`[MCP][${serverName}][${toolName}] Tool definition not found, cannot create tool.`);
return;
}
return createToolInstance({
res,
provider,
toolName,
serverName,
toolDefinition,
});
}
function createToolInstance({ res, toolName, serverName, toolDefinition, provider: _provider }) {
/** @type {LCTool} */
const { description, parameters } = toolDefinition;
const isGoogle = _provider === Providers.VERTEXAI || _provider === Providers.GOOGLE;
@ -128,16 +310,8 @@ async function createMCPTool({ req, res, toolKey, provider: _provider }) {
schema = z.object({ input: z.string().optional() });
}
const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter);
const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
if (!req.user?.id) {
logger.error(
`[MCP][${serverName}][${toolName}] User ID not found on request. Cannot create tool.`,
);
throw new Error(`User ID not found on request. Cannot create tool for ${toolKey}.`);
}
/** @type {(toolArguments: Object | string, config?: GraphRunnableConfig) => Promise<unknown>} */
const _call = async (toolArguments, config) => {
const userId = config?.configurable?.user?.id || config?.configurable?.user_id;
@ -154,14 +328,16 @@ async function createMCPTool({ req, res, toolKey, provider: _provider }) {
const provider = (config?.metadata?.provider || _provider)?.toLowerCase();
const { args: _args, stepId, ...toolCall } = config.toolCall ?? {};
const loginFlowId = `${serverName}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`;
const oauthStart = createOAuthStart({
const flowId = `${serverName}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`;
const runStepDeltaEmitter = createRunStepDeltaEmitter({
res,
stepId,
toolCall,
loginFlowId,
});
const oauthStart = createOAuthStart({
flowId,
flowManager,
signal: derivedSignal,
callback: runStepDeltaEmitter,
});
const oauthEnd = createOAuthEnd({
res,
@ -207,7 +383,7 @@ async function createMCPTool({ req, res, toolKey, provider: _provider }) {
return result;
} catch (error) {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error calling "${toolName}" MCP tool:`,
`[MCP][${serverName}][${toolName}][User: ${userId}] Error calling MCP tool:`,
error,
);
@ -220,12 +396,12 @@ async function createMCPTool({ req, res, toolKey, provider: _provider }) {
if (isOAuthError) {
throw new Error(
`OAuth authentication required for ${serverName}. Please check the server logs for the authentication URL.`,
`[MCP][${serverName}][${toolName}] OAuth authentication required. Please check the server logs for the authentication URL.`,
);
}
throw new Error(
`"${toolKey}" tool call failed${error?.message ? `: ${error?.message}` : '.'}`,
`[MCP][${serverName}][${toolName}] tool call failed${error?.message ? `: ${error?.message}` : '.'}`,
);
} finally {
// Clean up abort handler to prevent memory leaks
@ -380,6 +556,7 @@ async function getServerConnectionStatus(
module.exports = {
createMCPTool,
createMCPTools,
getMCPSetupData,
checkOAuthFlowStatus,
getServerConnectionStatus,