🪂 refactor: MCP Server Init Fallback (#10608)

* 🌿 refactor: MCP Server Init and Registry with Fallback Configs

* chore: Redis Cache Flushing for Cluster Support
This commit is contained in:
Danny Avila 2025-11-20 16:47:00 -05:00 committed by GitHub
parent 1e4c255351
commit b49545d916
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 99 additions and 38 deletions

View file

@ -30,11 +30,46 @@ const publicSharedLinksEnabled =
const sharePointFilePickerEnabled = isEnabled(process.env.ENABLE_SHAREPOINT_FILEPICKER); const sharePointFilePickerEnabled = isEnabled(process.env.ENABLE_SHAREPOINT_FILEPICKER);
const openidReuseTokens = isEnabled(process.env.OPENID_REUSE_TOKENS); const openidReuseTokens = isEnabled(process.env.OPENID_REUSE_TOKENS);
/**
* Fetches MCP servers from registry and adds them to the payload.
* Registry now includes all configured servers (from YAML) plus inspection data when available.
* Always fetches fresh to avoid caching incomplete initialization state.
*/
const getMCPServers = async (payload, appConfig) => {
try {
if (appConfig?.mcpConfig == null) {
return;
}
const mcpManager = getMCPManager();
if (!mcpManager) {
return;
}
const mcpServers = await mcpServersRegistry.getAllServerConfigs();
if (!mcpServers) return;
for (const serverName in mcpServers) {
if (!payload.mcpServers) {
payload.mcpServers = {};
}
const serverConfig = mcpServers[serverName];
payload.mcpServers[serverName] = removeNullishValues({
startup: serverConfig?.startup,
chatMenu: serverConfig?.chatMenu,
isOAuth: serverConfig.requiresOAuth,
customUserVars: serverConfig?.customUserVars,
});
}
} catch (error) {
logger.error('Error loading MCP servers', error);
}
};
router.get('/', async function (req, res) { router.get('/', async function (req, res) {
const cache = getLogStores(CacheKeys.CONFIG_STORE); const cache = getLogStores(CacheKeys.CONFIG_STORE);
const cachedStartupConfig = await cache.get(CacheKeys.STARTUP_CONFIG); const cachedStartupConfig = await cache.get(CacheKeys.STARTUP_CONFIG);
if (cachedStartupConfig) { if (cachedStartupConfig) {
const appConfig = await getAppConfig({ role: req.user?.role });
await getMCPServers(cachedStartupConfig, appConfig);
res.send(cachedStartupConfig); res.send(cachedStartupConfig);
return; return;
} }
@ -126,35 +161,6 @@ router.get('/', async function (req, res) {
payload.minPasswordLength = minPasswordLength; payload.minPasswordLength = minPasswordLength;
} }
const getMCPServers = async () => {
try {
if (appConfig?.mcpConfig == null) {
return;
}
const mcpManager = getMCPManager();
if (!mcpManager) {
return;
}
const mcpServers = await mcpServersRegistry.getAllServerConfigs();
if (!mcpServers) return;
for (const serverName in mcpServers) {
if (!payload.mcpServers) {
payload.mcpServers = {};
}
const serverConfig = mcpServers[serverName];
payload.mcpServers[serverName] = removeNullishValues({
startup: serverConfig?.startup,
chatMenu: serverConfig?.chatMenu,
isOAuth: serverConfig.requiresOAuth,
customUserVars: serverConfig?.customUserVars,
});
}
} catch (error) {
logger.error('Error loading MCP servers', error);
}
};
await getMCPServers();
const webSearchConfig = appConfig?.webSearch; const webSearchConfig = appConfig?.webSearch;
if ( if (
webSearchConfig != null && webSearchConfig != null &&
@ -184,6 +190,7 @@ router.get('/', async function (req, res) {
} }
await cache.set(CacheKeys.STARTUP_CONFIG, payload); await cache.set(CacheKeys.STARTUP_CONFIG, payload);
await getMCPServers(payload, appConfig);
return res.status(200).send(payload); return res.status(200).send(payload);
} catch (err) { } catch (err) {
logger.error('Error in startup config', err); logger.error('Error in startup config', err);

View file

@ -158,12 +158,22 @@ async function flushRedisCache(dryRun = false, verbose = false) {
if (dryRun) { if (dryRun) {
console.log('🔍 [DRY RUN] Would flush Redis cache'); console.log('🔍 [DRY RUN] Would flush Redis cache');
try { try {
const keys = await redis.keys('*'); let allKeys = [];
console.log(` Would delete ${keys.length} keys`); if (useCluster) {
if (verbose && keys.length > 0) { const nodes = redis.nodes('master');
console.log(` Cluster detected: ${nodes.length} master nodes`);
for (const node of nodes) {
const keys = await node.keys('*');
allKeys = allKeys.concat(keys);
}
} else {
allKeys = await redis.keys('*');
}
console.log(` Would delete ${allKeys.length} keys`);
if (verbose && allKeys.length > 0) {
console.log( console.log(
' Sample keys:', ' Sample keys:',
keys.slice(0, 10).join(', ') + (keys.length > 10 ? '...' : ''), allKeys.slice(0, 10).join(', ') + (allKeys.length > 10 ? '...' : ''),
); );
} }
} catch (error) { } catch (error) {
@ -176,15 +186,29 @@ async function flushRedisCache(dryRun = false, verbose = false) {
// Get key count before flushing // Get key count before flushing
let keyCount = 0; let keyCount = 0;
try { try {
if (useCluster) {
const nodes = redis.nodes('master');
for (const node of nodes) {
const keys = await node.keys('*');
keyCount += keys.length;
}
} else {
const keys = await redis.keys('*'); const keys = await redis.keys('*');
keyCount = keys.length; keyCount = keys.length;
}
} catch (_error) { } catch (_error) {
// Continue with flush even if we can't count keys // Continue with flush even if we can't count keys
} }
// Flush the Redis cache // Flush the Redis cache
if (useCluster) {
const nodes = redis.nodes('master');
await Promise.all(nodes.map((node) => node.flushdb()));
console.log(`✅ Redis cluster cache flushed successfully (${nodes.length} master nodes)`);
} else {
await redis.flushdb(); await redis.flushdb();
console.log('✅ Redis cache flushed successfully'); console.log('✅ Redis cache flushed successfully');
}
if (keyCount > 0) { if (keyCount > 0) {
console.log(` Deleted ${keyCount} keys`); console.log(` Deleted ${keyCount} keys`);

View file

@ -34,6 +34,9 @@ export class MCPServersInitializer {
public static async initialize(rawConfigs: t.MCPServers): Promise<void> { public static async initialize(rawConfigs: t.MCPServers): Promise<void> {
if (await statusCache.isInitialized()) return; if (await statusCache.isInitialized()) return;
/** Store raw configs immediately so they're available even if initialization fails/is slow */
registry.setRawConfigs(rawConfigs);
if (await isLeader()) { if (await isLeader()) {
// Leader performs initialization // Leader performs initialization
await statusCache.reset(); await statusCache.reset();

View file

@ -13,12 +13,22 @@ import {
* *
* Provides a unified interface for retrieving server configs with proper fallback hierarchy: * Provides a unified interface for retrieving server configs with proper fallback hierarchy:
* checks shared app servers first, then shared user servers, then private user servers. * checks shared app servers first, then shared user servers, then private user servers.
* Falls back to raw config when servers haven't been initialized yet or failed to initialize.
* Handles server lifecycle operations including adding, removing, and querying configurations. * Handles server lifecycle operations including adding, removing, and querying configurations.
*/ */
class MCPServersRegistry { class MCPServersRegistry {
public readonly sharedAppServers = ServerConfigsCacheFactory.create('App', false); public readonly sharedAppServers = ServerConfigsCacheFactory.create('App', false);
public readonly sharedUserServers = ServerConfigsCacheFactory.create('User', false); public readonly sharedUserServers = ServerConfigsCacheFactory.create('User', false);
private readonly privateUserServers: Map<string | undefined, ServerConfigsCache> = new Map(); private readonly privateUserServers: Map<string | undefined, ServerConfigsCache> = new Map();
private rawConfigs: t.MCPServers = {};
/**
* Stores the raw MCP configuration as a fallback when servers haven't been initialized yet.
* Should be called during initialization before inspecting servers.
*/
public setRawConfigs(configs: t.MCPServers): void {
this.rawConfigs = configs;
}
public async addPrivateUserServer( public async addPrivateUserServer(
userId: string, userId: string,
@ -59,15 +69,32 @@ class MCPServersRegistry {
const privateUserServer = await this.privateUserServers.get(userId)?.get(serverName); const privateUserServer = await this.privateUserServers.get(userId)?.get(serverName);
if (privateUserServer) return privateUserServer; if (privateUserServer) return privateUserServer;
/** Fallback to raw config if server hasn't been initialized yet */
const rawConfig = this.rawConfigs[serverName];
if (rawConfig) return rawConfig as t.ParsedServerConfig;
return undefined; return undefined;
} }
public async getAllServerConfigs(userId?: string): Promise<Record<string, t.ParsedServerConfig>> { public async getAllServerConfigs(userId?: string): Promise<Record<string, t.ParsedServerConfig>> {
return { const registryConfigs = {
...(await this.sharedAppServers.getAll()), ...(await this.sharedAppServers.getAll()),
...(await this.sharedUserServers.getAll()), ...(await this.sharedUserServers.getAll()),
...((await this.privateUserServers.get(userId)?.getAll()) ?? {}), ...((await this.privateUserServers.get(userId)?.getAll()) ?? {}),
}; };
/** Include all raw configs, but registry configs take precedence (they have inspection data) */
const allConfigs: Record<string, t.ParsedServerConfig> = {};
for (const serverName in this.rawConfigs) {
allConfigs[serverName] = this.rawConfigs[serverName] as t.ParsedServerConfig;
}
/** Override with registry configs where available (they have richer data) */
for (const serverName in registryConfigs) {
allConfigs[serverName] = registryConfigs[serverName];
}
return allConfigs;
} }
// TODO: This is currently used to determine if a server requires OAuth. However, this info can // TODO: This is currently used to determine if a server requires OAuth. However, this info can