mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-04-05 15:27:20 +02:00
Some checks failed
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Has been cancelled
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Has been cancelled
Publish `librechat-data-provider` to NPM / build (push) Has been cancelled
Publish `@librechat/data-schemas` to NPM / build-and-publish (push) Has been cancelled
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Has been cancelled
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Has been cancelled
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Has been cancelled
Publish `librechat-data-provider` to NPM / publish-npm (push) Has been cancelled
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Has been cancelled
* fix: reuse existing OAuth client registrations to prevent client_id mismatch
When using auto-discovered OAuth (DCR), LibreChat calls /register on every
flow initiation, getting a new client_id each time. When concurrent
connections or reconnections happen, the client_id used during /authorize
differs from the one used during /token, causing the server to reject the
exchange.
Before registering a new client, check if a valid client registration
already exists in the database and reuse it.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* Handle re-registration of OAuth clients when redirect_uri changes
* Add undefined fields for logo_uri and tos_uri in OAuth metadata tests
* test: add client registration reuse tests for horizontal scaling race condition
Reproduces the client_id mismatch bug that occurs in multi-replica deployments
where concurrent initiateOAuthFlow calls each register a new OAuth client.
Tests verify that the findToken-based client reuse prevents re-registration.
* fix: address review findings for client registration reuse
- Fix empty redirect_uris bug: invert condition so missing/empty
redirect_uris triggers re-registration instead of silent reuse
- Revert undocumented config?.redirect_uri in auto-discovery path
- Change DB error logging from debug to warn for operator visibility
- Fix import order: move package type import to correct section
- Remove redundant type cast and misleading JSDoc comment
- Test file: remove dead imports, restore process.env.DOMAIN_SERVER,
rename describe blocks, add empty redirect_uris edge case test,
add concurrent reconnection test with pre-seeded token,
scope documentation to reconnection stabilization
* fix: resolve type check errors for OAuthClientInformation redirect_uris
The SDK's OAuthClientInformation type lacks redirect_uris (only on
OAuthClientInformationFull). Cast to the local OAuthClientInformation
type in handler.ts when accessing deserialized client info from DB,
and use intersection types in tests for clientInfo with redirect_uris.
* fix: address follow-up review findings R1, R2, R3
- R1: Move `import type { TokenMethods }` to the type-imports section,
before local types, per CLAUDE.md import order rules
- R2: Add unit test for empty redirect_uris in handler.test.ts to
verify the inverted condition triggers re-registration
- R3: Use delete for process.env.DOMAIN_SERVER restoration when the
original value was undefined to avoid coercion to string "undefined"
* fix: clear stale client registration on OAuth flow failure
When a stored client_id is no longer recognized by the OAuth server,
the flow fails but the stale client stays in MongoDB, causing every
retry to reuse the same invalid registration in an infinite loop.
On OAuth failure, clear the stored client registration so the next
attempt falls through to fresh Dynamic Client Registration.
- Add MCPTokenStorage.deleteClientRegistration() for targeted cleanup
- Call it from MCPConnectionFactory's OAuth failure path
- Add integration test proving recovery from stale client reuse
* fix: validate auth server identity and target cleanup to reused clients
- Gate client reuse on authorization server identity: compare stored
issuer against freshly discovered metadata before reusing, preventing
wrong-client reuse when the MCP server switches auth providers
- Add reusedStoredClient flag to MCPOAuthFlowMetadata so cleanup only
runs when the failed flow actually reused a stored registration,
not on unrelated failures (timeouts, user-denied consent, etc.)
- Add cleanup in returnOnOAuth path: when a prior flow that reused a
stored client is detected as failed, clear the stale registration
before re-initiating
- Add tests for issuer mismatch and reusedStoredClient flag assertions
* fix: address minor review findings N3, N5, N6
- N3: Type deleteClientRegistration param as TokenMethods['deleteTokens']
instead of Promise<unknown>
- N5: Elevate deletion failure logging from debug to warn for operator
visibility when stale client cleanup fails
- N6: Use getLogPrefix() instead of hardcoded log prefix to respect
system-user privacy convention
* fix: correct stale-client cleanup in both OAuth paths
- Blocking path: remove result?.clientInfo guard that made cleanup
unreachable (handleOAuthRequired returns null on failure, so
result?.clientInfo was always false in the failure branch)
- returnOnOAuth path: only clear stored client when the prior flow
status is FAILED, not on COMPLETED or PENDING flows, to avoid
deleting valid registrations during normal flow replacement
* fix: remove redundant cast on clientMetadata
clientMetadata is already typed as Record<string, unknown>; the
as Record<string, unknown> cast was a no-op.
* fix: thread reusedStoredClient through return type instead of re-reading flow state
FlowStateManager.createFlow() deletes FAILED flow state before
rejecting, so getFlowState() after handleOAuthRequired() returns null
would find nothing — making the stale-client cleanup dead code.
Fix: hoist reusedStoredClient flag from flowMetadata into a local
variable, include it in handleOAuthRequired()'s return type (both
success and catch paths), and use result.reusedStoredClient directly
in the caller instead of a second getFlowState() round-trip.
* fix: selective stale-client cleanup in returnOnOAuth path
The returnOnOAuth cleanup was unreliable: it depended on reading
FAILED flow state, but FlowStateManager.monitorFlow() deletes FAILED
state before rejecting. Move cleanup into createFlow's catch handler
where flowMetadata.reusedStoredClient is still in scope.
Make cleanup selective in both paths: add isClientRejection() helper
that only matches errors indicating the OAuth server rejected the
client_id (invalid_client, unauthorized_client, client not found).
Timeouts, user-cancelled flows, and other transient failures no
longer wipe valid stored registrations.
Thread the error from handleOAuthRequired() through the return type
so the blocking path can also check isClientRejection().
* fix: tighten isClientRejection heuristic
Narrow 'client_id' match to 'client_id mismatch' to avoid
false-positive cleanup on unrelated errors that happen to
mention client_id.
* test: add isClientRejection tests and enforced client_id on test server
- Add isClientRejection unit tests: invalid_client, unauthorized_client,
client_id mismatch, client not found, unknown client, and negative
cases (timeout, flow state not found, user denied, null, undefined)
- Enhance OAuth test server with enforceClientId option: binds auth
codes to the client_id that initiated /authorize, rejects token
exchange with mismatched or unregistered client_id (401 invalid_client)
- Add integration tests proving the test server correctly rejects
stale client_ids and accepts matching ones at /token
* fix: issuer validation, callback error propagation, and cleanup DRY
- Issuer check: re-register when storedIssuer is absent or non-string
instead of silently reusing. Narrows unknown type with typeof guard
and inverts condition so missing issuer → fresh DCR (safer default).
- OAuth callback route: call failFlow with the OAuth error when the
authorization server redirects back with error= parameter, so the
waiting flow receives the actual rejection instead of timing out.
This lets isClientRejection match stale-client errors correctly.
- Extract duplicated cleanup block to clearStaleClientIfRejected()
private method, called from both returnOnOAuth and blocking paths.
- Test fixes: add issuer to stored metadata in reuse tests, reset
server to undefined in afterEach to prevent double-close.
* fix: gate failFlow behind callback validation, propagate reusedStoredClient on join
- OAuth callback: move failFlow call to after CSRF/session/active-flow
validation so an attacker with only a leaked state parameter cannot
force-fail a flow without passing the same integrity checks required
for legitimate callbacks
- PENDING join path: propagate reusedStoredClient from flow metadata
into the return object so joiners can trigger stale-client cleanup
if the joined flow later fails with a client rejection
* fix: restore early oauthError/code redirects, gate only failFlow behind CSRF
The previous restructuring moved oauthError and missing-code checks
behind CSRF validation, breaking tests that expect those redirects
without cookies. The redirect itself is harmless (just shows an error
page). Only the failFlow call needs CSRF gating to prevent DoS.
Restructure: oauthError check stays early (redirects immediately),
but failFlow inside it runs the full CSRF/session/active-flow
validation before marking the flow as FAILED.
* fix: require deleteTokens for client reuse, add missing import in MCP.js
Client registration reuse without cleanup capability creates a
permanent failure loop: if the reused client is stale, the code
detects the rejection but cannot clear the stored registration
because deleteTokens is missing, so every retry reuses the same
broken client_id.
- MCPConnectionFactory: only pass findToken to initiateOAuthFlow
when deleteTokens is also available, ensuring reuse is only
enabled when recovery is possible
- api/server/services/MCP.js: add deleteTokens to the tokenMethods
object (was the only MCP call site missing it)
* fix: set reusedStoredClient before createFlow in joined-flow path
When joining a PENDING flow, reusedStoredClient was only set on the
success return but not before the await. If createFlow throws (e.g.
invalid_client during token exchange), the outer catch returns the
local variable which was still false, skipping stale-client cleanup.
* fix: require browser binding (CSRF/session) for failFlow on OAuth error
hasActiveFlow only proves a PENDING flow exists, not that the caller
is the same browser that initiated it. An attacker with a leaked state
could force-fail the flow without any user binding. Require hasCsrf or
hasSession before calling failFlow on the oauthError path.
* fix: guard findToken with deleteTokens check in blocking OAuth path
Match the returnOnOAuth path's defense-in-depth: only enable client
registration reuse when deleteTokens is also available, ensuring
cleanup is possible if the reused client turns out to be stale.
* fix: address review findings — tests, types, normalization, docs
- Add deleteTokens method to InMemoryTokenStore matching TokenMethods
contract; update test call site from deleteToken to deleteTokens
- Add MCPConnectionFactory test: returnOnOAuth flow fails with
invalid_client → clearStaleClientIfRejected invoked automatically
- Add mcp.spec.js tests: OAuth error with CSRF → failFlow called;
OAuth error without cookies → failFlow NOT called (DoS prevention)
- Add JSDoc to isClientRejection with RFC 6749 and vendor attribution
- Add inline comment explaining findToken/deleteTokens coupling guard
- Normalize issuer comparison: strip trailing slashes to prevent
spurious re-registrations from URL formatting differences
- Fix dead-code: use local reusedStoredClient variable in PENDING
join return instead of re-reading flowMeta
* fix: address final review nits N1-N4
- N1: Add session cookie failFlow test — validates the hasSession
branch triggers failFlow on OAuth error callback
- N2: Replace setTimeout(50) with setImmediate for microtask drain
- N3: Add 'unknown client' attribution to isClientRejection JSDoc
- N4: Remove dead getFlowState mock from failFlow tests
---------
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Danny Avila <danny@librechat.ai>
857 lines
28 KiB
JavaScript
857 lines
28 KiB
JavaScript
const { tool } = require('@langchain/core/tools');
|
|
const { logger, getTenantId } = require('@librechat/data-schemas');
|
|
const {
|
|
Providers,
|
|
StepTypes,
|
|
GraphEvents,
|
|
Constants: AgentConstants,
|
|
} = require('@librechat/agents');
|
|
const {
|
|
sendEvent,
|
|
MCPOAuthHandler,
|
|
isMCPDomainAllowed,
|
|
normalizeServerName,
|
|
normalizeJsonSchema,
|
|
GenerationJobManager,
|
|
resolveJsonSchemaRefs,
|
|
buildOAuthToolCallName,
|
|
} = require('@librechat/api');
|
|
const { Time, CacheKeys, Constants, isAssistantsEndpoint } = require('librechat-data-provider');
|
|
const {
|
|
getOAuthReconnectionManager,
|
|
getMCPServersRegistry,
|
|
getFlowStateManager,
|
|
getMCPManager,
|
|
} = require('~/config');
|
|
const { findToken, createToken, updateToken, deleteTokens } = require('~/models');
|
|
const { getGraphApiToken } = require('./GraphTokenService');
|
|
const { reinitMCPServer } = require('./Tools/mcp');
|
|
const { getAppConfig } = require('./Config');
|
|
const { getLogStores } = require('~/cache');
|
|
|
|
const MAX_CACHE_SIZE = 1000;
|
|
const lastReconnectAttempts = new Map();
|
|
const RECONNECT_THROTTLE_MS = 10_000;
|
|
|
|
const missingToolCache = new Map();
|
|
const MISSING_TOOL_TTL_MS = 10_000;
|
|
|
|
function evictStale(map, ttl) {
|
|
if (map.size <= MAX_CACHE_SIZE) {
|
|
return;
|
|
}
|
|
const now = Date.now();
|
|
for (const [key, timestamp] of map) {
|
|
if (now - timestamp >= ttl) {
|
|
map.delete(key);
|
|
}
|
|
if (map.size <= MAX_CACHE_SIZE) {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
const unavailableMsg =
|
|
"This tool's MCP server is temporarily unavailable. Please try again shortly.";
|
|
|
|
/**
|
|
* Resolves config-source MCP servers from admin Config overrides for the current
|
|
* request context. Returns the parsed configs keyed by server name.
|
|
* @param {import('express').Request} req - Express request with user context
|
|
* @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
|
|
*/
|
|
async function resolveConfigServers(req) {
|
|
try {
|
|
const registry = getMCPServersRegistry();
|
|
const user = req?.user;
|
|
const appConfig = await getAppConfig({
|
|
role: user?.role,
|
|
tenantId: getTenantId(),
|
|
userId: user?.id,
|
|
});
|
|
return await registry.ensureConfigServers(appConfig?.mcpConfig || {});
|
|
} catch (error) {
|
|
logger.warn(
|
|
'[resolveConfigServers] Failed to resolve config servers, degrading to empty:',
|
|
error,
|
|
);
|
|
return {};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Resolves config-source servers and merges all server configs (YAML + config + user DB)
|
|
* for the given user context. Shared helper for controllers needing the full merged config.
|
|
* @param {string} userId
|
|
* @param {{ id?: string, role?: string }} [user]
|
|
* @returns {Promise<Record<string, import('@librechat/api').ParsedServerConfig>>}
|
|
*/
|
|
async function resolveAllMcpConfigs(userId, user) {
|
|
const registry = getMCPServersRegistry();
|
|
const appConfig = await getAppConfig({ role: user?.role, tenantId: getTenantId(), userId });
|
|
let configServers = {};
|
|
try {
|
|
configServers = await registry.ensureConfigServers(appConfig?.mcpConfig || {});
|
|
} catch (error) {
|
|
logger.warn(
|
|
'[resolveAllMcpConfigs] Config server resolution failed, continuing without:',
|
|
error,
|
|
);
|
|
}
|
|
return await registry.getAllServerConfigs(userId, configServers);
|
|
}
|
|
|
|
/**
|
|
* @param {string} toolName
|
|
* @param {string} serverName
|
|
*/
|
|
function createUnavailableToolStub(toolName, serverName) {
|
|
const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
|
|
const _call = async () => [unavailableMsg, null];
|
|
const toolInstance = tool(_call, {
|
|
schema: {
|
|
type: 'object',
|
|
properties: {
|
|
input: { type: 'string', description: 'Input for the tool' },
|
|
},
|
|
required: [],
|
|
},
|
|
name: normalizedToolKey,
|
|
description: unavailableMsg,
|
|
responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
|
|
});
|
|
toolInstance.mcp = true;
|
|
toolInstance.mcpRawServerName = serverName;
|
|
return toolInstance;
|
|
}
|
|
|
|
function isEmptyObjectSchema(jsonSchema) {
|
|
return (
|
|
jsonSchema != null &&
|
|
typeof jsonSchema === 'object' &&
|
|
jsonSchema.type === 'object' &&
|
|
(jsonSchema.properties == null || Object.keys(jsonSchema.properties).length === 0) &&
|
|
!jsonSchema.additionalProperties
|
|
);
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {string} params.stepId - The ID of the step in the flow.
|
|
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
*/
|
|
function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
|
|
/**
|
|
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
|
|
* @returns {Promise<void>}
|
|
*/
|
|
return async function (authURL) {
|
|
/** @type {{ id: string; delta: AgentToolCallDelta }} */
|
|
const data = {
|
|
id: stepId,
|
|
delta: {
|
|
type: StepTypes.TOOL_CALLS,
|
|
tool_calls: [{ ...toolCall, args: '' }],
|
|
auth: authURL,
|
|
expires_at: Date.now() + Time.TWO_MINUTES,
|
|
},
|
|
};
|
|
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
|
|
if (streamId) {
|
|
await GenerationJobManager.emitChunk(streamId, eventData);
|
|
} else {
|
|
sendEvent(res, eventData);
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {string} params.runId - The Run ID, i.e. message ID
|
|
* @param {string} params.stepId - The ID of the step in the flow.
|
|
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
|
|
* @param {number} [params.index]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @returns {() => Promise<void>}
|
|
*/
|
|
function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) {
|
|
return async function () {
|
|
/** @type {import('@librechat/agents').RunStep} */
|
|
const data = {
|
|
runId: runId ?? Constants.USE_PRELIM_RESPONSE_MESSAGE_ID,
|
|
id: stepId,
|
|
type: StepTypes.TOOL_CALLS,
|
|
index: index ?? 0,
|
|
stepDetails: {
|
|
type: StepTypes.TOOL_CALLS,
|
|
tool_calls: [toolCall],
|
|
},
|
|
};
|
|
const eventData = { event: GraphEvents.ON_RUN_STEP, data };
|
|
if (streamId) {
|
|
await GenerationJobManager.emitChunk(streamId, eventData);
|
|
} else {
|
|
sendEvent(res, eventData);
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Creates a function used to ensure the flow handler is only invoked once
|
|
* @param {object} params
|
|
* @param {string} params.flowId - The ID of the login flow.
|
|
* @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
|
|
* @param {(authURL: string) => void} [params.callback]
|
|
*/
|
|
function createOAuthStart({ flowId, flowManager, callback }) {
|
|
/**
|
|
* Creates a function to handle OAuth login requests.
|
|
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
|
|
* @returns {Promise<boolean>} Returns true to indicate the event was sent successfully.
|
|
*/
|
|
return async function (authURL) {
|
|
await flowManager.createFlowWithHandler(flowId, 'oauth_login', async () => {
|
|
callback?.(authURL);
|
|
logger.debug('Sent OAuth login request to client');
|
|
return true;
|
|
});
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {string} params.stepId - The ID of the step in the flow.
|
|
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
*/
|
|
function createOAuthEnd({ res, stepId, toolCall, streamId = null }) {
|
|
return async function () {
|
|
/** @type {{ id: string; delta: AgentToolCallDelta }} */
|
|
const data = {
|
|
id: stepId,
|
|
delta: {
|
|
type: StepTypes.TOOL_CALLS,
|
|
tool_calls: [{ ...toolCall }],
|
|
},
|
|
};
|
|
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
|
|
if (streamId) {
|
|
await GenerationJobManager.emitChunk(streamId, eventData);
|
|
} else {
|
|
sendEvent(res, eventData);
|
|
}
|
|
logger.debug('Sent OAuth login success to client');
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {object} params
|
|
* @param {string} params.userId - The ID of the user.
|
|
* @param {string} params.serverName - The name of the server.
|
|
* @param {string} params.toolName - The name of the tool.
|
|
* @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
|
|
*/
|
|
function createAbortHandler({ userId, serverName, toolName, flowManager }) {
|
|
return function () {
|
|
logger.info(`[MCP][User: ${userId}][${serverName}][${toolName}] Tool call aborted`);
|
|
const flowId = MCPOAuthHandler.generateFlowId(userId, serverName);
|
|
// Clean up both mcp_oauth and mcp_get_tokens flows
|
|
flowManager.failFlow(flowId, 'mcp_oauth', new Error('Tool call aborted'));
|
|
flowManager.failFlow(flowId, 'mcp_get_tokens', new Error('Tool call aborted'));
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {Object} params
|
|
* @param {() => void} params.runStepEmitter
|
|
* @param {(authURL: string) => void} params.runStepDeltaEmitter
|
|
* @returns {(authURL: string) => void}
|
|
*/
|
|
function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
|
|
return function (authURL) {
|
|
runStepEmitter();
|
|
runStepDeltaEmitter(authURL);
|
|
};
|
|
}
|
|
|
|
/**
|
|
* @param {Object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {IUser} params.user - The user from the request object.
|
|
* @param {string} params.serverName
|
|
* @param {AbortSignal} params.signal
|
|
* @param {string} params.model
|
|
* @param {number} [params.index]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
|
|
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
|
|
*/
|
|
async function reconnectServer({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
serverName,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
streamId = null,
|
|
}) {
|
|
logger.debug(
|
|
`[MCP][reconnectServer] serverName: ${serverName}, user: ${user?.id}, hasUserMCPAuthMap: ${!!userMCPAuthMap}`,
|
|
);
|
|
|
|
const throttleKey = `${user.id}:${serverName}`;
|
|
const now = Date.now();
|
|
const lastAttempt = lastReconnectAttempts.get(throttleKey) ?? 0;
|
|
if (now - lastAttempt < RECONNECT_THROTTLE_MS) {
|
|
logger.debug(`[MCP][reconnectServer] Throttled reconnect for ${serverName}`);
|
|
return null;
|
|
}
|
|
lastReconnectAttempts.set(throttleKey, now);
|
|
evictStale(lastReconnectAttempts, RECONNECT_THROTTLE_MS);
|
|
|
|
const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID;
|
|
const flowId = `${user.id}:${serverName}:${Date.now()}`;
|
|
const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
|
|
const stepId = 'step_oauth_login_' + serverName;
|
|
const toolCall = {
|
|
id: flowId,
|
|
name: buildOAuthToolCallName(serverName),
|
|
type: 'tool_call_chunk',
|
|
};
|
|
|
|
// Set up abort handler to clean up OAuth flows if request is aborted
|
|
const oauthFlowId = MCPOAuthHandler.generateFlowId(user.id, serverName);
|
|
const abortHandler = () => {
|
|
logger.info(
|
|
`[MCP][User: ${user.id}][${serverName}] Tool loading aborted, cleaning up OAuth flows`,
|
|
);
|
|
// Clean up both mcp_oauth and mcp_get_tokens flows
|
|
flowManager.failFlow(oauthFlowId, 'mcp_oauth', new Error('Tool loading aborted'));
|
|
flowManager.failFlow(oauthFlowId, 'mcp_get_tokens', new Error('Tool loading aborted'));
|
|
};
|
|
|
|
if (signal) {
|
|
signal.addEventListener('abort', abortHandler, { once: true });
|
|
}
|
|
|
|
try {
|
|
const runStepEmitter = createRunStepEmitter({
|
|
res,
|
|
index,
|
|
runId,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
const runStepDeltaEmitter = createRunStepDeltaEmitter({
|
|
res,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter });
|
|
const oauthStart = createOAuthStart({
|
|
res,
|
|
flowId,
|
|
callback,
|
|
flowManager,
|
|
});
|
|
return await reinitMCPServer({
|
|
user,
|
|
signal,
|
|
serverName,
|
|
configServers,
|
|
oauthStart,
|
|
flowManager,
|
|
userMCPAuthMap,
|
|
forceNew: true,
|
|
returnOnOAuth: false,
|
|
connectionTimeout: Time.THIRTY_SECONDS,
|
|
});
|
|
} finally {
|
|
// Clean up abort handler to prevent memory leaks
|
|
if (signal) {
|
|
signal.removeEventListener('abort', abortHandler);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Creates all tools from the specified MCP Server via `toolKey`.
|
|
*
|
|
* This function assumes tools could not be aggregated from the cache of tool definitions,
|
|
* i.e. `availableTools`, and will reinitialize the MCP server to ensure all tools are generated.
|
|
*
|
|
* @param {Object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {IUser} params.user - The user from the request object.
|
|
* @param {string} params.serverName
|
|
* @param {string} params.model
|
|
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
|
|
* @param {number} [params.index]
|
|
* @param {AbortSignal} [params.signal]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @param {import('@librechat/api').ParsedServerConfig} [params.config]
|
|
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
|
|
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
|
|
*/
|
|
async function createMCPTools({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
config,
|
|
provider,
|
|
serverName,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
streamId = null,
|
|
}) {
|
|
const serverConfig =
|
|
config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));
|
|
if (serverConfig?.url) {
|
|
const appConfig = await getAppConfig({ role: user?.role, tenantId: user?.tenantId });
|
|
const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
|
|
const isDomainAllowed = await isMCPDomainAllowed(serverConfig, allowedDomains);
|
|
if (!isDomainAllowed) {
|
|
logger.warn(`[MCP][${serverName}] Domain not allowed, skipping all tools`);
|
|
return [];
|
|
}
|
|
}
|
|
|
|
const result = await reconnectServer({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
serverName,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
streamId,
|
|
});
|
|
if (result === null) {
|
|
logger.debug(`[MCP][${serverName}] Reconnect throttled, skipping tool creation.`);
|
|
return [];
|
|
}
|
|
if (!result || !result.tools) {
|
|
logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
|
|
return [];
|
|
}
|
|
|
|
const serverTools = [];
|
|
for (const tool of result.tools) {
|
|
const toolInstance = await createMCPTool({
|
|
res,
|
|
user,
|
|
provider,
|
|
userMCPAuthMap,
|
|
configServers,
|
|
streamId,
|
|
availableTools: result.availableTools,
|
|
toolKey: `${tool.name}${Constants.mcp_delimiter}${serverName}`,
|
|
config: serverConfig,
|
|
});
|
|
if (toolInstance) {
|
|
serverTools.push(toolInstance);
|
|
}
|
|
}
|
|
|
|
return serverTools;
|
|
}
|
|
|
|
/**
|
|
* Creates a single tool from the specified MCP Server via `toolKey`.
|
|
* @param {Object} params
|
|
* @param {ServerResponse} params.res - The Express response object for sending events.
|
|
* @param {IUser} params.user - The user from the request object.
|
|
* @param {string} params.toolKey - The toolKey for the tool.
|
|
* @param {string} params.model - The model for the tool.
|
|
* @param {number} [params.index]
|
|
* @param {AbortSignal} [params.signal]
|
|
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
|
|
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
|
|
* @param {LCAvailableTools} [params.availableTools]
|
|
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
|
|
* @param {import('@librechat/api').ParsedServerConfig} [params.config]
|
|
* @returns { Promise<typeof tool | { _call: (toolInput: Object | string) => unknown}> } An object with `_call` method to execute the tool input.
|
|
*/
|
|
async function createMCPTool({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
toolKey,
|
|
provider,
|
|
userMCPAuthMap,
|
|
availableTools,
|
|
config,
|
|
configServers,
|
|
streamId = null,
|
|
}) {
|
|
const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter);
|
|
|
|
const serverConfig =
|
|
config ?? (await getMCPServersRegistry().getServerConfig(serverName, user?.id, configServers));
|
|
if (serverConfig?.url) {
|
|
const appConfig = await getAppConfig({ role: user?.role, tenantId: user?.tenantId });
|
|
const allowedDomains = appConfig?.mcpSettings?.allowedDomains;
|
|
const isDomainAllowed = await isMCPDomainAllowed(serverConfig, allowedDomains);
|
|
if (!isDomainAllowed) {
|
|
logger.warn(`[MCP][${serverName}] Domain no longer allowed, skipping tool: ${toolName}`);
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
/** @type {LCTool | undefined} */
|
|
let toolDefinition = availableTools?.[toolKey]?.function;
|
|
if (!toolDefinition) {
|
|
const cachedAt = missingToolCache.get(toolKey);
|
|
if (cachedAt && Date.now() - cachedAt < MISSING_TOOL_TTL_MS) {
|
|
logger.debug(
|
|
`[MCP][${serverName}][${toolName}] Tool in negative cache, returning unavailable stub.`,
|
|
);
|
|
return createUnavailableToolStub(toolName, serverName);
|
|
}
|
|
|
|
logger.warn(
|
|
`[MCP][${serverName}][${toolName}] Requested tool not found in available tools, re-initializing MCP server.`,
|
|
);
|
|
const result = await reconnectServer({
|
|
res,
|
|
user,
|
|
index,
|
|
signal,
|
|
serverName,
|
|
configServers,
|
|
userMCPAuthMap,
|
|
streamId,
|
|
});
|
|
toolDefinition = result?.availableTools?.[toolKey]?.function;
|
|
|
|
if (!toolDefinition) {
|
|
missingToolCache.set(toolKey, Date.now());
|
|
evictStale(missingToolCache, MISSING_TOOL_TTL_MS);
|
|
}
|
|
}
|
|
|
|
if (!toolDefinition) {
|
|
logger.warn(
|
|
`[MCP][${serverName}][${toolName}] Tool definition not found, returning unavailable stub.`,
|
|
);
|
|
return createUnavailableToolStub(toolName, serverName);
|
|
}
|
|
|
|
return createToolInstance({
|
|
res,
|
|
provider,
|
|
toolName,
|
|
serverName,
|
|
serverConfig,
|
|
toolDefinition,
|
|
streamId,
|
|
});
|
|
}
|
|
|
|
function createToolInstance({
|
|
res,
|
|
toolName,
|
|
serverName,
|
|
serverConfig: capturedServerConfig,
|
|
toolDefinition,
|
|
provider: capturedProvider,
|
|
streamId = null,
|
|
}) {
|
|
/** @type {LCTool} */
|
|
const { description, parameters } = toolDefinition;
|
|
const isGoogle = capturedProvider === Providers.VERTEXAI || capturedProvider === Providers.GOOGLE;
|
|
|
|
let schema = parameters ? normalizeJsonSchema(resolveJsonSchemaRefs(parameters)) : null;
|
|
|
|
if (!schema || (isGoogle && isEmptyObjectSchema(schema))) {
|
|
schema = {
|
|
type: 'object',
|
|
properties: {
|
|
input: { type: 'string', description: 'Input for the tool' },
|
|
},
|
|
required: [],
|
|
};
|
|
}
|
|
|
|
const normalizedToolKey = `${toolName}${Constants.mcp_delimiter}${normalizeServerName(serverName)}`;
|
|
|
|
/** @type {(toolArguments: Object | string, config?: GraphRunnableConfig) => Promise<unknown>} */
|
|
const _call = async (toolArguments, config) => {
|
|
const userId = config?.configurable?.user?.id || config?.configurable?.user_id;
|
|
/** @type {ReturnType<typeof createAbortHandler>} */
|
|
let abortHandler = null;
|
|
/** @type {AbortSignal} */
|
|
let derivedSignal = null;
|
|
|
|
try {
|
|
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
|
const flowManager = getFlowStateManager(flowsCache);
|
|
derivedSignal = config?.signal ? AbortSignal.any([config.signal]) : undefined;
|
|
const mcpManager = getMCPManager(userId);
|
|
const provider = (config?.metadata?.provider || capturedProvider)?.toLowerCase();
|
|
|
|
const { args: _args, stepId, ...toolCall } = config.toolCall ?? {};
|
|
const flowId = `${serverName}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`;
|
|
const runStepDeltaEmitter = createRunStepDeltaEmitter({
|
|
res,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
const oauthStart = createOAuthStart({
|
|
flowId,
|
|
flowManager,
|
|
callback: runStepDeltaEmitter,
|
|
});
|
|
const oauthEnd = createOAuthEnd({
|
|
res,
|
|
stepId,
|
|
toolCall,
|
|
streamId,
|
|
});
|
|
|
|
if (derivedSignal) {
|
|
abortHandler = createAbortHandler({ userId, serverName, toolName, flowManager });
|
|
derivedSignal.addEventListener('abort', abortHandler, { once: true });
|
|
}
|
|
|
|
const customUserVars =
|
|
config?.configurable?.userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];
|
|
|
|
const result = await mcpManager.callTool({
|
|
serverName,
|
|
serverConfig: capturedServerConfig,
|
|
toolName,
|
|
provider,
|
|
toolArguments,
|
|
options: {
|
|
signal: derivedSignal,
|
|
},
|
|
user: config?.configurable?.user,
|
|
requestBody: config?.configurable?.requestBody,
|
|
customUserVars,
|
|
flowManager,
|
|
tokenMethods: {
|
|
findToken,
|
|
createToken,
|
|
updateToken,
|
|
deleteTokens,
|
|
},
|
|
oauthStart,
|
|
oauthEnd,
|
|
graphTokenResolver: getGraphApiToken,
|
|
});
|
|
|
|
if (isAssistantsEndpoint(provider) && Array.isArray(result)) {
|
|
return result[0];
|
|
}
|
|
return result;
|
|
} catch (error) {
|
|
logger.error(
|
|
`[MCP][${serverName}][${toolName}][User: ${userId}] Error calling MCP tool:`,
|
|
error,
|
|
);
|
|
|
|
/** OAuth error, provide a helpful message */
|
|
const isOAuthError =
|
|
error.message?.includes('401') ||
|
|
error.message?.includes('OAuth') ||
|
|
error.message?.includes('authentication') ||
|
|
error.message?.includes('Non-200 status code (401)');
|
|
|
|
if (isOAuthError) {
|
|
throw new Error(
|
|
`[MCP][${serverName}][${toolName}] OAuth authentication required. Please check the server logs for the authentication URL.`,
|
|
);
|
|
}
|
|
|
|
throw new Error(
|
|
`[MCP][${serverName}][${toolName}] tool call failed${error?.message ? `: ${error?.message}` : '.'}`,
|
|
);
|
|
} finally {
|
|
// Clean up abort handler to prevent memory leaks
|
|
if (abortHandler && derivedSignal) {
|
|
derivedSignal.removeEventListener('abort', abortHandler);
|
|
}
|
|
}
|
|
};
|
|
|
|
const toolInstance = tool(_call, {
|
|
schema,
|
|
name: normalizedToolKey,
|
|
description: description || '',
|
|
responseFormat: AgentConstants.CONTENT_AND_ARTIFACT,
|
|
});
|
|
toolInstance.mcp = true;
|
|
toolInstance.mcpRawServerName = serverName;
|
|
toolInstance.mcpJsonSchema = parameters;
|
|
return toolInstance;
|
|
}
|
|
|
|
/**
|
|
* Get MCP setup data including config, connections, and OAuth servers.
|
|
* Resolves config-source servers from admin Config overrides when tenant context is available.
|
|
* @param {string} userId - The user ID
|
|
* @param {{ role?: string, tenantId?: string }} [options] - Optional role/tenant context
|
|
* @returns {Object} Object containing mcpConfig, appConnections, userConnections, and oauthServers
|
|
*/
|
|
async function getMCPSetupData(userId, options = {}) {
|
|
const registry = getMCPServersRegistry();
|
|
const { role, tenantId } = options;
|
|
|
|
const appConfig = await getAppConfig({ role, tenantId, userId });
|
|
const configServers = await registry.ensureConfigServers(appConfig?.mcpConfig || {});
|
|
const mcpConfig = await registry.getAllServerConfigs(userId, configServers);
|
|
const mcpManager = getMCPManager(userId);
|
|
/** @type {Map<string, import('@librechat/api').MCPConnection>} */
|
|
let appConnections = new Map();
|
|
try {
|
|
// Use getLoaded() instead of getAll() to avoid forcing connection creation.
|
|
// getAll() creates connections for all servers, which is problematic for servers
|
|
// that require user context (e.g., those with {{LIBRECHAT_USER_ID}} placeholders).
|
|
appConnections = (await mcpManager.appConnections?.getLoaded()) || new Map();
|
|
} catch (error) {
|
|
logger.error(`[MCP][User: ${userId}] Error getting app connections:`, error);
|
|
}
|
|
const userConnections = mcpManager.getUserConnections(userId) || new Map();
|
|
const oauthServers = new Set(
|
|
Object.entries(mcpConfig)
|
|
.filter(([, config]) => config.requiresOAuth)
|
|
.map(([name]) => name),
|
|
);
|
|
|
|
return {
|
|
mcpConfig,
|
|
oauthServers,
|
|
appConnections,
|
|
userConnections,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Check OAuth flow status for a user and server
|
|
* @param {string} userId - The user ID
|
|
* @param {string} serverName - The server name
|
|
* @returns {Object} Object containing hasActiveFlow and hasFailedFlow flags
|
|
*/
|
|
async function checkOAuthFlowStatus(userId, serverName) {
|
|
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
|
const flowManager = getFlowStateManager(flowsCache);
|
|
const flowId = MCPOAuthHandler.generateFlowId(userId, serverName);
|
|
|
|
try {
|
|
const flowState = await flowManager.getFlowState(flowId, 'mcp_oauth');
|
|
if (!flowState) {
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
}
|
|
|
|
const flowAge = Date.now() - flowState.createdAt;
|
|
const flowTTL = flowState.ttl || 180000; // Default 3 minutes
|
|
|
|
if (flowState.status === 'FAILED' || flowAge > flowTTL) {
|
|
const wasCancelled = flowState.error && flowState.error.includes('cancelled');
|
|
|
|
if (wasCancelled) {
|
|
logger.debug(`[MCP Connection Status] Found cancelled OAuth flow for ${serverName}`, {
|
|
flowId,
|
|
status: flowState.status,
|
|
error: flowState.error,
|
|
});
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
} else {
|
|
logger.debug(`[MCP Connection Status] Found failed OAuth flow for ${serverName}`, {
|
|
flowId,
|
|
status: flowState.status,
|
|
flowAge,
|
|
flowTTL,
|
|
timedOut: flowAge > flowTTL,
|
|
error: flowState.error,
|
|
});
|
|
return { hasActiveFlow: false, hasFailedFlow: true };
|
|
}
|
|
}
|
|
|
|
if (flowState.status === 'PENDING') {
|
|
logger.debug(`[MCP Connection Status] Found active OAuth flow for ${serverName}`, {
|
|
flowId,
|
|
flowAge,
|
|
flowTTL,
|
|
});
|
|
return { hasActiveFlow: true, hasFailedFlow: false };
|
|
}
|
|
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
} catch (error) {
|
|
logger.error(`[MCP Connection Status] Error checking OAuth flows for ${serverName}:`, error);
|
|
return { hasActiveFlow: false, hasFailedFlow: false };
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get connection status for a specific MCP server
|
|
* @param {string} userId - The user ID
|
|
* @param {string} serverName - The server name
|
|
* @param {import('@librechat/api').ParsedServerConfig} config - The server configuration
|
|
* @param {Map<string, import('@librechat/api').MCPConnection>} appConnections - App-level connections
|
|
* @param {Map<string, import('@librechat/api').MCPConnection>} userConnections - User-level connections
|
|
* @param {Set} oauthServers - Set of OAuth servers
|
|
* @returns {Object} Object containing requiresOAuth and connectionState
|
|
*/
|
|
async function getServerConnectionStatus(
|
|
userId,
|
|
serverName,
|
|
config,
|
|
appConnections,
|
|
userConnections,
|
|
oauthServers,
|
|
) {
|
|
const connection = appConnections.get(serverName) || userConnections.get(serverName);
|
|
const isStaleOrDoNotExist = connection ? connection?.isStale(config.updatedAt) : true;
|
|
|
|
const baseConnectionState = isStaleOrDoNotExist
|
|
? 'disconnected'
|
|
: connection?.connectionState || 'disconnected';
|
|
let finalConnectionState = baseConnectionState;
|
|
|
|
// connection state overrides specific to OAuth servers
|
|
if (baseConnectionState === 'disconnected' && oauthServers.has(serverName)) {
|
|
// check if server is actively being reconnected
|
|
const oauthReconnectionManager = getOAuthReconnectionManager();
|
|
if (oauthReconnectionManager.isReconnecting(userId, serverName)) {
|
|
finalConnectionState = 'connecting';
|
|
} else {
|
|
const { hasActiveFlow, hasFailedFlow } = await checkOAuthFlowStatus(userId, serverName);
|
|
|
|
if (hasFailedFlow) {
|
|
finalConnectionState = 'error';
|
|
} else if (hasActiveFlow) {
|
|
finalConnectionState = 'connecting';
|
|
}
|
|
}
|
|
}
|
|
|
|
return {
|
|
requiresOAuth: oauthServers.has(serverName),
|
|
connectionState: finalConnectionState,
|
|
};
|
|
}
|
|
|
|
module.exports = {
|
|
createMCPTool,
|
|
createMCPTools,
|
|
getMCPSetupData,
|
|
resolveConfigServers,
|
|
resolveAllMcpConfigs,
|
|
checkOAuthFlowStatus,
|
|
getServerConnectionStatus,
|
|
createUnavailableToolStub,
|
|
};
|