📡 refactor: MCP Runtime Config Sync with Redis Distributed Locking (#10352)

* 🔄 Refactoring: MCP Runtime Configuration Reload
 - PrivateServerConfigs own cache classes (inMemory and Redis).
 - Connection staleness detection by comparing connection.createdAt with config.lastUpdatedAt
 - ConnectionsRepo access Registry instead of in memory config dict and renew stale connections
 - MCPManager: adjusted init of ConnectionsRepo (app level)
 - UserConnectionManager: renew stale connections
 - skipped test, to test "should only clear keys in its own namespace"
 - MCPPrivateServerLoader: new component to manage logic of loading / editing private servers on runtime
 - PrivateServersLoadStatusCache to track private server cache status
 - New unit and integration tests.
Misc:
 - add ESLint rule to enforce a blank line between class methods

* Fix cluster mode batch update and delete workaround. Fixed unit tests for cluster mode.

* Fix Keyv Redis clear-cache namespace awareness issue + integration test fixes

* chore: address copilot comments

* Fixing rebase issue: removed the MCP config fallback in the single getServerConfig method:
- so that it does not interfere with the logic of the correct tier (APP/USER/Private)
- If userId is null, getServerConfig should not return configs that belong to the SharedUser tier rather than the APP tier

* chore: add dev-staging branch to workflow triggers for backend, cache integration, and ESLint checks

---------

Co-authored-by: Atef Bellaaj <slalom.bellaaj@external.daimlertruck.com>
This commit is contained in:
Atef Bellaaj 2025-11-26 15:11:36 +01:00 committed by Danny Avila
parent 242832287a
commit 9786a7654e
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
49 changed files with 5244 additions and 257 deletions

View file

@ -0,0 +1,115 @@
import type * as t from '~/mcp/types';
import { ServerConfigsCache, ServerConfigsCacheFactory } from '../ServerConfigsCacheFactory';
import { logger } from '@librechat/data-schemas';
/**
 * Common foundation for per-user ("private") MCP server config caches.
 *
 * Single-user operations are implemented here by delegating to a per-user
 * `ServerConfigsCache` that is created lazily on first access. Operations that
 * span several users are declared abstract and left to the concrete subclass.
 */
export abstract class PrivateServerConfigsCacheBase {
  /** Namespace prefix made available to subclasses. */
  protected readonly PREFIX = 'MCP::ServersRegistry::Servers::Private';

  /** Per-user cache instances created so far by this object, keyed by userId. */
  protected caches: Map<string, ServerConfigsCache> = new Map();

  /**
   * Resolve the cache instance for a user, creating and memoizing it on first use.
   *
   * @throws Error when `userId` is empty (the failure is also logged).
   */
  protected getOrCreatePrivateUserCache(userId: string): ServerConfigsCache {
    if (!userId) {
      logger.error('userId is required to get or create private user cache');
      throw new Error('userId is required to get or create private user cache');
    }

    let userCache = this.caches.get(userId);
    if (!userCache) {
      userCache = ServerConfigsCacheFactory.create(userId, 'Private', false);
      this.caches.set(userId, userCache);
    }
    return userCache;
  }

  /**
   * Store a server config in the given user's cache.
   */
  public async add(
    userId: string,
    serverName: string,
    config: t.ParsedServerConfig,
  ): Promise<void> {
    await this.getOrCreatePrivateUserCache(userId).add(serverName, config);
  }

  /**
   * Update a server config in the given user's cache.
   */
  public async update(
    userId: string,
    serverName: string,
    config: t.ParsedServerConfig,
  ): Promise<void> {
    await this.getOrCreatePrivateUserCache(userId).update(serverName, config);
  }

  /**
   * Look up one server config in a user's cache.
   */
  public async get(userId: string, serverName: string): Promise<t.ParsedServerConfig | undefined> {
    const found = await this.getOrCreatePrivateUserCache(userId).get(serverName);
    return found;
  }

  /**
   * Fetch every server config held in a user's cache, keyed by server name.
   */
  public async getAll(userId: string): Promise<Record<string, t.ParsedServerConfig>> {
    const everything = await this.getOrCreatePrivateUserCache(userId).getAll();
    return everything;
  }

  /**
   * Check if a user has a cache instance loaded.
   */
  public abstract has(userId: string): Promise<boolean>;

  /**
   * Remove one server config from the given user's cache.
   */
  public async remove(userId: string, serverName: string): Promise<void> {
    await this.getOrCreatePrivateUserCache(userId).remove(serverName);
  }

  /**
   * Reset the given user's cache.
   */
  public async reset(userId: string): Promise<void> {
    return this.getOrCreatePrivateUserCache(userId).reset();
  }

  // ============= BATCH OPERATION PRIMITIVES =============
  // Building blocks for MCPPrivateServerLoader orchestration - no business logic here.

  /**
   * Update a server config in every user cache that already contains it;
   * users who do not have the server are left untouched.
   * Use case: server metadata changed (command, args, env).
   */
  public abstract updateServerConfigIfExists(
    serverName: string,
    config: t.ParsedServerConfig,
  ): Promise<void>;

  /**
   * Add a server config only for those of `userIds` whose cache is already
   * initialized; no cache is created for the others.
   * Use case: granting access to existing users.
   */
  public abstract addServerConfigIfCacheExists(
    userIds: string[],
    serverName: string,
    config: t.ParsedServerConfig,
  ): Promise<void>;

  /**
   * Remove a server config only for those of `userIds` whose cache exists;
   * users without an initialized cache are ignored.
   * Use case: revoking access from users.
   */
  public abstract removeServerConfigIfCacheExists(
    userIds: string[],
    serverName: string,
  ): Promise<void>;

  /**
   * List the users whose cache contains the given server.
   * Primitive for determining which users a change affects.
   */
  public abstract findUsersWithServer(serverName: string): Promise<string[]>;

  /**
   * Drop all private server configs for all users (nuclear option).
   * Use sparingly - typically only for testing or a full reset.
   */
  public abstract resetAll(): Promise<void>;
}

View file

@ -0,0 +1,32 @@
import { cacheConfig } from '~/cache';
import { PrivateServerConfigsCacheInMemory } from './PrivateServerConfigsCacheInMemory';
import { PrivateServerConfigsCacheRedis } from './PrivateServerConfigsCacheRedis';
/**
 * Union of the concrete private server config cache implementations
 * (in-memory and Redis-backed) that `PrivateServerConfigsCacheFactory.create()` can return.
 */
export type PrivateServerConfigsCache =
| PrivateServerConfigsCacheInMemory
| PrivateServerConfigsCacheRedis;
/**
 * Chooses the PrivateServerConfigsCache implementation that matches the deployment mode.
 * With `cacheConfig.USE_REDIS` enabled the Redis-backed cache is used; otherwise the
 * in-memory cache is used. Callers get the same interface either way.
 */
export class PrivateServerConfigsCacheFactory {
  /**
   * Build a PrivateServerConfigsCache.
   *
   * @returns the Redis implementation when `cacheConfig.USE_REDIS` is truthy,
   *          the in-memory implementation otherwise
   */
  static create(): PrivateServerConfigsCache {
    return cacheConfig.USE_REDIS
      ? new PrivateServerConfigsCacheRedis()
      : new PrivateServerConfigsCacheInMemory();
  }
}

View file

@ -0,0 +1,105 @@
import { ParsedServerConfig } from '~/mcp/types';
import { PrivateServerConfigsCacheBase } from './PrivateServerConfigsCacheBase';
import { logger } from '@librechat/data-schemas';
import { ServerConfigsCacheInMemory } from '../ServerConfigsCacheInMemory';
/**
 * Private server config cache that keeps everything in this object's own
 * `caches` map. Batch operations simply walk that map.
 */
export class PrivateServerConfigsCacheInMemory extends PrivateServerConfigsCacheBase {
  /**
   * True when a cache instance has already been created for `userId`.
   */
  public async has(userId: string): Promise<boolean> {
    return this.caches.has(userId);
  }

  /**
   * Overwrite `serverName` with `config` in every user cache that currently
   * holds an entry for it. Users without the server are skipped.
   */
  public async updateServerConfigIfExists(
    serverName: string,
    config: ParsedServerConfig,
  ): Promise<void> {
    let touched = 0;

    for (const [ownerId, cache] of this.caches) {
      const current = await cache.get(serverName);
      if (!current) {
        continue;
      }
      // The write goes through `set`, which is called on the in-memory cache type.
      await (cache as ServerConfigsCacheInMemory).set(serverName, config);
      touched += 1;
      logger.debug(`[MCP][PrivateServers][InMemory] Updated "${serverName}" for user ${ownerId}`);
    }

    logger.info(
      `[MCP][PrivateServers][InMemory] Propagated config update for "${serverName}" to ${touched} users`,
    );
  }

  /**
   * Write `config` under `serverName` for each of `userIds` that already has
   * a cache instance. No cache is created for the remaining users.
   */
  public async addServerConfigIfCacheExists(
    userIds: string[],
    serverName: string,
    config: ParsedServerConfig,
  ): Promise<void> {
    let granted = 0;

    for (const candidateId of userIds) {
      // Skip users whose cache was never initialized.
      if (!this.caches.has(candidateId)) {
        continue;
      }
      const target = this.getOrCreatePrivateUserCache(candidateId) as ServerConfigsCacheInMemory;
      await target.set(serverName, config);
      granted += 1;
      logger.debug(`[MCP][PrivateServers][InMemory] Added "${serverName}" to user ${candidateId}`);
    }

    logger.info(
      `[MCP][PrivateServers][InMemory] Granted access to "${serverName}" for ${granted}/${userIds.length} initialized users`,
    );
  }

  /**
   * Remove `serverName` for each of `userIds` that has a cache instance.
   * A failing removal is logged at debug level and does not abort the loop.
   */
  public async removeServerConfigIfCacheExists(
    userIds: string[],
    serverName: string,
  ): Promise<void> {
    let revoked = 0;

    for (const candidateId of userIds) {
      if (!this.caches.has(candidateId)) {
        continue;
      }
      try {
        await this.getOrCreatePrivateUserCache(candidateId).remove(serverName);
        revoked += 1;
        logger.debug(
          `[MCP][PrivateServers][InMemory] Removed "${serverName}" from user ${candidateId}`,
        );
      } catch (error) {
        // Best effort: the server may simply not exist for this user.
        logger.debug(
          `[MCP][PrivateServers][InMemory] Server "${serverName}" not found for user ${candidateId}`,
          error,
        );
      }
    }

    logger.info(
      `[MCP][PrivateServers][InMemory] Revoked access to "${serverName}" from ${revoked}/${userIds.length} users`,
    );
  }

  /**
   * Collect the ids of all users whose cache returns a config for `serverName`.
   */
  public async findUsersWithServer(serverName: string): Promise<string[]> {
    const owners: string[] = [];

    for (const [ownerId, cache] of this.caches) {
      const entry = await cache.get(serverName);
      if (entry) {
        owners.push(ownerId);
      }
    }

    return owners;
  }

  /**
   * Forget every user cache held by this object (nuclear option).
   */
  public async resetAll(): Promise<void> {
    this.caches.clear();
    logger.info(`[MCP][PrivateServers][InMemory] Cleared ALL user caches`);
  }
}

View file

@ -0,0 +1,284 @@
import { ParsedServerConfig } from '~/mcp/types';
import { keyvRedisClient } from '~/cache';
import { PrivateServerConfigsCacheBase } from './PrivateServerConfigsCacheBase';
import { logger } from '@librechat/data-schemas';
import { cacheConfig } from '~/cache/cacheConfig';
import { batchDeleteKeys, scanKeys } from '~/cache/redisUtils';
/**
 * Redis-backed private server config cache.
 *
 * Batch operations talk to Redis directly. Keys are built as
 * `[<REDIS_KEY_PREFIX><GLOBAL_PREFIX_SEPARATOR>]<PREFIX>::<userId>:<serverName>`
 * and values are written as the JSON of `{ value: <config>, expires: null }`
 * (presumably the envelope Keyv reads back - confirm against the Keyv store in use).
 */
export class PrivateServerConfigsCacheRedis extends PrivateServerConfigsCacheBase {
  /**
   * COUNT hint used when probing for a user's keys. COUNT is the amount of work
   * Redis does per SCAN call, not a cap on results, so a tiny value only
   * multiplies round trips.
   */
  private static readonly SCAN_COUNT = 100;

  /**
   * Detect if Redis is running in cluster mode.
   * In cluster mode, multi() transactions over keys in different slots fail with
   * CROSSSLOT, so batch writes are issued as individual commands instead.
   */
  private isClusterMode(): boolean {
    return cacheConfig.USE_REDIS_CLUSTER;
  }

  /**
   * Escape Redis glob metacharacters so `value` is matched verbatim inside a
   * SCAN MATCH pattern. Without this, a userId or serverName containing
   * `*`, `?`, `[`, `]` or `\` would either miss its own keys or match unrelated ones.
   */
  private escapeGlob(value: string): string {
    return value.replace(/[\\*?[\]]/g, '\\$&');
  }

  /**
   * Build the full Redis key for one user's copy of a server config,
   * including the global key prefix when one is configured.
   */
  private buildKey(userId: string, serverName: string): string {
    const namespacedKey = `${this.PREFIX}::${userId}:${serverName}`;
    const globalPrefix = cacheConfig.REDIS_KEY_PREFIX;
    return globalPrefix
      ? `${globalPrefix}${cacheConfig.GLOBAL_PREFIX_SEPARATOR}${namespacedKey}`
      : namespacedKey;
  }

  /**
   * Pattern matching every user's key for `serverName` (any global prefix, any userId).
   */
  private generateScanKeyPattern(serverName: string): string {
    return `*${this.PREFIX}::*:${this.escapeGlob(serverName)}`;
  }

  /**
   * Check whether Redis holds at least one private server key for `userId`.
   *
   * @returns false when `userId` is empty, Redis is unavailable, the client
   *          has no `scanIterator`, or no key matches.
   */
  public async has(userId: string): Promise<boolean> {
    if (!userId || !keyvRedisClient || !('scanIterator' in keyvRedisClient)) {
      return false;
    }

    const pattern = `*${this.PREFIX}::${this.escapeGlob(userId)}:*`;

    for await (const entry of keyvRedisClient.scanIterator({
      MATCH: pattern,
      COUNT: PrivateServerConfigsCacheRedis.SCAN_COUNT,
    })) {
      // NOTE(review): node-redis v4 yields one key per iteration, whereas v5 yields one
      // (possibly empty) array per SCAN reply - confirm the installed client version.
      // Handling both avoids reporting "true" for an empty batch.
      if (!Array.isArray(entry) || entry.length > 0) {
        return true;
      }
    }

    return false; // No keys found - cache not initialized
  }

  /**
   * Overwrite `serverName` with `config` (stamped with a fresh `lastUpdatedAt`)
   * in every user key that already exists. Writes use `XX`, so a key deleted
   * between the scan and the write is not recreated.
   *
   * @throws rethrows any Redis error after logging it.
   */
  public async updateServerConfigIfExists(
    serverName: string,
    config: ParsedServerConfig,
  ): Promise<void> {
    if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) {
      logger.warn('[MCP][PrivateServers][Redis] Redis SCAN not available');
      return;
    }

    const pattern = this.generateScanKeyPattern(serverName);

    try {
      // Pattern-based scan: only keys that end in this serverName are returned.
      const keysToUpdate = await scanKeys(keyvRedisClient, pattern);

      if (keysToUpdate.length === 0) {
        logger.debug(`[MCP][PrivateServers][Redis] No users have "${serverName}"`);
        return;
      }

      const updatedConfig = { ...config, lastUpdatedAt: Date.now() };
      const keyvFormat = { value: updatedConfig, expires: null };
      const serializedConfig = JSON.stringify(keyvFormat);
      const chunkSize = cacheConfig.REDIS_UPDATE_CHUNK_SIZE;

      if (this.isClusterMode()) {
        // Cluster mode: individual commands in parallel (no atomicity, but works across slots)
        for (let i = 0; i < keysToUpdate.length; i += chunkSize) {
          const chunk = keysToUpdate.slice(i, i + chunkSize);
          await Promise.all(
            chunk.map((key) => keyvRedisClient!.set(key, serializedConfig, { XX: true })),
          );
        }
      } else {
        // Single-node mode: one multi() transaction per chunk
        for (let i = 0; i < keysToUpdate.length; i += chunkSize) {
          const chunk = keysToUpdate.slice(i, i + chunkSize);
          const multi = keyvRedisClient.multi();
          for (const key of chunk) {
            multi.set(key, serializedConfig, { XX: true });
          }
          await multi.exec();
        }
      }

      logger.info(
        `[MCP][PrivateServers][Redis] Propagated config update for "${serverName}" to ${keysToUpdate.length} users`,
      );
    } catch (error) {
      logger.error(`[MCP][PrivateServers][Redis] Error updating "${serverName}"`, error);
      throw error;
    }
  }

  /**
   * Add `serverName` for those of `userIds` that already have at least one
   * private server key in Redis. Writes use `NX`, so an existing entry for the
   * same server is left untouched.
   *
   * @throws propagates errors from the user scan and from the Redis writes.
   */
  public async addServerConfigIfCacheExists(
    userIds: string[],
    serverName: string,
    config: ParsedServerConfig,
  ): Promise<void> {
    if (!keyvRedisClient) return;

    // Single SCAN to learn which users have an initialized cache.
    const allUsersWithCaches = await this.getAllUserIds();
    const eligibleUserIds = userIds.filter((id) => allUsersWithCaches.has(id));

    if (eligibleUserIds.length === 0) {
      logger.info(
        `[MCP][PrivateServers][Redis] No initialized users to grant access to "${serverName}"`,
      );
      return;
    }

    const updatedConfig = { ...config, lastUpdatedAt: Date.now() };
    const keyvFormat = { value: updatedConfig, expires: null };
    const serializedConfig = JSON.stringify(keyvFormat);
    const keysToAdd = eligibleUserIds.map((userId) => this.buildKey(userId, serverName));
    const chunkSize = cacheConfig.REDIS_UPDATE_CHUNK_SIZE;

    if (this.isClusterMode()) {
      // Cluster mode: individual commands in parallel (no atomicity, but works across slots)
      for (let i = 0; i < keysToAdd.length; i += chunkSize) {
        const chunk = keysToAdd.slice(i, i + chunkSize);
        await Promise.all(
          chunk.map((key) => keyvRedisClient!.set(key, serializedConfig, { NX: true })),
        );
      }
    } else {
      // Single-node mode: one multi() transaction per chunk
      for (let i = 0; i < keysToAdd.length; i += chunkSize) {
        const chunk = keysToAdd.slice(i, i + chunkSize);
        const multi = keyvRedisClient.multi();
        for (const key of chunk) {
          multi.set(key, serializedConfig, { NX: true });
        }
        await multi.exec();
      }
    }

    logger.info(
      `[MCP][PrivateServers][Redis] Granted access to "${serverName}" for ${eligibleUserIds.length}/${userIds.length} initialized users`,
    );
  }

  /**
   * Delete `serverName` for each of `userIds`. Keys are built directly from
   * userId and serverName, so no SCAN is needed.
   */
  public async removeServerConfigIfCacheExists(
    userIds: string[],
    serverName: string,
  ): Promise<void> {
    if (!keyvRedisClient) return;

    const keysToDelete = userIds.map((userId) => this.buildKey(userId, serverName));

    if (keysToDelete.length > 0) {
      const removedCount = await batchDeleteKeys(keyvRedisClient, keysToDelete);
      logger.info(
        `[MCP][PrivateServers][Redis] Revoked access to "${serverName}" from ${removedCount}/${userIds.length} users`,
      );
    }
  }

  /**
   * List the users that have a key for `serverName`.
   *
   * @returns the user ids found; an empty array when Redis/SCAN is unavailable
   *          or when the scan fails (the failure is logged, not rethrown).
   */
  public async findUsersWithServer(serverName: string): Promise<string[]> {
    if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) {
      return [];
    }

    const pattern = this.generateScanKeyPattern(serverName);

    try {
      const keys = await scanKeys(keyvRedisClient, pattern);
      const userIds: string[] = [];

      for (const key of keys) {
        const userId = this.extractUserIdFromKey(key);
        if (userId) {
          userIds.push(userId);
        }
      }

      return userIds;
    } catch (error) {
      logger.error(`[MCP][PrivateServers][Redis] Error finding users with "${serverName}"`, error);
      return [];
    }
  }

  /**
   * Scan Redis for all unique userIds that own at least one private server key.
   *
   * Performance note: this scans every private server config key.
   * Use sparingly as it can be expensive with many users.
   *
   * @throws rethrows scan errors after logging them.
   */
  private async getAllUserIds(): Promise<Set<string>> {
    if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) {
      logger.warn('[MCP][PrivateServerConfigs][Redis] Redis SCAN not available');
      return new Set();
    }

    const userIds = new Set<string>();
    // Matches every private server config key: <PREFIX>::<userId>:<serverName>
    const pattern = `*${this.PREFIX}::*:*`;

    try {
      const keys = await scanKeys(keyvRedisClient, pattern);

      for (const key of keys) {
        const userId = this.extractUserIdFromKey(key);
        if (userId) {
          userIds.add(userId);
        }
      }
    } catch (error) {
      logger.error('[MCP][PrivateServerConfigs][Redis] Error scanning for userIds', error);
      throw error;
    }

    return userIds;
  }

  /**
   * Extract the userId from a Redis key of the form
   * `[<global prefix>]<PREFIX>::<userId>:<serverName>`.
   *
   * The userId is everything between `<PREFIX>::` and the LAST colon.
   * NOTE(review): a serverName that itself contains ':' would therefore leak
   * into the returned userId - confirm server names cannot contain colons.
   *
   * @returns the userId, or null when the key does not contain `<PREFIX>::`
   *          or has no colon after it.
   */
  private extractUserIdFromKey(key: string): string | null {
    const marker = `${this.PREFIX}::`;
    const markerIndex = key.indexOf(marker);
    // Not a private server config key: refuse to guess rather than return garbage.
    if (markerIndex === -1) return null;

    const userAndServer = key.substring(markerIndex + marker.length);
    const lastColonIndex = userAndServer.lastIndexOf(':');
    if (lastColonIndex === -1) return null;

    return userAndServer.substring(0, lastColonIndex);
  }

  /**
   * Clear ALL servers from ALL user caches (nuclear option).
   * Does nothing when Redis or SCAN is unavailable.
   */
  public async resetAll(): Promise<void> {
    if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) return;

    // Matches every private server config key: <PREFIX>::<userId>:<serverName>
    const pattern = `*${this.PREFIX}::*:*`;

    const keysToDelete = await scanKeys(keyvRedisClient, pattern);

    if (keysToDelete.length > 0) {
      await batchDeleteKeys(keyvRedisClient, keysToDelete);
    }

    logger.info(`[MCP][Cache][Redis] Cleared all user caches: ${keysToDelete.length} entries`);
  }
}