diff --git a/packages/api/src/mcp/registry/MCPServersRegistry.ts b/packages/api/src/mcp/registry/MCPServersRegistry.ts index 54b62c3ff9..801b3957a0 100644 --- a/packages/api/src/mcp/registry/MCPServersRegistry.ts +++ b/packages/api/src/mcp/registry/MCPServersRegistry.ts @@ -26,6 +26,10 @@ export class MCPServersRegistry { private readonly allowedDomains?: string[] | null; private readonly readThroughCache: Keyv; private readonly readThroughCacheAll: Keyv>; + private readonly pendingGetAllPromises = new Map< + string, + Promise> + >(); constructor(mongoose: typeof import('mongoose'), allowedDomains?: string[] | null) { this.dbConfigsRepo = new ServerConfigsDB(mongoose); @@ -99,11 +103,29 @@ export class MCPServersRegistry { public async getAllServerConfigs(userId?: string): Promise> { const cacheKey = userId ?? '__no_user__'; - // Check if key exists in read-through cache if (await this.readThroughCacheAll.has(cacheKey)) { return (await this.readThroughCacheAll.get(cacheKey)) ?? {}; } + const pending = this.pendingGetAllPromises.get(cacheKey); + if (pending) { + return pending; + } + + const fetchPromise = this.fetchAllServerConfigs(cacheKey, userId); + this.pendingGetAllPromises.set(cacheKey, fetchPromise); + + try { + return await fetchPromise; + } finally { + this.pendingGetAllPromises.delete(cacheKey); + } + } + + private async fetchAllServerConfigs( + cacheKey: string, + userId?: string, + ): Promise> { const result = { ...(await this.cacheConfigsRepo.getAll()), ...(await this.dbConfigsRepo.getAll(userId)), diff --git a/packages/api/src/mcp/registry/__tests__/MCPServersRegistry.cache_integration.spec.ts b/packages/api/src/mcp/registry/__tests__/MCPServersRegistry.cache_integration.spec.ts index d20092c962..7752ff57d2 100644 --- a/packages/api/src/mcp/registry/__tests__/MCPServersRegistry.cache_integration.spec.ts +++ b/packages/api/src/mcp/registry/__tests__/MCPServersRegistry.cache_integration.spec.ts @@ -236,4 +236,106 @@ describe('MCPServersRegistry Redis Integration Tests', () => { expect(Object.keys(configsAfter)).toHaveLength(0); }); }); + + describe('single-flight deduplication', () => { + it('should deduplicate concurrent getAllServerConfigs calls', async () => { + await registry.addServer('server1', testRawConfig, 'CACHE'); + await registry.addServer('server2', testRawConfig, 'CACHE'); + await registry.addServer('server3', testRawConfig, 'CACHE'); + + await registry['readThroughCacheAll'].clear(); + + const cacheRepoGetAllSpy = jest.spyOn(registry['cacheConfigsRepo'], 'getAll'); + + const concurrentCalls = 10; + const promises = Array.from({ length: concurrentCalls }, () => + registry.getAllServerConfigs(), + ); + + const results = await Promise.all(promises); + + expect(cacheRepoGetAllSpy.mock.calls.length).toBe(1); + + for (const result of results) { + expect(Object.keys(result).length).toBe(3); + expect(result).toHaveProperty('server1'); + expect(result).toHaveProperty('server2'); + expect(result).toHaveProperty('server3'); + } + }); + + it('should handle different userIds independently', async () => { + await registry.addServer('shared_server', testRawConfig, 'CACHE'); + + await registry['readThroughCacheAll'].clear(); + + const cacheRepoGetAllSpy = jest.spyOn(registry['cacheConfigsRepo'], 'getAll'); + + const [result1, result2, result3] = await Promise.all([ + registry.getAllServerConfigs('user1'), + registry.getAllServerConfigs('user2'), + registry.getAllServerConfigs('user1'), + ]); + + expect(cacheRepoGetAllSpy.mock.calls.length).toBe(2); + + expect(Object.keys(result1)).toContain('shared_server'); + expect(Object.keys(result2)).toContain('shared_server'); + expect(Object.keys(result3)).toContain('shared_server'); + }); + + it('should complete concurrent requests without timeout', async () => { + for (let i = 0; i < 10; i++) { + await registry.addServer(`stress_server_${i}`, testRawConfig, 'CACHE'); + } + + await registry['readThroughCacheAll'].clear(); + + const concurrentCalls = 50; + const startTime = Date.now(); + + const promises = Array.from({ length: concurrentCalls }, () => + registry.getAllServerConfigs(), + ); + + const results = await Promise.all(promises); + const elapsed = Date.now() - startTime; + + expect(elapsed).toBeLessThan(10000); + + for (const result of results) { + expect(Object.keys(result).length).toBe(10); + } + }); + + it('should return consistent results across all concurrent callers', async () => { + await registry.addServer('consistency_server_a', testRawConfig, 'CACHE'); + await registry.addServer( + 'consistency_server_b', + { + ...testRawConfig, + command: 'python', + }, + 'CACHE', + ); + + await registry['readThroughCacheAll'].clear(); + + const results = await Promise.all([ + registry.getAllServerConfigs(), + registry.getAllServerConfigs(), + registry.getAllServerConfigs(), + registry.getAllServerConfigs(), + registry.getAllServerConfigs(), + ]); + + const firstResult = results[0]; + for (const result of results) { + expect(Object.keys(result).sort()).toEqual(Object.keys(firstResult).sort()); + for (const key of Object.keys(firstResult)) { + expect(result[key]).toMatchObject(firstResult[key]); + } + } + }); + }); }); diff --git a/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedis.ts b/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedis.ts index 4532afa251..d3154baf73 100644 --- a/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedis.ts +++ b/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedis.ts @@ -1,9 +1,10 @@ import type Keyv from 'keyv'; import { fromPairs } from 'lodash'; +import { logger } from '@librechat/data-schemas'; +import type { IServerConfigsRepositoryInterface } from '~/mcp/registry/ServerConfigsRepositoryInterface'; +import type { ParsedServerConfig, AddServerResult } from '~/mcp/types'; import { standardCache, keyvRedisClient } from '~/cache'; -import { ParsedServerConfig, AddServerResult } from '~/mcp/types'; import { BaseRegistryCache } from './BaseRegistryCache'; -import { IServerConfigsRepositoryInterface } from '../ServerConfigsRepositoryInterface'; /** * Redis-backed implementation of MCP server configurations cache for distributed deployments. @@ -12,6 +13,8 @@ import { IServerConfigsRepositoryInterface } from '../ServerConfigsRepositoryInt * Supports optional leader-only write operations to prevent race conditions during initialization. * Data persists across server restarts and is accessible from any instance in the cluster. */ +const BATCH_SIZE = 100; + export class ServerConfigsCacheRedis extends BaseRegistryCache implements IServerConfigsRepositoryInterface @@ -60,27 +63,50 @@ export class ServerConfigsCacheRedis } public async getAll(): Promise> { - // Use Redis SCAN iterator directly (non-blocking, production-ready) - // Note: Keyv uses a single colon ':' between namespace and key, even if GLOBAL_PREFIX_SEPARATOR is '::' - const pattern = `*${this.cache.namespace}:*`; - const entries: Array<[string, ParsedServerConfig]> = []; - - // Use scanIterator from Redis client - if (keyvRedisClient && 'scanIterator' in keyvRedisClient) { - for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) { - // Extract the actual key name (last part after final colon) - // Full key format: "prefix::namespace:keyName" - const lastColonIndex = key.lastIndexOf(':'); - const keyName = key.substring(lastColonIndex + 1); - const config = (await this.cache.get(keyName)) as ParsedServerConfig | undefined; - if (config) { - entries.push([keyName, config]); - } - } - } else { + if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) { throw new Error('Redis client with scanIterator not available.'); } + const startTime = Date.now(); + const pattern = `*${this.cache.namespace}:*`; + + const keys: string[] = []; + for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) { + keys.push(key); + } + + if (keys.length === 0) { + logger.debug(`[ServerConfigsCacheRedis] getAll(${this.namespace}): no keys found`); + return {}; + } + + /** Extract keyName from full Redis key format: "prefix::namespace:keyName" */ + const keyNames = keys.map((key) => key.substring(key.lastIndexOf(':') + 1)); + + const entries: Array<[string, ParsedServerConfig]> = []; + + for (let i = 0; i < keyNames.length; i += BATCH_SIZE) { + const batchEnd = Math.min(i + BATCH_SIZE, keyNames.length); + const promises: Promise[] = []; + + for (let j = i; j < batchEnd; j++) { + promises.push(this.cache.get(keyNames[j])); + } + + const configs = await Promise.all(promises); + + for (let j = 0; j < configs.length; j++) { + if (configs[j]) { + entries.push([keyNames[i + j], configs[j]!]); + } + } + } + + const elapsed = Date.now() - startTime; + logger.debug( + `[ServerConfigsCacheRedis] getAll(${this.namespace}): fetched ${entries.length} configs in ${elapsed}ms`, + ); + return fromPairs(entries); } } diff --git a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedis.cache_integration.spec.ts b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedis.cache_integration.spec.ts index d5a7540296..4e563ab4aa 100644 --- a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedis.cache_integration.spec.ts +++ b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedis.cache_integration.spec.ts @@ -7,25 +7,25 @@ describe('ServerConfigsCacheRedis Integration Tests', () => { let cache: InstanceType; - // Test data - const mockConfig1: ParsedServerConfig = { + const mockConfig1 = { + type: 'stdio', command: 'node', args: ['server1.js'], env: { TEST: 'value1' }, - }; + } as ParsedServerConfig; - const mockConfig2: ParsedServerConfig = { + const mockConfig2 = { + type: 'stdio', command: 'python', args: ['server2.py'], env: { TEST: 'value2' }, - }; + } as ParsedServerConfig; - const mockConfig3: ParsedServerConfig = { - command: 'node', - args: ['server3.js'], + const mockConfig3 = { + type: 'sse', url: 'http://localhost:3000', requiresOAuth: true, - }; + } as ParsedServerConfig; beforeAll(async () => { // Set up environment variables for Redis (only if not already set) @@ -52,9 +52,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => { }); beforeEach(() => { - // Create a fresh instance for each test with leaderOnly=true jest.resetModules(); - cache = new ServerConfigsCacheRedis('test-user', 'Shared', false); + cache = new ServerConfigsCacheRedis('test-user', false); }); afterEach(async () => { @@ -114,8 +113,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => { }); it('should isolate caches by owner namespace', async () => { - const userCache = new ServerConfigsCacheRedis('user1', 'Private', false); - const globalCache = new ServerConfigsCacheRedis('global', 'Shared', false); + const userCache = new ServerConfigsCacheRedis('user1-private', false); + const globalCache = new ServerConfigsCacheRedis('global-shared', false); await userCache.add('server1', mockConfig1); await globalCache.add('server1', mockConfig2); @@ -161,8 +160,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => { }); it('should only return configs for the specific owner', async () => { - const userCache = new ServerConfigsCacheRedis('user1', 'Private', false); - const globalCache = new ServerConfigsCacheRedis('global', 'Private', false); + const userCache = new ServerConfigsCacheRedis('user1-owner', false); + const globalCache = new ServerConfigsCacheRedis('global-owner', false); await userCache.add('server1', mockConfig1); await userCache.add('server2', mockConfig2); @@ -206,8 +205,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => { }); it('should only update in the specific owner namespace', async () => { - const userCache = new ServerConfigsCacheRedis('user1', 'Private', false); - const globalCache = new ServerConfigsCacheRedis('global', 'Shared', false); + const userCache = new ServerConfigsCacheRedis('user1-update', false); + const globalCache = new ServerConfigsCacheRedis('global-update', false); await userCache.add('server1', mockConfig1); await globalCache.add('server1', mockConfig2); @@ -258,8 +257,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => { }); it('should only remove from the specific owner namespace', async () => { - const userCache = new ServerConfigsCacheRedis('user1', 'Private', false); - const globalCache = new ServerConfigsCacheRedis('global', 'Shared', false); + const userCache = new ServerConfigsCacheRedis('user1-remove', false); + const globalCache = new ServerConfigsCacheRedis('global-remove', false); await userCache.add('server1', mockConfig1); await globalCache.add('server1', mockConfig2); @@ -270,4 +269,125 @@ describe('ServerConfigsCacheRedis Integration Tests', () => { expect(await globalCache.get('server1')).toMatchObject(mockConfig2); }); }); + + describe('getAll parallel fetching', () => { + it('should handle many configs efficiently with parallel fetching', async () => { + const testCache = new ServerConfigsCacheRedis('parallel-test', false); + const configCount = 20; + + for (let i = 0; i < configCount; i++) { + await testCache.add(`server-${i}`, { + type: 'stdio', + command: `cmd-${i}`, + args: [`arg-${i}`], + } as ParsedServerConfig); + } + + const startTime = Date.now(); + const result = await testCache.getAll(); + const elapsed = Date.now() - startTime; + + expect(Object.keys(result).length).toBe(configCount); + for (let i = 0; i < configCount; i++) { + expect(result[`server-${i}`]).toBeDefined(); + const config = result[`server-${i}`] as { command?: string }; + expect(config.command).toBe(`cmd-${i}`); + } + + expect(elapsed).toBeLessThan(5000); + }); + + it('should handle concurrent getAll calls without timeout', async () => { + const testCache = new ServerConfigsCacheRedis('concurrent-test', false); + + for (let i = 0; i < 10; i++) { + await testCache.add(`server-${i}`, { + type: 'stdio', + command: `cmd-${i}`, + args: [`arg-${i}`], + } as ParsedServerConfig); + } + + const concurrentCalls = 50; + const startTime = Date.now(); + const promises = Array.from({ length: concurrentCalls }, () => testCache.getAll()); + + const results = await Promise.all(promises); + const elapsed = Date.now() - startTime; + + for (const result of results) { + expect(Object.keys(result).length).toBe(10); + } + + expect(elapsed).toBeLessThan(10000); + }); + + it('should return consistent results across concurrent calls', async () => { + const testCache = new ServerConfigsCacheRedis('consistency-test', false); + + await testCache.add('server-a', mockConfig1); + await testCache.add('server-b', mockConfig2); + await testCache.add('server-c', mockConfig3); + + const results = await Promise.all([ + testCache.getAll(), + testCache.getAll(), + testCache.getAll(), + testCache.getAll(), + testCache.getAll(), + ]); + + const firstResult = results[0]; + for (const result of results) { + expect(Object.keys(result).sort()).toEqual(Object.keys(firstResult).sort()); + expect(result['server-a']).toMatchObject(mockConfig1); + expect(result['server-b']).toMatchObject(mockConfig2); + expect(result['server-c']).toMatchObject(mockConfig3); + } + }); + + /** + * Performance regression test for N+1 Redis fix. + * + * Before fix: getAll() used sequential GET calls inside an async loop: + * for await (key of scan) { await cache.get(key); } // N sequential calls + * + * With 30 configs and 100 concurrent requests, this would cause: + * - 100 × 30 = 3000 sequential Redis roundtrips + * - Under load, requests would queue and timeout at 60s + * + * After fix: getAll() uses Promise.all for parallel fetching: + * Promise.all(keys.map(k => cache.get(k))); // N parallel calls + * + * This test validates the fix by ensuring 100 concurrent requests + * complete in under 5 seconds - impossible with the old N+1 pattern. + */ + it('should complete 100 concurrent requests in under 5s (regression test for N+1 fix)', async () => { + const testCache = new ServerConfigsCacheRedis('perf-regression-test', false); + const configCount = 30; + + for (let i = 0; i < configCount; i++) { + await testCache.add(`server-${i}`, { + type: 'stdio', + command: `cmd-${i}`, + args: [`arg-${i}`], + } as ParsedServerConfig); + } + + const concurrentRequests = 100; + const maxAllowedMs = 5000; + + const startTime = Date.now(); + const promises = Array.from({ length: concurrentRequests }, () => testCache.getAll()); + const results = await Promise.all(promises); + const elapsed = Date.now() - startTime; + + expect(results.length).toBe(concurrentRequests); + for (const result of results) { + expect(Object.keys(result).length).toBe(configCount); + } + + expect(elapsed).toBeLessThan(maxAllowedMs); + }); + }); });