mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-04-03 06:17:21 +02:00
* fix(api): add buildOAuthToolCallName utility for MCP OAuth flows
Extract a shared utility that builds the synthetic tool-call name
used during MCP OAuth flows (oauth_mcp_{normalizedServerName}).
Uses startsWith on the raw serverName (not the normalized form) to
guard against double-wrapping, so names that merely normalize to
start with oauth_mcp_ (e.g., oauth@mcp@server) are correctly
prefixed while genuinely pre-wrapped names are left as-is.
Add 8 unit tests covering normal names, pre-wrapped names, _mcp_
substrings, special characters, non-ASCII, and empty string inputs.
* fix(backend): use buildOAuthToolCallName in MCP OAuth flows
Replace inline tool-call name construction in both reconnectServer
(MCP.js) and createOAuthEmitter (ToolService.js) with the shared
buildOAuthToolCallName utility. Remove unused normalizeServerName
import from ToolService.js. Fix import ordering in both files.
This ensures the oauth_mcp_ prefix is consistently applied so the
client correctly identifies MCP OAuth flows and binds the CSRF
cookie to the right server.
* fix(client): robust MCP OAuth detection and split handling in ToolCall
- Fix split() destructuring to preserve tail segments for server names
containing _mcp_ (e.g., foo_mcp_bar no longer truncated to foo).
- Add auth URL redirect_uri fallback: when the tool-call name lacks
the _mcp_ delimiter, parse redirect_uri for the MCP callback path.
Set function_name to the extracted server name so progress text
shows the server, not the raw tool-call ID.
- Display server name instead of literal "oauth" as function_name,
gated on auth presence to avoid misidentifying real tools named
"oauth".
- Consolidate three independent new URL(auth) parses into a single
parsedAuthUrl useMemo shared across detection, actionId, and
authDomain hooks.
- Replace any type on ProgressText test mock with structural type.
- Add 8 tests covering delimiter detection, multi-segment names,
function_name display, redirect_uri fallback, normalized _mcp_
server names, and non-MCP action auth exclusion.
* chore: fix import order in utils.test.ts
* fix(client): drop auth gate on OAuth displayName so completed flows show server name
The createOAuthEnd handler re-emits the toolCall delta without auth,
so auth is cleared on the client after OAuth completes. Gating
displayName on `func === 'oauth' && auth` caused completed OAuth
steps to render "Completed oauth" instead of "Completed my-server".
Remove the `&& auth` gate — within the MCP delimiter branch the
func="oauth" check alone is sufficient. Also remove `auth` from the
useMemo dep array since only `parsedAuthUrl` is referenced. Update
the test to assert correct post-completion display.
795 lines
26 KiB
JavaScript
795 lines
26 KiB
JavaScript
const { tool } = require('@langchain/core/tools');
|
|
const { logger } = require('@librechat/data-schemas');
|
|
const {
|
|
Providers,
|
|
StepTypes,
|
|
GraphEvents,
|
|
Constants: AgentConstants,
|
|
} = require('@librechat/agents');
|
|
const {
|
|
sendEvent,
|
|
MCPOAuthHandler,
|
|
isMCPDomainAllowed,
|
|
normalizeServerName,
|
|
normalizeJsonSchema,
|
|
GenerationJobManager,
|
|
resolveJsonSchemaRefs,
|
|
buildOAuthToolCallName,
|
|
} = require('@librechat/api');
|
|
const { Time, CacheKeys, Constants, isAssistantsEndpoint } = require('librechat-data-provider');
|
|
const {
|
|
getOAuthReconnectionManager,
|
|
getMCPServersRegistry,
|
|
getFlowStateManager,
|
|
getMCPManager,
|
|
} = require('~/config');
|
|
const { findToken, createToken, updateToken } = require('~/models');
|
|
const { getGraphApiToken } = require('./GraphTokenService');
|
|
const { reinitMCPServer } = require('./Tools/mcp');
|
|
const { getAppConfig } = require('./Config');
|
|
const { getLogStores } = require('~/cache');
|
|
|
|
/** Size threshold above which `evictStale` starts pruning a timestamp map. */
const MAX_CACHE_SIZE = 1000;

/** `${user.id}:${serverName}` → time (ms since epoch) of the latest reconnect attempt. */
const lastReconnectAttempts = new Map();
/** Window (ms) during which a repeated reconnect for the same key is skipped. */
const RECONNECT_THROTTLE_MS = 10 * 1000;

/** Negative cache: tool key → time (ms since epoch) its definition was last found missing. */
const missingToolCache = new Map();
/** How long (ms) a `missingToolCache` entry short-circuits to the unavailable stub. */
const MISSING_TOOL_TTL_MS = 10 * 1000;
|
|
|
|
/**
 * Prunes expired entries from a `key → timestamp` map, but only once the map
 * has grown past `MAX_CACHE_SIZE`.
 *
 * Entries are visited in insertion order and pruning stops as soon as the map
 * is back at or under the limit. Entries that are still fresh are never
 * removed, so the map can stay above the limit when nothing has expired yet.
 *
 * @param {Map<string, number>} map - Map of key → timestamp (ms since epoch); mutated in place.
 * @param {number} ttl - Age in ms at or beyond which an entry counts as expired.
 * @returns {void}
 */
function evictStale(map, ttl) {
  if (map.size <= MAX_CACHE_SIZE) {
    return;
  }

  const now = Date.now();
  for (const [key, stampedAt] of map.entries()) {
    const isExpired = now - stampedAt >= ttl;
    if (!isExpired) {
      continue;
    }
    map.delete(key);
    // The size only changes on delete, so this is the only place the limit can be re-reached.
    if (map.size <= MAX_CACHE_SIZE) {
      return;
    }
  }
}
|
|
|
|
/** Message used both as the stub tool's description and as its call result. */
const unavailableMsg =
  "This tool's MCP server is temporarily unavailable. Please try again shortly.";

/**
 * Builds a placeholder tool for an MCP tool whose definition could not be
 * obtained. Calling the stub resolves to `[unavailableMsg, null]`.
 *
 * @param {string} toolName - Tool name (the part before the MCP delimiter).
 * @param {string} serverName - Raw server name; normalized for the tool name,
 *   kept verbatim on `mcpRawServerName`.
 * @returns The object returned by `tool(...)`, tagged with `mcp = true` and `mcpRawServerName`.
 */
function createUnavailableToolStub(toolName, serverName) {
  const normalizedServer = normalizeServerName(serverName);
  const stubName = `${toolName}${Constants.mcp_delimiter}${normalizedServer}`;

  const respondUnavailable = async () => [unavailableMsg, null];
  const stubSchema = {
    type: 'object',
    properties: {
      input: { type: 'string', description: 'Input for the tool' },
    },
    required: [],
  };

  const stub = tool(respondUnavailable, {
    schema: stubSchema,
    name: stubName,
    description: unavailableMsg,
    responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
  });

  return Object.assign(stub, { mcp: true, mcpRawServerName: serverName });
}
|
|
|
|
/**
 * Tells whether a JSON schema is an `object` schema that declares nothing:
 * no (or empty) `properties` and a falsy `additionalProperties`.
 *
 * @param {unknown} jsonSchema - Candidate schema; any value is accepted.
 * @returns {boolean} `true` only for an empty object schema.
 */
function isEmptyObjectSchema(jsonSchema) {
  if (jsonSchema == null || typeof jsonSchema !== 'object') {
    return false;
  }
  if (jsonSchema.type !== 'object') {
    return false;
  }

  const { properties, additionalProperties } = jsonSchema;
  const declaresProperties = properties != null && Object.keys(properties).length !== 0;
  if (declaresProperties) {
    return false;
  }
  return !additionalProperties;
}
|
|
|
|
/**
 * Builds an emitter that sends an `ON_RUN_STEP_DELTA` event carrying the OAuth
 * URL for a tool call. The event goes through `GenerationJobManager.emitChunk`
 * when a `streamId` is set, otherwise through `sendEvent` on `res`.
 *
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns {(authURL: string) => Promise<void>}
 */
function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
  /**
   * @param {string} authURL - The URL to redirect the user for OAuth authentication.
   * @returns {Promise<void>}
   */
  const emitDelta = async (authURL) => {
    /** @type {AgentToolCallDelta} */
    const delta = {
      type: StepTypes.TOOL_CALLS,
      // The tool call is copied with `args` blanked; the caller's object is left untouched.
      tool_calls: [{ ...toolCall, args: '' }],
      auth: authURL,
      expires_at: Date.now() + Time.TWO_MINUTES,
    };
    const payload = {
      event: GraphEvents.ON_RUN_STEP_DELTA,
      data: { id: stepId, delta },
    };

    if (!streamId) {
      sendEvent(res, payload);
      return;
    }
    await GenerationJobManager.emitChunk(streamId, payload);
  };

  return emitDelta;
}
|
|
|
|
/**
 * Builds an emitter that announces a tool-call run step with an `ON_RUN_STEP`
 * event. The event goes through `GenerationJobManager.emitChunk` when a
 * `streamId` is set, otherwise through `sendEvent` on `res`.
 *
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.runId - The Run ID, i.e. message ID; falls back to
 *   `Constants.USE_PRELIM_RESPONSE_MESSAGE_ID` when nullish.
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {number} [params.index] - Step index; defaults to `0` when nullish.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns {() => Promise<void>}
 */
function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) {
  /** @returns {import('@librechat/agents').RunStep} */
  const buildRunStep = () => ({
    runId: runId ?? Constants.USE_PRELIM_RESPONSE_MESSAGE_ID,
    id: stepId,
    type: StepTypes.TOOL_CALLS,
    index: index ?? 0,
    stepDetails: {
      type: StepTypes.TOOL_CALLS,
      tool_calls: [toolCall],
    },
  });

  return async () => {
    const payload = { event: GraphEvents.ON_RUN_STEP, data: buildRunStep() };
    if (!streamId) {
      sendEvent(res, payload);
      return;
    }
    await GenerationJobManager.emitChunk(streamId, payload);
  };
}
|
|
|
|
/**
 * Creates the `oauthStart` handler for an OAuth login flow.
 *
 * The returned function registers an `'oauth_login'` flow through
 * `flowManager.createFlowWithHandler`; the flow handler forwards the auth URL
 * to `callback`. The original note on this function says routing through the
 * flow manager ensures the handler is only invoked once — that guarantee lives
 * in `createFlowWithHandler`, which is not visible here.
 *
 * @param {object} params
 * @param {string} params.flowId - The ID of the login flow.
 * @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
 * @param {(authURL: string) => void | Promise<void>} [params.callback] - Notifies
 *   the client of the auth URL; may be async.
 * @returns {(authURL: string) => Promise<void>}
 */
function createOAuthStart({ flowId, flowManager, callback }) {
  /**
   * Handles an OAuth login request.
   * @param {string} authURL - The URL to redirect the user for OAuth authentication.
   * @returns {Promise<void>} Settles once `createFlowWithHandler` settles.
   */
  return async function (authURL) {
    await flowManager.createFlowWithHandler(flowId, 'oauth_login', async () => {
      // The emitters passed as `callback` are async. Awaiting keeps the log
      // below truthful and turns a failed emit into a handler rejection
      // instead of an unhandled promise rejection.
      await callback?.(authURL);
      logger.debug('Sent OAuth login request to client');
      return true;
    });
  };
}
|
|
|
|
/**
 * Builds the `oauthEnd` handler: it re-emits the tool call as an
 * `ON_RUN_STEP_DELTA` event whose delta carries only `type` and `tool_calls`
 * (no `auth` / `expires_at`), then logs success.
 *
 * @param {object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.stepId - The ID of the step in the flow.
 * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns {() => Promise<void>}
 */
function createOAuthEnd({ res, stepId, toolCall, streamId = null }) {
  const emitOAuthEnd = async () => {
    /** @type {AgentToolCallDelta} */
    const delta = {
      type: StepTypes.TOOL_CALLS,
      tool_calls: [{ ...toolCall }],
    };
    const payload = {
      event: GraphEvents.ON_RUN_STEP_DELTA,
      data: { id: stepId, delta },
    };

    const isResumable = Boolean(streamId);
    if (isResumable) {
      await GenerationJobManager.emitChunk(streamId, payload);
    } else {
      sendEvent(res, payload);
    }
    logger.debug('Sent OAuth login success to client');
  };

  return emitOAuthEnd;
}
|
|
|
|
/**
 * Builds an abort listener for a tool call. When invoked it logs the abort and
 * fails both the `mcp_oauth` and `mcp_get_tokens` flows for the user/server pair.
 *
 * @param {object} params
 * @param {string} params.userId - The ID of the user.
 * @param {string} params.serverName - The name of the server.
 * @param {string} params.toolName - The name of the tool.
 * @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
 * @returns {() => void}
 */
function createAbortHandler({ userId, serverName, toolName, flowManager }) {
  // Flow types cleaned up on abort, in the order they are failed.
  const flowTypesToFail = ['mcp_oauth', 'mcp_get_tokens'];

  return () => {
    logger.info(`[MCP][User: ${userId}][${serverName}][${toolName}] Tool call aborted`);
    const flowId = MCPOAuthHandler.generateFlowId(userId, serverName);
    for (const flowType of flowTypesToFail) {
      // A fresh Error per flow, so the two flows never share an error instance.
      flowManager.failFlow(flowId, flowType, new Error('Tool call aborted'));
    }
  };
}
|
|
|
|
/**
 * Combines the run-step emitter and the run-step-delta emitter into a single
 * OAuth callback: the step is announced first, then the delta carrying the
 * auth URL is sent.
 *
 * Both emitters are awaited in sequence, because they may be async. This
 * keeps the step ahead of its delta and lets a failed emit reject the returned
 * promise instead of being dropped. If the step emit fails, the delta is not sent.
 *
 * @param {Object} params
 * @param {() => void | Promise<void>} params.runStepEmitter
 * @param {(authURL: string) => void | Promise<void>} params.runStepDeltaEmitter
 * @returns {(authURL: string) => Promise<void>}
 */
function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
  return async function (authURL) {
    await runStepEmitter();
    await runStepDeltaEmitter(authURL);
  };
}
|
|
|
|
/**
 * Re-initializes the connection to an MCP server on behalf of a user, wiring
 * up the events that prompt the client for OAuth login when it is required.
 *
 * Attempts are throttled per `${user.id}:${serverName}`: a call made within
 * `RECONNECT_THROTTLE_MS` of the previous attempt returns `null` without
 * contacting the server. The attempt timestamp is recorded before the attempt
 * runs, so failed attempts are throttled as well.
 *
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {IUser} params.user - The user from the request object.
 * @param {string} params.serverName
 * @param {AbortSignal} [params.signal] - When provided, an abort fails the pending OAuth flows.
 * @param {number} [params.index]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @returns {Promise<Awaited<ReturnType<typeof reinitMCPServer>> | null>} `null` when throttled,
 *   otherwise whatever `reinitMCPServer` resolves to (callers in this file read
 *   `tools` and `availableTools` from it).
 */
async function reconnectServer({
  res,
  user,
  index,
  signal,
  serverName,
  userMCPAuthMap,
  streamId = null,
}) {
  logger.debug(
    `[MCP][reconnectServer] serverName: ${serverName}, user: ${user?.id}, hasUserMCPAuthMap: ${!!userMCPAuthMap}`,
  );

  const throttleKey = `${user.id}:${serverName}`;
  const now = Date.now();
  const lastAttempt = lastReconnectAttempts.get(throttleKey) ?? 0;
  if (now - lastAttempt < RECONNECT_THROTTLE_MS) {
    logger.debug(`[MCP][reconnectServer] Throttled reconnect for ${serverName}`);
    return null;
  }
  lastReconnectAttempts.set(throttleKey, now);
  evictStale(lastReconnectAttempts, RECONNECT_THROTTLE_MS);

  const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID;
  const flowId = `${user.id}:${serverName}:${Date.now()}`;
  const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
  const stepId = 'step_oauth_login_' + serverName;
  // Synthetic tool call that represents the OAuth login step in emitted events.
  const toolCall = {
    id: flowId,
    name: buildOAuthToolCallName(serverName),
    type: 'tool_call_chunk',
  };

  // Set up abort handler to clean up OAuth flows if request is aborted
  const oauthFlowId = MCPOAuthHandler.generateFlowId(user.id, serverName);
  const abortHandler = () => {
    logger.info(
      `[MCP][User: ${user.id}][${serverName}] Tool loading aborted, cleaning up OAuth flows`,
    );
    // Clean up both mcp_oauth and mcp_get_tokens flows
    flowManager.failFlow(oauthFlowId, 'mcp_oauth', new Error('Tool loading aborted'));
    flowManager.failFlow(oauthFlowId, 'mcp_get_tokens', new Error('Tool loading aborted'));
  };

  if (signal) {
    signal.addEventListener('abort', abortHandler, { once: true });
  }

  try {
    const runStepEmitter = createRunStepEmitter({
      res,
      index,
      runId,
      stepId,
      toolCall,
      streamId,
    });
    const runStepDeltaEmitter = createRunStepDeltaEmitter({
      res,
      stepId,
      toolCall,
      streamId,
    });
    const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter });
    // `createOAuthStart` only reads flowId, callback and flowManager.
    const oauthStart = createOAuthStart({
      flowId,
      callback,
      flowManager,
    });
    return await reinitMCPServer({
      user,
      signal,
      serverName,
      oauthStart,
      flowManager,
      userMCPAuthMap,
      forceNew: true,
      returnOnOAuth: false,
      connectionTimeout: Time.THIRTY_SECONDS,
    });
  } finally {
    // Clean up abort handler to prevent memory leaks
    if (signal) {
      signal.removeEventListener('abort', abortHandler);
    }
  }
}
|
|
|
|
/**
 * Creates every tool exposed by the given MCP server.
 *
 * This path is for when tools could not be assembled from the cached tool
 * definitions (`availableTools`): the server is reconnected and one tool
 * instance is built per tool it reports.
 *
 * An empty array is returned when the server's domain is not allowed, when the
 * reconnect was throttled, or when the reconnect yielded no tools.
 *
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {IUser} params.user - The user from the request object.
 * @param {string} params.serverName
 * @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
 * @param {number} [params.index]
 * @param {AbortSignal} [params.signal]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @param {import('@librechat/api').ParsedServerConfig} [params.config] - Server config;
 *   looked up from the registry when omitted.
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @returns {Promise<Array>} The truthy results of `createMCPTool`, one per reported tool.
 */
async function createMCPTools({
  res,
  user,
  index,
  signal,
  config,
  provider,
  serverName,
  userMCPAuthMap,
  streamId = null,
}) {
  // Validate the domain before reconnecting, to avoid wasted work on disallowed domains.
  // getAppConfig() is used so per-user/role domain restrictions apply.
  const serverConfig =
    config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id));
  if (serverConfig?.url) {
    const appConfig = await getAppConfig({ role: user?.role });
    const domainAllowed = await isMCPDomainAllowed(
      serverConfig,
      appConfig?.mcpSettings?.allowedDomains,
    );
    if (!domainAllowed) {
      logger.warn(`[MCP][${serverName}] Domain not allowed, skipping all tools`);
      return [];
    }
  }

  const reconnection = await reconnectServer({
    res,
    user,
    index,
    signal,
    serverName,
    userMCPAuthMap,
    streamId,
  });
  // `null` specifically means the reconnect was throttled.
  if (reconnection === null) {
    logger.debug(`[MCP][${serverName}] Reconnect throttled, skipping tool creation.`);
    return [];
  }
  if (!reconnection?.tools) {
    logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
    return [];
  }

  const { tools: reportedTools, availableTools } = reconnection;
  const serverTools = [];
  for (const reportedTool of reportedTools) {
    const toolKey = `${reportedTool.name}${Constants.mcp_delimiter}${serverName}`;
    const toolInstance = await createMCPTool({
      res,
      user,
      provider,
      userMCPAuthMap,
      streamId,
      availableTools,
      toolKey,
      config: serverConfig,
    });
    if (!toolInstance) {
      continue;
    }
    serverTools.push(toolInstance);
  }

  return serverTools;
}
|
|
|
|
/**
 * Creates a single tool from the specified MCP Server via `toolKey`.
 *
 * `toolKey` has the form `<toolName><mcp_delimiter><serverName>`. It is split
 * on the FIRST delimiter only, so a server name that itself contains the
 * delimiter is kept whole instead of being truncated at its first segment.
 *
 * Resolution order for the tool definition:
 *  1. `availableTools[toolKey]`
 *  2. a recent miss in the negative cache → the unavailable stub is returned
 *  3. a server reconnect, then `availableTools` from its result
 * If the definition is still missing, the miss is cached and the stub is returned.
 *
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {IUser} params.user - The user from the request object.
 * @param {string} params.toolKey - The toolKey for the tool.
 * @param {number} [params.index]
 * @param {AbortSignal} [params.signal]
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
 * @param {LCAvailableTools} [params.availableTools]
 * @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
 * @param {import('@librechat/api').ParsedServerConfig} [params.config] - Server config;
 *   looked up from the registry when omitted.
 * @returns {Promise<ReturnType<typeof createToolInstance> | undefined>} The tool instance
 *   (or the unavailable stub); `undefined` when the server's domain is not allowed.
 */
async function createMCPTool({
  res,
  user,
  index,
  signal,
  toolKey,
  provider,
  userMCPAuthMap,
  availableTools,
  config,
  streamId = null,
}) {
  // Split on the first delimiter and re-join the tail, so e.g. `tool_mcp_foo_mcp_bar`
  // yields server `foo_mcp_bar` rather than `foo`. With no delimiter at all,
  // serverName stays `undefined`, as it was with plain destructuring.
  const [toolName, ...serverNameSegments] = toolKey.split(Constants.mcp_delimiter);
  const serverName =
    serverNameSegments.length > 0 ? serverNameSegments.join(Constants.mcp_delimiter) : undefined;

  // Runtime domain validation: check if the server's domain is still allowed
  // Use getAppConfig() to support per-user/role domain restrictions
  const serverConfig =
    config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id));
  if (serverConfig?.url) {
    const appConfig = await getAppConfig({ role: user?.role });
    const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
    const isDomainAllowed = await isMCPDomainAllowed(serverConfig, allowedDomains);
    if (!isDomainAllowed) {
      logger.warn(`[MCP][${serverName}] Domain no longer allowed, skipping tool: ${toolName}`);
      return undefined;
    }
  }

  /** @type {LCTool | undefined} */
  let toolDefinition = availableTools?.[toolKey]?.function;
  if (!toolDefinition) {
    // A recent miss means a reconnect already failed to produce this tool; skip another attempt.
    const cachedAt = missingToolCache.get(toolKey);
    if (cachedAt && Date.now() - cachedAt < MISSING_TOOL_TTL_MS) {
      logger.debug(
        `[MCP][${serverName}][${toolName}] Tool in negative cache, returning unavailable stub.`,
      );
      return createUnavailableToolStub(toolName, serverName);
    }

    logger.warn(
      `[MCP][${serverName}][${toolName}] Requested tool not found in available tools, re-initializing MCP server.`,
    );
    const result = await reconnectServer({
      res,
      user,
      index,
      signal,
      serverName,
      userMCPAuthMap,
      streamId,
    });
    // `result` is null when the reconnect was throttled; optional chaining covers that.
    toolDefinition = result?.availableTools?.[toolKey]?.function;

    if (!toolDefinition) {
      missingToolCache.set(toolKey, Date.now());
      evictStale(missingToolCache, MISSING_TOOL_TTL_MS);
    }
  }

  if (!toolDefinition) {
    logger.warn(
      `[MCP][${serverName}][${toolName}] Tool definition not found, returning unavailable stub.`,
    );
    return createUnavailableToolStub(toolName, serverName);
  }

  return createToolInstance({
    res,
    provider,
    toolName,
    serverName,
    toolDefinition,
    streamId,
  });
}
|
|
|
|
/**
 * Wraps an MCP tool definition in a callable tool instance.
 *
 * The instance is named `<toolName><mcp_delimiter><normalized serverName>`. Its
 * call handler forwards the arguments to `mcpManager.callTool`, supplying the
 * OAuth start/end emitters and an abort listener that fails pending OAuth
 * flows when the request signal aborts.
 *
 * @param {Object} params
 * @param {ServerResponse} params.res - The Express response object for sending events.
 * @param {string} params.toolName - Tool name as known to the MCP server.
 * @param {string} params.serverName - Raw (un-normalized) MCP server name.
 * @param {LCTool} params.toolDefinition - Supplies `description` and `parameters`.
 * @param {Providers | EModelEndpoint} params.provider - Default provider; a call's
 *   `config.metadata.provider` takes precedence when present.
 * @param {string | null} [params.streamId] - The stream ID for resumable mode.
 * @returns The object returned by `tool(...)`, tagged with `mcp`, `mcpRawServerName`
 *   and `mcpJsonSchema` (the original, un-normalized `parameters`).
 */
function createToolInstance({
  res,
  toolName,
  serverName,
  toolDefinition,
  provider: _provider,
  streamId = null,
}) {
  /** @type {LCTool} */
  const { description, parameters } = toolDefinition;
  const isGoogle = _provider === Providers.VERTEXAI || _provider === Providers.GOOGLE;

  let schema = parameters ? normalizeJsonSchema(resolveJsonSchemaRefs(parameters)) : null;

  // No schema at all, or an empty object schema for Google providers: fall back
  // to a schema with a single optional string `input`.
  if (!schema || (isGoogle && isEmptyObjectSchema(schema))) {
    schema = {
      type: 'object',
      properties: {
        input: { type: 'string', description: 'Input for the tool' },
      },
      required: [],
    };
  }

  const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;

  /** @type {(toolArguments: Object | string, config?: GraphRunnableConfig) => Promise<unknown>} */
  const _call = async (toolArguments, config) => {
    const userId = config?.configurable?.user?.id || config?.configurable?.user_id;
    /** @type {ReturnType<typeof createAbortHandler>} */
    let abortHandler = null;
    /** @type {AbortSignal} */
    let derivedSignal = null;

    try {
      const flowsCache = getLogStores(CacheKeys.FLOWS);
      const flowManager = getFlowStateManager(flowsCache);
      // A derived signal, so the listener added below is attached to an object owned by this call.
      derivedSignal = config?.signal ? AbortSignal.any([config.signal]) : undefined;
      const mcpManager = getMCPManager(userId);
      const provider = (config?.metadata?.provider || _provider)?.toLowerCase();

      const { args: _args, stepId, ...toolCall } = config.toolCall ?? {};
      const flowId = `${serverName}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`;
      const runStepDeltaEmitter = createRunStepDeltaEmitter({
        res,
        stepId,
        toolCall,
        streamId,
      });
      const oauthStart = createOAuthStart({
        flowId,
        flowManager,
        callback: runStepDeltaEmitter,
      });
      const oauthEnd = createOAuthEnd({
        res,
        stepId,
        toolCall,
        streamId,
      });

      if (derivedSignal) {
        abortHandler = createAbortHandler({ userId, serverName, toolName, flowManager });
        derivedSignal.addEventListener('abort', abortHandler, { once: true });
      }

      const customUserVars =
        config?.configurable?.userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];

      const result = await mcpManager.callTool({
        serverName,
        toolName,
        provider,
        toolArguments,
        options: {
          signal: derivedSignal,
        },
        user: config?.configurable?.user,
        requestBody: config?.configurable?.requestBody,
        customUserVars,
        flowManager,
        tokenMethods: {
          findToken,
          createToken,
          updateToken,
        },
        oauthStart,
        oauthEnd,
        graphTokenResolver: getGraphApiToken,
      });

      // For assistants endpoints only the first element of an array result is returned.
      if (isAssistantsEndpoint(provider) && Array.isArray(result)) {
        return result[0];
      }
      return result;
    } catch (error) {
      logger.error(
        `[MCP][${serverName}][${toolName}][User: ${userId}] Error calling MCP tool:`,
        error,
      );

      // Anything can be thrown; only inspect `message` when it is a string.
      const rawMessage = error?.message;
      const messageText = typeof rawMessage === 'string' ? rawMessage : '';

      /** OAuth error, provide a helpful message */
      const isOAuthError =
        messageText.includes('401') ||
        messageText.includes('OAuth') ||
        messageText.includes('authentication');

      if (isOAuthError) {
        throw new Error(
          `[MCP][${serverName}][${toolName}] OAuth authentication required. Please check the server logs for the authentication URL.`,
          { cause: error },
        );
      }

      // The original error is kept as `cause`, so its stack is not lost.
      throw new Error(
        `[MCP][${serverName}][${toolName}] tool call failed${rawMessage ? `: ${rawMessage}` : '.'}`,
        { cause: error },
      );
    } finally {
      // Clean up abort handler to prevent memory leaks
      if (abortHandler && derivedSignal) {
        derivedSignal.removeEventListener('abort', abortHandler);
      }
    }
  };

  const toolInstance = tool(_call, {
    schema,
    name: normalizedToolKey,
    description: description || '',
    responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
  });
  toolInstance.mcp = true;
  toolInstance.mcpRawServerName = serverName;
  toolInstance.mcpJsonSchema = parameters;
  return toolInstance;
}
|
|
|
|
/**
 * Gathers the MCP data for a user: every server config, the already-loaded
 * app-level connections, the user's own connections, and the OAuth servers.
 *
 * A failure while reading app-level connections is logged and treated as
 * "no app connections"; it does not fail the call.
 *
 * @param {string} userId - The user ID
 * @returns {Promise<Object>} Object containing mcpConfig, oauthServers, appConnections
 *   and userConnections
 * @throws {Error} 'MCP config not found' when the registry yields no config.
 */
async function getMCPSetupData(userId) {
  const mcpConfig = await getMCPServersRegistry().getAllServerConfigs(userId);
  if (!mcpConfig) {
    throw new Error('MCP config not found');
  }

  const mcpManager = getMCPManager(userId);

  /** @returns {Promise<Map<string, import('@librechat/api').MCPConnection>>} */
  const loadAppConnections = async () => {
    try {
      // getLoaded() is used instead of getAll(): getAll() creates connections for all
      // servers, which is problematic for servers that require user context
      // (e.g., those with {{LIBRECHAT_USER_ID}} placeholders).
      const loaded = await mcpManager.appConnections?.getLoaded();
      return loaded || new Map();
    } catch (error) {
      logger.error(`[MCP][User: ${userId}] Error getting app connections:`, error);
      return new Map();
    }
  };

  const appConnections = await loadAppConnections();
  const userConnections = mcpManager.getUserConnections(userId) || new Map();
  const oauthServers = await getMCPServersRegistry().getOAuthServers(userId);

  return { mcpConfig, oauthServers, appConnections, userConnections };
}
|
|
|
|
/**
 * Reports the state of the `mcp_oauth` flow for a user and server.
 *
 * - no flow state, a cancelled flow, or any other status → neither flag set
 * - status `FAILED` or older than its TTL (and not cancelled) → `hasFailedFlow`
 * - status `PENDING` and within its TTL → `hasActiveFlow`
 * Errors raised while checking are logged and reported as neither flag set.
 *
 * @param {string} userId - The user ID
 * @param {string} serverName - The server name
 * @returns {Promise<{ hasActiveFlow: boolean, hasFailedFlow: boolean }>}
 */
async function checkOAuthFlowStatus(userId, serverName) {
  const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
  const flowId = MCPOAuthHandler.generateFlowId(userId, serverName);
  // A fresh result object per call; callers never share one instance.
  const report = (hasActiveFlow, hasFailedFlow) => ({ hasActiveFlow, hasFailedFlow });

  try {
    const flowState = await flowManager.getFlowState(flowId, 'mcp_oauth');
    if (!flowState) {
      return report(false, false);
    }

    const { status, error: flowError } = flowState;
    const flowAge = Date.now() - flowState.createdAt;
    const flowTTL = flowState.ttl || 180000; // Default 3 minutes
    const timedOut = flowAge > flowTTL;

    if (status === 'FAILED' || timedOut) {
      // A cancelled flow is not reported as a failure.
      if (flowError && flowError.includes('cancelled')) {
        logger.debug(`[MCP Connection Status] Found cancelled OAuth flow for ${serverName}`, {
          flowId,
          status,
          error: flowError,
        });
        return report(false, false);
      }

      logger.debug(`[MCP Connection Status] Found failed OAuth flow for ${serverName}`, {
        flowId,
        status,
        flowAge,
        flowTTL,
        timedOut,
        error: flowError,
      });
      return report(false, true);
    }

    if (status !== 'PENDING') {
      return report(false, false);
    }

    logger.debug(`[MCP Connection Status] Found active OAuth flow for ${serverName}`, {
      flowId,
      flowAge,
      flowTTL,
    });
    return report(true, false);
  } catch (error) {
    logger.error(`[MCP Connection Status] Error checking OAuth flows for ${serverName}:`, error);
    return report(false, false);
  }
}
|
|
|
|
/**
 * Get connection status for a specific MCP server.
 *
 * The base state comes from the app-level connection, falling back to the
 * user-level one. A missing or stale connection counts as `'disconnected'`.
 * For OAuth servers that are disconnected, the state is refined:
 * `'connecting'` while a reconnection is in progress or an OAuth flow is
 * active, `'error'` when the last OAuth flow failed.
 *
 * @param {string} userId - The user ID
 * @param {string} serverName - The server name
 * @param {import('@librechat/api').ParsedServerConfig} config - The server configuration
 * @param {Map<string, import('@librechat/api').MCPConnection>} appConnections - App-level connections
 * @param {Map<string, import('@librechat/api').MCPConnection>} userConnections - User-level connections
 * @param {Set} oauthServers - Set of OAuth servers
 * @returns {Promise<Object>} Object containing requiresOAuth and connectionState
 */
async function getServerConnectionStatus(
  userId,
  serverName,
  config,
  appConnections,
  userConnections,
  oauthServers,
) {
  const connection = appConnections.get(serverName) || userConnections.get(serverName);

  // `config` is only read when a connection exists.
  let connectionState = 'disconnected';
  if (connection && !connection.isStale(config.updatedAt)) {
    connectionState = connection.connectionState || 'disconnected';
  }

  // connection state overrides specific to OAuth servers
  if (connectionState === 'disconnected' && oauthServers.has(serverName)) {
    const isReconnecting = getOAuthReconnectionManager().isReconnecting(userId, serverName);
    if (isReconnecting) {
      connectionState = 'connecting';
    } else {
      const { hasActiveFlow, hasFailedFlow } = await checkOAuthFlowStatus(userId, serverName);
      // A failed flow takes precedence over an active one.
      if (hasFailedFlow) {
        connectionState = 'error';
      } else if (hasActiveFlow) {
        connectionState = 'connecting';
      }
    }
  }

  return {
    requiresOAuth: oauthServers.has(serverName),
    connectionState,
  };
}
|
|
|
|
module.exports = {
|
|
createMCPTool,
|
|
createMCPTools,
|
|
getMCPSetupData,
|
|
checkOAuthFlowStatus,
|
|
getServerConnectionStatus,
|
|
createUnavailableToolStub,
|
|
};
|