diff --git a/packages/api/jest.config.mjs b/packages/api/jest.config.mjs index df9cf6bcc2..976b794122 100644 --- a/packages/api/jest.config.mjs +++ b/packages/api/jest.config.mjs @@ -8,6 +8,7 @@ export default { '\\.helper\\.ts$', '\\.helper\\.d\\.ts$', '/__tests__/helpers/', + '\\.manual\\.spec\\.[jt]sx?$', ], coverageReporters: ['text', 'cobertura'], testResultsProcessor: 'jest-junit', diff --git a/packages/api/package.json b/packages/api/package.json index a4e74a7a3c..f09d946ec5 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -18,8 +18,8 @@ "build:dev": "npm run clean && NODE_ENV=development rollup -c --bundleConfigAsCjs", "build:watch": "NODE_ENV=development rollup -c -w --bundleConfigAsCjs", "build:watch:prod": "rollup -c -w --bundleConfigAsCjs", - "test": "jest --coverage --watch --testPathIgnorePatterns=\"\\.*integration\\.|\\.*helper\\.|__tests__/helpers/\"", - "test:ci": "jest --coverage --ci --testPathIgnorePatterns=\"\\.*integration\\.|\\.*helper\\.|__tests__/helpers/\"", + "test": "jest --coverage --watch --testPathIgnorePatterns=\"\\.*integration\\.|\\.*helper\\.|__tests__/helpers/|\\.*manual\\.spec\\.\"", + "test:ci": "jest --coverage --ci --testPathIgnorePatterns=\"\\.*integration\\.|\\.*helper\\.|__tests__/helpers/|\\.*manual\\.spec\\.\"", "test:cache-integration:core": "jest --testPathPatterns=\"src/cache/.*\\.cache_integration\\.spec\\.ts$\" --coverage=false", "test:cache-integration:cluster": "jest --testPathPatterns=\"src/cluster/.*\\.cache_integration\\.spec\\.ts$\" --coverage=false --runInBand", "test:cache-integration:mcp": "jest --testPathPatterns=\"src/mcp/.*\\.cache_integration\\.spec\\.ts$\" --coverage=false", diff --git a/packages/api/src/mcp/registry/MCPServersRegistry.ts b/packages/api/src/mcp/registry/MCPServersRegistry.ts index 506f5b1baa..b9c1eb66f5 100644 --- a/packages/api/src/mcp/registry/MCPServersRegistry.ts +++ b/packages/api/src/mcp/registry/MCPServersRegistry.ts @@ -2,8 +2,8 @@ import { Keyv } from 
'keyv'; import { logger } from '@librechat/data-schemas'; import type { IServerConfigsRepositoryInterface } from './ServerConfigsRepositoryInterface'; import type * as t from '~/mcp/types'; +import { ServerConfigsCacheFactory, APP_CACHE_NAMESPACE } from './cache/ServerConfigsCacheFactory'; import { MCPInspectionFailedError, isMCPDomainNotAllowedError } from '~/mcp/errors'; -import { ServerConfigsCacheFactory } from './cache/ServerConfigsCacheFactory'; import { MCPServerInspector } from './MCPServerInspector'; import { ServerConfigsDB } from './db/ServerConfigsDB'; import { cacheConfig } from '~/cache/cacheConfig'; @@ -33,7 +33,7 @@ export class MCPServersRegistry { constructor(mongoose: typeof import('mongoose'), allowedDomains?: string[] | null) { this.dbConfigsRepo = new ServerConfigsDB(mongoose); - this.cacheConfigsRepo = ServerConfigsCacheFactory.create('App', false); + this.cacheConfigsRepo = ServerConfigsCacheFactory.create(APP_CACHE_NAMESPACE, false); this.allowedDomains = allowedDomains; const ttl = cacheConfig.MCP_REGISTRY_CACHE_TTL; diff --git a/packages/api/src/mcp/registry/cache/ServerConfigsCacheFactory.ts b/packages/api/src/mcp/registry/cache/ServerConfigsCacheFactory.ts index ba0cec90ea..b9549629d6 100644 --- a/packages/api/src/mcp/registry/cache/ServerConfigsCacheFactory.ts +++ b/packages/api/src/mcp/registry/cache/ServerConfigsCacheFactory.ts @@ -1,31 +1,51 @@ -import { cacheConfig } from '~/cache'; +import { ServerConfigsCacheRedisAggregateKey } from './ServerConfigsCacheRedisAggregateKey'; import { ServerConfigsCacheInMemory } from './ServerConfigsCacheInMemory'; import { ServerConfigsCacheRedis } from './ServerConfigsCacheRedis'; +import { cacheConfig } from '~/cache'; -export type ServerConfigsCache = ServerConfigsCacheInMemory | ServerConfigsCacheRedis; +export type ServerConfigsCache = + | ServerConfigsCacheInMemory + | ServerConfigsCacheRedis + | ServerConfigsCacheRedisAggregateKey; /** - * Factory for creating the appropriate 
ServerConfigsCache implementation based on deployment mode. - * Automatically selects between in-memory and Redis-backed storage depending on USE_REDIS config. - * In single-instance mode (USE_REDIS=false), returns lightweight in-memory cache. - * In cluster mode (USE_REDIS=true), returns Redis-backed cache with distributed coordination. - * Provides a unified interface regardless of the underlying storage mechanism. + * Namespace for YAML-loaded app-level MCP configs. When Redis is enabled, uses a single + * aggregate key instead of per-server keys to avoid the costly SCAN + batch-GET pattern + * in {@link ServerConfigsCacheRedis.getAll} that caused 60s+ stalls under concurrent + * load (see GitHub #11624, #12408). When Redis is disabled, uses in-memory storage. + */ +export const APP_CACHE_NAMESPACE = 'App' as const; + +/** + * Factory for creating the appropriate ServerConfigsCache implementation based on + * deployment mode and namespace. + * + * The {@link APP_CACHE_NAMESPACE} namespace uses {@link ServerConfigsCacheRedisAggregateKey} + * when Redis is enabled — storing all configs under a single key so `getAll()` is one GET + * instead of SCAN + N GETs. Cross-instance visibility is preserved: reinspection results + * propagate through Redis automatically. + * + * Other namespaces use the standard {@link ServerConfigsCacheRedis} (per-key storage with + * SCAN-based enumeration) when Redis is enabled. */ export class ServerConfigsCacheFactory { /** * Create a ServerConfigsCache instance. - * Returns Redis implementation if Redis is configured, otherwise in-memory implementation. * - * @param namespace - The namespace for the cache (e.g., 'App') - only used for Redis namespacing - * @param leaderOnly - Whether operations should only be performed by the leader (only applies to Redis) + * @param namespace - The namespace for the cache. {@link APP_CACHE_NAMESPACE} uses + * aggregate-key Redis storage (or in-memory when Redis is disabled). 
+ * @param leaderOnly - Whether write operations should only be performed by the leader. * @returns ServerConfigsCache instance */ static create(namespace: string, leaderOnly: boolean): ServerConfigsCache { - if (cacheConfig.USE_REDIS) { - return new ServerConfigsCacheRedis(namespace, leaderOnly); + if (!cacheConfig.USE_REDIS) { + return new ServerConfigsCacheInMemory(); } - // In-memory mode uses a simple Map - doesn't need namespace - return new ServerConfigsCacheInMemory(); + if (namespace === APP_CACHE_NAMESPACE) { + return new ServerConfigsCacheRedisAggregateKey(namespace, leaderOnly); + } + + return new ServerConfigsCacheRedis(namespace, leaderOnly); } } diff --git a/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedisAggregateKey.ts b/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedisAggregateKey.ts new file mode 100644 index 0000000000..12f423a1fb --- /dev/null +++ b/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedisAggregateKey.ts @@ -0,0 +1,136 @@ +import { logger } from '@librechat/data-schemas'; +import type Keyv from 'keyv'; +import type { IServerConfigsRepositoryInterface } from '~/mcp/registry/ServerConfigsRepositoryInterface'; +import type { ParsedServerConfig, AddServerResult } from '~/mcp/types'; +import { BaseRegistryCache } from './BaseRegistryCache'; +import { standardCache } from '~/cache'; + +/** + * Redis-backed MCP server configs cache that stores all entries under a single aggregate key. + * + * Unlike {@link ServerConfigsCacheRedis} which uses SCAN + batch-GET for `getAll()`, this + * implementation stores the entire config map as a single JSON value in Redis. This makes + * `getAll()` a single O(1) GET regardless of keyspace size, eliminating the 60s+ stalls + * caused by SCAN under concurrent load in large deployments (see GitHub #11624, #12408). + * + * Trade-offs: + * - `add/update/remove` use a serialized read-modify-write on the aggregate key via a + * promise-based mutex. 
This prevents concurrent writes from racing within a single + * process (e.g., during `Promise.allSettled` initialization of multiple servers). + * - The entire config map is serialized/deserialized on every operation. With typical MCP + * deployments (~5-50 servers), the JSON payload is small (10-50KB). + * - Cross-instance visibility is preserved: all instances read/write the same Redis key, + * so reinspection results propagate automatically after readThroughCache TTL expiry. + * + * IMPORTANT: The promise-based writeLock serializes writes within a single Node.js process + * only. Concurrent writes from separate instances race at the Redis level (last-write-wins). + * This is acceptable because writes are performed exclusively by the leader during + * initialization via {@link MCPServersInitializer}. `reinspectServer` is manual and rare. + * Callers must enforce this single-writer invariant externally. + */ +const AGGREGATE_KEY = '__all__'; + +export class ServerConfigsCacheRedisAggregateKey + extends BaseRegistryCache + implements IServerConfigsRepositoryInterface +{ + protected readonly cache: Keyv; + private writeLock: Promise = Promise.resolve(); + + constructor(namespace: string, leaderOnly: boolean) { + super(leaderOnly); + this.cache = standardCache(`${this.PREFIX}::Servers::${namespace}`); + } + + /** + * Serializes write operations to prevent concurrent read-modify-write races. + * Reads (`get`, `getAll`) are not serialized — they can run concurrently. 
+ */ + private async withWriteLock(fn: () => Promise): Promise { + const previousLock = this.writeLock; + let resolve!: () => void; + this.writeLock = new Promise((r) => { + resolve = r; + }); + try { + await previousLock; + return await fn(); + } finally { + resolve(); + } + } + + public async getAll(): Promise> { + const startTime = Date.now(); + const result = (await this.cache.get(AGGREGATE_KEY)) as + | Record + | undefined; + const elapsed = Date.now() - startTime; + logger.debug( + `[ServerConfigsCacheRedisAggregateKey] getAll: fetched ${result ? Object.keys(result).length : 0} configs in ${elapsed}ms`, + ); + return result ?? {}; + } + + public async get(serverName: string): Promise { + const all = await this.getAll(); + return all[serverName]; + } + + public async add(serverName: string, config: ParsedServerConfig): Promise { + if (this.leaderOnly) await this.leaderCheck('add MCP servers'); + return this.withWriteLock(async () => { + const all = await this.getAll(); + if (all[serverName]) { + throw new Error( + `Server "${serverName}" already exists in cache. Use update() to modify existing configs.`, + ); + } + const storedConfig = { ...config, updatedAt: Date.now() }; + all[serverName] = storedConfig; + const success = await this.cache.set(AGGREGATE_KEY, all); + this.successCheck(`add App server "${serverName}"`, success); + return { serverName, config: storedConfig }; + }); + } + + public async update(serverName: string, config: ParsedServerConfig): Promise { + if (this.leaderOnly) await this.leaderCheck('update MCP servers'); + return this.withWriteLock(async () => { + const all = await this.getAll(); + if (!all[serverName]) { + throw new Error( + `Server "${serverName}" does not exist in cache. 
Use add() to create new configs.`, + ); + } + all[serverName] = { ...config, updatedAt: Date.now() }; + const success = await this.cache.set(AGGREGATE_KEY, all); + this.successCheck(`update App server "${serverName}"`, success); + }); + } + + public async remove(serverName: string): Promise { + if (this.leaderOnly) await this.leaderCheck('remove MCP servers'); + return this.withWriteLock(async () => { + const all = await this.getAll(); + if (!all[serverName]) { + throw new Error(`Failed to remove server "${serverName}" in cache.`); + } + delete all[serverName]; + const success = await this.cache.set(AGGREGATE_KEY, all); + this.successCheck(`remove App server "${serverName}"`, success); + }); + } + + /** + * Resets the aggregate key directly instead of using SCAN-based `cache.clear()`. + * Only one key (`__all__`) ever exists in this namespace, so a targeted delete is + * more efficient and consistent with the PR's goal of eliminating SCAN operations. + */ + public override async reset(): Promise { + if (this.leaderOnly) { + await this.leaderCheck('reset App MCP servers cache'); + } + await this.cache.delete(AGGREGATE_KEY); + } +} diff --git a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheFactory.test.ts b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheFactory.test.ts index 7499ae127e..577b878cc7 100644 --- a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheFactory.test.ts +++ b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheFactory.test.ts @@ -1,9 +1,11 @@ -import { ServerConfigsCacheFactory } from '../ServerConfigsCacheFactory'; +import { ServerConfigsCacheFactory, APP_CACHE_NAMESPACE } from '../ServerConfigsCacheFactory'; +import { ServerConfigsCacheRedisAggregateKey } from '../ServerConfigsCacheRedisAggregateKey'; import { ServerConfigsCacheInMemory } from '../ServerConfigsCacheInMemory'; import { ServerConfigsCacheRedis } from '../ServerConfigsCacheRedis'; import { cacheConfig } from '~/cache'; 
// Mock the cache implementations +jest.mock('../ServerConfigsCacheRedisAggregateKey'); jest.mock('../ServerConfigsCacheInMemory'); jest.mock('../ServerConfigsCacheRedis'); @@ -17,53 +19,48 @@ jest.mock('~/cache', () => ({ describe('ServerConfigsCacheFactory', () => { beforeEach(() => { jest.clearAllMocks(); + cacheConfig.USE_REDIS = false; }); describe('create()', () => { - it('should return ServerConfigsCacheRedis when USE_REDIS is true', () => { - // Arrange + it('should return ServerConfigsCacheRedisAggregateKey for App namespace when USE_REDIS is true', () => { cacheConfig.USE_REDIS = true; - // Act - const cache = ServerConfigsCacheFactory.create('App', true); + const cache = ServerConfigsCacheFactory.create(APP_CACHE_NAMESPACE, false); - // Assert - expect(cache).toBeInstanceOf(ServerConfigsCacheRedis); - expect(ServerConfigsCacheRedis).toHaveBeenCalledWith('App', true); + expect(cache).toBeInstanceOf(ServerConfigsCacheRedisAggregateKey); + expect(ServerConfigsCacheRedisAggregateKey).toHaveBeenCalledWith(APP_CACHE_NAMESPACE, false); + expect(ServerConfigsCacheRedis).not.toHaveBeenCalled(); + expect(ServerConfigsCacheInMemory).not.toHaveBeenCalled(); }); - it('should return ServerConfigsCacheInMemory when USE_REDIS is false', () => { - // Arrange + it('should return ServerConfigsCacheInMemory for App namespace when USE_REDIS is false', () => { cacheConfig.USE_REDIS = false; - // Act - const cache = ServerConfigsCacheFactory.create('App', false); + const cache = ServerConfigsCacheFactory.create(APP_CACHE_NAMESPACE, false); - // Assert expect(cache).toBeInstanceOf(ServerConfigsCacheInMemory); - expect(ServerConfigsCacheInMemory).toHaveBeenCalled(); + expect(ServerConfigsCacheInMemory).toHaveBeenCalledWith(); + expect(ServerConfigsCacheRedis).not.toHaveBeenCalled(); + expect(ServerConfigsCacheRedisAggregateKey).not.toHaveBeenCalled(); }); - it('should pass correct parameters to ServerConfigsCacheRedis', () => { - // Arrange + it('should return 
ServerConfigsCacheRedis for non-App namespaces when USE_REDIS is true', () => { cacheConfig.USE_REDIS = true; - // Act - ServerConfigsCacheFactory.create('CustomNamespace', true); + const cache = ServerConfigsCacheFactory.create('CustomNamespace', true); - // Assert + expect(cache).toBeInstanceOf(ServerConfigsCacheRedis); expect(ServerConfigsCacheRedis).toHaveBeenCalledWith('CustomNamespace', true); + expect(ServerConfigsCacheRedisAggregateKey).not.toHaveBeenCalled(); }); - it('should create ServerConfigsCacheInMemory without parameters when USE_REDIS is false', () => { - // Arrange + it('should return ServerConfigsCacheInMemory for non-App namespaces when USE_REDIS is false', () => { cacheConfig.USE_REDIS = false; - // Act - ServerConfigsCacheFactory.create('App', false); + const cache = ServerConfigsCacheFactory.create('CustomNamespace', false); - // Assert - // In-memory cache doesn't use namespace/leaderOnly parameters + expect(cache).toBeInstanceOf(ServerConfigsCacheInMemory); expect(ServerConfigsCacheInMemory).toHaveBeenCalledWith(); }); }); diff --git a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedis.perf_benchmark.manual.spec.ts b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedis.perf_benchmark.manual.spec.ts new file mode 100644 index 0000000000..1815d49fe0 --- /dev/null +++ b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedis.perf_benchmark.manual.spec.ts @@ -0,0 +1,336 @@ +/** + * Performance benchmark for ServerConfigsCacheRedis.getAll() + * + * Requires a live Redis instance. Run manually (excluded from CI): + * npx jest --config packages/api/jest.config.mjs --testPathPatterns="perf_benchmark" --coverage=false + * + * Set env vars as needed: + * USE_REDIS=true REDIS_URI=redis://localhost:6379 npx jest ... + * + * This benchmark isolates the two phases of getAll() — SCAN (key discovery) and + * batched GET (value retrieval) — to identify the actual bottleneck under load. 
+ * It also benchmarks alternative approaches (single aggregate key, MGET) against + * the current SCAN+GET implementation. + */ +import { expect } from '@playwright/test'; +import type { ParsedServerConfig } from '~/mcp/types'; + +describe('ServerConfigsCacheRedis Performance Benchmark', () => { + let ServerConfigsCacheRedis: typeof import('../ServerConfigsCacheRedis').ServerConfigsCacheRedis; + let keyvRedisClient: Awaited['keyvRedisClient']; + let standardCache: Awaited['standardCache']; + + const PREFIX = 'perf-bench'; + + const makeConfig = (i: number): ParsedServerConfig => + ({ + type: 'stdio', + command: `cmd-${i}`, + args: [`arg-${i}`, `--flag-${i}`], + env: { KEY: `value-${i}`, EXTRA: `extra-${i}` }, + requiresOAuth: false, + tools: `tool_a_${i}, tool_b_${i}`, + capabilities: `{"tools":{"listChanged":true}}`, + serverInstructions: `Instructions for server ${i}`, + }) as ParsedServerConfig; + + beforeAll(async () => { + process.env.USE_REDIS = process.env.USE_REDIS ?? 'true'; + process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'true'; + process.env.REDIS_URI = + process.env.REDIS_URI ?? + 'redis://127.0.0.1:7001,redis://127.0.0.1:7002,redis://127.0.0.1:7003'; + process.env.REDIS_KEY_PREFIX = process.env.REDIS_KEY_PREFIX ?? 
'perf-bench-test'; + + const cacheModule = await import('../ServerConfigsCacheRedis'); + const redisClients = await import('~/cache/redisClients'); + const cacheFactory = await import('~/cache'); + + ServerConfigsCacheRedis = cacheModule.ServerConfigsCacheRedis; + keyvRedisClient = redisClients.keyvRedisClient; + standardCache = cacheFactory.standardCache; + + if (!keyvRedisClient) throw new Error('Redis client is not initialized'); + await redisClients.keyvRedisClientReady; + }); + + afterAll(async () => { + if (keyvRedisClient?.isOpen) await keyvRedisClient.disconnect(); + }); + + /** Clean up all keys matching our test prefix */ + async function cleanupKeys(pattern: string): Promise { + if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) return; + const keys: string[] = []; + for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) { + keys.push(key); + } + if (keys.length > 0) { + await Promise.all(keys.map((key) => keyvRedisClient!.del(key))); + } + } + + /** Populate a cache with N configs and return the cache instance */ + async function populateCache( + namespace: string, + count: number, + ): Promise> { + const cache = new ServerConfigsCacheRedis(namespace, false); + for (let i = 0; i < count; i++) { + await cache.add(`server-${i}`, makeConfig(i)); + } + return cache; + } + + /** + * Benchmark 1: Isolate SCAN vs GET phases in current getAll() + * + * Measures time spent in each phase separately to identify the bottleneck. 
+ */ + describe('Phase isolation: SCAN vs batched GET', () => { + const CONFIG_COUNTS = [5, 20, 50]; + + for (const count of CONFIG_COUNTS) { + it(`should measure SCAN and GET phases separately for ${count} configs`, async () => { + const ns = `${PREFIX}-phase-${count}`; + const cache = await populateCache(ns, count); + + try { + // Get the Keyv cache instance namespace for pattern matching + const keyvCache = standardCache(`MCP::ServersRegistry::Servers::${ns}`); + const pattern = `*MCP::ServersRegistry::Servers::${ns}:*`; + + // Phase 1: SCAN only (key discovery) + const scanStart = Date.now(); + const keys: string[] = []; + for await (const key of keyvRedisClient!.scanIterator({ MATCH: pattern })) { + keys.push(key); + } + const scanMs = Date.now() - scanStart; + + // Phase 2: Batched GET only (value retrieval via Keyv) + const keyNames = keys.map((key) => key.substring(key.lastIndexOf(':') + 1)); + const BATCH_SIZE = 100; + const getStart = Date.now(); + for (let i = 0; i < keyNames.length; i += BATCH_SIZE) { + const batch = keyNames.slice(i, i + BATCH_SIZE); + await Promise.all(batch.map((k) => keyvCache.get(k))); + } + const getMs = Date.now() - getStart; + + // Phase 3: Full getAll() (both phases combined) + const fullStart = Date.now(); + const result = await cache.getAll(); + const fullMs = Date.now() - fullStart; + + console.log( + `[${count} configs] SCAN: ${scanMs}ms | GET: ${getMs}ms | Full getAll: ${fullMs}ms | Keys found: ${keys.length}`, + ); + + expect(Object.keys(result).length).toBe(count); + + // Clean up the Keyv instance + await keyvCache.clear(); + } finally { + await cleanupKeys(`*${ns}*`); + } + }); + } + }); + + /** + * Benchmark 2: SCAN cost scales with total Redis keyspace, not just matching keys + * + * Redis SCAN iterates the entire hash table and filters by pattern. With a large + * keyspace (many non-matching keys), SCAN takes longer even if few keys match. + * This test measures SCAN time with background noise keys. 
+ */ + describe('SCAN cost vs keyspace size', () => { + it('should measure SCAN latency with background noise keys', async () => { + const ns = `${PREFIX}-noise`; + const targetCount = 10; + + // Add target configs + const cache = await populateCache(ns, targetCount); + + // Add noise keys in a different namespace to inflate the keyspace + const noiseCount = 500; + const noiseCache = standardCache(`noise-namespace-${Date.now()}`); + for (let i = 0; i < noiseCount; i++) { + await noiseCache.set(`noise-${i}`, { data: `value-${i}` }); + } + + try { + const pattern = `*MCP::ServersRegistry::Servers::${ns}:*`; + + // Measure SCAN with noise + const scanStart = Date.now(); + const keys: string[] = []; + for await (const key of keyvRedisClient!.scanIterator({ MATCH: pattern })) { + keys.push(key); + } + const scanMs = Date.now() - scanStart; + + // Measure full getAll + const fullStart = Date.now(); + const result = await cache.getAll(); + const fullMs = Date.now() - fullStart; + + console.log( + `[${targetCount} configs + ${noiseCount} noise keys] SCAN: ${scanMs}ms | Full getAll: ${fullMs}ms`, + ); + + expect(Object.keys(result).length).toBe(targetCount); + } finally { + await noiseCache.clear(); + await cleanupKeys(`*${ns}*`); + } + }); + }); + + /** + * Benchmark 3: Concurrent getAll() calls (simulates the actual production bottleneck) + * + * Multiple users hitting /api/mcp/* simultaneously, all triggering getAll() + * after the 5s TTL read-through cache expires. 
+ */ + describe('Concurrent getAll() under load', () => { + const CONCURRENCY_LEVELS = [1, 10, 50, 100]; + const CONFIG_COUNT = 30; + + for (const concurrency of CONCURRENCY_LEVELS) { + it(`should measure ${concurrency} concurrent getAll() calls with ${CONFIG_COUNT} configs`, async () => { + const ns = `${PREFIX}-concurrent-${concurrency}`; + const cache = await populateCache(ns, CONFIG_COUNT); + + try { + const startTime = Date.now(); + const promises = Array.from({ length: concurrency }, () => cache.getAll()); + const results = await Promise.all(promises); + const elapsed = Date.now() - startTime; + + console.log( + `[${CONFIG_COUNT} configs x ${concurrency} concurrent] Total: ${elapsed}ms | Per-call avg: ${(elapsed / concurrency).toFixed(1)}ms`, + ); + + for (const result of results) { + expect(Object.keys(result).length).toBe(CONFIG_COUNT); + } + } finally { + await cleanupKeys(`*${ns}*`); + } + }); + } + }); + + /** + * Benchmark 4: Alternative — Single aggregate key + * + * Instead of SCAN+GET, store all configs under one Redis key. + * getAll() becomes a single GET + JSON parse. 
+ */ + describe('Alternative: Single aggregate key', () => { + it('should compare aggregate key vs SCAN+GET for getAll()', async () => { + const ns = `${PREFIX}-aggregate`; + const configCount = 30; + const cache = await populateCache(ns, configCount); + + // Build the aggregate object + const aggregate: Record = {}; + for (let i = 0; i < configCount; i++) { + aggregate[`server-${i}`] = makeConfig(i); + } + + // Store as single key + const aggregateCache = standardCache(`aggregate-test-${Date.now()}`); + await aggregateCache.set('all', aggregate); + + try { + // Measure SCAN+GET approach + const scanStart = Date.now(); + const scanResult = await cache.getAll(); + const scanMs = Date.now() - scanStart; + + // Measure single-key approach + const aggStart = Date.now(); + const aggResult = (await aggregateCache.get('all')) as Record; + const aggMs = Date.now() - aggStart; + + console.log( + `[${configCount} configs] SCAN+GET: ${scanMs}ms | Single key: ${aggMs}ms | Speedup: ${(scanMs / Math.max(aggMs, 1)).toFixed(1)}x`, + ); + + expect(Object.keys(scanResult).length).toBe(configCount); + expect(Object.keys(aggResult).length).toBe(configCount); + + // Concurrent comparison + const concurrency = 100; + const scanConcStart = Date.now(); + await Promise.all(Array.from({ length: concurrency }, () => cache.getAll())); + const scanConcMs = Date.now() - scanConcStart; + + const aggConcStart = Date.now(); + await Promise.all(Array.from({ length: concurrency }, () => aggregateCache.get('all'))); + const aggConcMs = Date.now() - aggConcStart; + + console.log( + `[${configCount} configs x ${concurrency} concurrent] SCAN+GET: ${scanConcMs}ms | Single key: ${aggConcMs}ms | Speedup: ${(scanConcMs / Math.max(aggConcMs, 1)).toFixed(1)}x`, + ); + } finally { + await aggregateCache.clear(); + await cleanupKeys(`*${ns}*`); + } + }); + }); + + /** + * Benchmark 5: Alternative — Raw MGET (bypassing Keyv serialization overhead) + * + * Keyv wraps each value in { value, expires } JSON. 
Using raw MGET on the + * Redis client skips the Keyv layer entirely. + */ + describe('Alternative: Raw MGET vs Keyv batch GET', () => { + it('should compare raw MGET vs Keyv GET for value retrieval', async () => { + const ns = `${PREFIX}-mget`; + const configCount = 30; + const cache = await populateCache(ns, configCount); + + try { + // First, discover keys via SCAN (same for both approaches) + const pattern = `*MCP::ServersRegistry::Servers::${ns}:*`; + const keys: string[] = []; + for await (const key of keyvRedisClient!.scanIterator({ MATCH: pattern })) { + keys.push(key); + } + + // Approach 1: Keyv batch GET (current implementation) + const keyvCache = standardCache(`MCP::ServersRegistry::Servers::${ns}`); + const keyNames = keys.map((key) => key.substring(key.lastIndexOf(':') + 1)); + + const keyvStart = Date.now(); + await Promise.all(keyNames.map((k) => keyvCache.get(k))); + const keyvMs = Date.now() - keyvStart; + + // Approach 2: Raw MGET (no Keyv overhead) + const mgetStart = Date.now(); + if ('mGet' in keyvRedisClient!) 
{ + const rawValues = await ( + keyvRedisClient as { mGet: (keys: string[]) => Promise<(string | null)[]> } + ).mGet(keys); + // Parse the Keyv-wrapped JSON values + rawValues.filter(Boolean).map((v) => JSON.parse(v!)); + } + const mgetMs = Date.now() - mgetStart; + + console.log( + `[${configCount} configs] Keyv batch GET: ${keyvMs}ms | Raw MGET: ${mgetMs}ms | Speedup: ${(keyvMs / Math.max(mgetMs, 1)).toFixed(1)}x`, + ); + + // Clean up + await keyvCache.clear(); + } finally { + await cleanupKeys(`*${ns}*`); + } + }); + }); +}); diff --git a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedisAggregateKey.cache_integration.spec.ts b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedisAggregateKey.cache_integration.spec.ts new file mode 100644 index 0000000000..cbb75609d1 --- /dev/null +++ b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedisAggregateKey.cache_integration.spec.ts @@ -0,0 +1,246 @@ +import { expect } from '@playwright/test'; +import type { ParsedServerConfig } from '~/mcp/types'; + +describe('ServerConfigsCacheRedisAggregateKey Integration Tests', () => { + let ServerConfigsCacheRedisAggregateKey: typeof import('../ServerConfigsCacheRedisAggregateKey').ServerConfigsCacheRedisAggregateKey; + let keyvRedisClient: Awaited['keyvRedisClient']; + + let cache: InstanceType< + typeof import('../ServerConfigsCacheRedisAggregateKey').ServerConfigsCacheRedisAggregateKey + >; + + const mockConfig1 = { + type: 'stdio', + command: 'node', + args: ['server1.js'], + env: { TEST: 'value1' }, + } as ParsedServerConfig; + + const mockConfig2 = { + type: 'stdio', + command: 'python', + args: ['server2.py'], + env: { TEST: 'value2' }, + } as ParsedServerConfig; + + const mockConfig3 = { + type: 'sse', + url: 'http://localhost:3000', + requiresOAuth: true, + } as ParsedServerConfig; + + beforeAll(async () => { + process.env.USE_REDIS = process.env.USE_REDIS ?? 
'true';
+  process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'true';
+  // Default to the local 3-node cluster unless the environment overrides it.
+  process.env.REDIS_URI =
+    process.env.REDIS_URI ??
+    'redis://127.0.0.1:7001,redis://127.0.0.1:7002,redis://127.0.0.1:7003';
+  process.env.REDIS_KEY_PREFIX = process.env.REDIS_KEY_PREFIX ?? 'AggregateKey-IntegrationTest';
+
+  // Dynamic imports so the env defaults above take effect before module init.
+  const cacheModule = await import('../ServerConfigsCacheRedisAggregateKey');
+  const redisClients = await import('~/cache/redisClients');
+
+  ServerConfigsCacheRedisAggregateKey = cacheModule.ServerConfigsCacheRedisAggregateKey;
+  keyvRedisClient = redisClients.keyvRedisClient;
+
+  if (!keyvRedisClient) throw new Error('Redis client is not initialized');
+  await redisClients.keyvRedisClientReady;
+});
+
+  // New cache instance per test; afterEach clears its Redis state so tests stay isolated.
+  beforeEach(() => {
+    cache = new ServerConfigsCacheRedisAggregateKey('agg-test', false);
+  });
+
+  afterEach(async () => {
+    await cache.reset();
+  });
+
+  // Close the shared Redis connection so Jest can exit cleanly.
+  afterAll(async () => {
+    if (keyvRedisClient?.isOpen) await keyvRedisClient.disconnect();
+  });
+
+  describe('add and get operations', () => {
+    it('should add and retrieve a server config', async () => {
+      await cache.add('server1', mockConfig1);
+      const result = await cache.get('server1');
+      expect(result).toMatchObject(mockConfig1);
+    });
+
+    it('should return undefined for non-existent server', async () => {
+      const result = await cache.get('non-existent');
+      expect(result).toBeUndefined();
+    });
+
+    it('should throw error when adding duplicate server', async () => {
+      await cache.add('server1', mockConfig1);
+      await expect(cache.add('server1', mockConfig2)).rejects.toThrow(
+        'Server "server1" already exists in cache. Use update() to modify existing configs.',
+      );
+    });
+
+    it('should handle multiple server configs', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.add('server2', mockConfig2);
+      await cache.add('server3', mockConfig3);
+
+      expect(await cache.get('server1')).toMatchObject(mockConfig1);
+      expect(await cache.get('server2')).toMatchObject(mockConfig2);
+      expect(await cache.get('server3')).toMatchObject(mockConfig3);
+    });
+  });
+
+  describe('getAll operation', () => {
+    it('should return empty object when no servers exist', async () => {
+      const result = await cache.getAll();
+      // toEqual, not toMatchObject: toMatchObject({}) matches ANY object, making the assertion vacuous.
+      expect(result).toEqual({});
+    });
+
+    it('should return all server configs', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.add('server2', mockConfig2);
+      await cache.add('server3', mockConfig3);
+
+      const result = await cache.getAll();
+      expect(result).toMatchObject({
+        server1: mockConfig1,
+        server2: mockConfig2,
+        server3: mockConfig3,
+      });
+    });
+
+    it('should reflect additions in getAll', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.add('server2', mockConfig2);
+
+      let result = await cache.getAll();
+      expect(Object.keys(result).length).toBe(2);
+
+      await cache.add('server3', mockConfig3);
+      result = await cache.getAll();
+      expect(Object.keys(result).length).toBe(3);
+      expect(result.server3).toMatchObject(mockConfig3);
+    });
+  });
+
+  describe('update operation', () => {
+    it('should update an existing server config', async () => {
+      await cache.add('server1', mockConfig1);
+      expect(await cache.get('server1')).toMatchObject(mockConfig1);
+
+      await cache.update('server1', mockConfig2);
+      const result = await cache.get('server1');
+      expect(result).toMatchObject(mockConfig2);
+    });
+
+    it('should throw error when updating non-existent server', async () => {
+      await expect(cache.update('non-existent', mockConfig1)).rejects.toThrow(
+        'Server "non-existent" does not exist in cache. Use add() to create new configs.',
+      );
+    });
+
+    it('should reflect updates in getAll', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.add('server2', mockConfig2);
+
+      await cache.update('server1', mockConfig3);
+      const result = await cache.getAll();
+      expect(result.server1).toMatchObject(mockConfig3);
+      expect(result.server2).toMatchObject(mockConfig2);
+    });
+  });
+
+  describe('remove operation', () => {
+    it('should remove an existing server config', async () => {
+      await cache.add('server1', mockConfig1);
+      expect(await cache.get('server1')).toMatchObject(mockConfig1);
+
+      await cache.remove('server1');
+      expect(await cache.get('server1')).toBeUndefined();
+    });
+
+    it('should throw error when removing non-existent server', async () => {
+      await expect(cache.remove('non-existent')).rejects.toThrow(
+        'Failed to remove server "non-existent" in cache.',
+      );
+    });
+
+    it('should remove server from getAll results', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.add('server2', mockConfig2);
+
+      let result = await cache.getAll();
+      expect(Object.keys(result).length).toBe(2);
+
+      await cache.remove('server1');
+      result = await cache.getAll();
+      expect(Object.keys(result).length).toBe(1);
+      expect(result.server1).toBeUndefined();
+      expect(result.server2).toMatchObject(mockConfig2);
+    });
+
+    it('should allow re-adding a removed server', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.remove('server1');
+      await cache.add('server1', mockConfig3);
+
+      const result = await cache.get('server1');
+      expect(result).toMatchObject(mockConfig3);
+    });
+  });
+
+  describe('concurrent write safety', () => {
+    it('should handle concurrent add calls without data loss', async () => {
+      // All adds run in parallel; none may be rejected or lost.
+      const configCount = 20;
+      const promises = Array.from({ length: configCount }, (_, i) =>
+        cache.add(`server-${i}`, {
+          type: 'stdio',
+          command: `cmd-${i}`,
+          args: [`arg-${i}`],
+        } as ParsedServerConfig),
+      );
+
+      const results = await Promise.allSettled(promises);
+      const failures = results.filter((r) => r.status === 'rejected');
+      expect(failures).toHaveLength(0);
+
+      const result = await cache.getAll();
+      expect(Object.keys(result).length).toBe(configCount);
+      for (let i = 0; i < configCount; i++) {
+        expect(result[`server-${i}`]).toBeDefined();
+        const config = result[`server-${i}`] as { command?: string };
+        expect(config.command).toBe(`cmd-${i}`);
+      }
+    });
+
+    it('should handle concurrent getAll calls', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.add('server2', mockConfig2);
+      await cache.add('server3', mockConfig3);
+
+      const concurrency = 50;
+      const promises = Array.from({ length: concurrency }, () => cache.getAll());
+      const results = await Promise.all(promises);
+
+      for (const result of results) {
+        expect(Object.keys(result).length).toBe(3);
+        expect(result.server1).toMatchObject(mockConfig1);
+        expect(result.server2).toMatchObject(mockConfig2);
+        expect(result.server3).toMatchObject(mockConfig3);
+      }
+    });
+  });
+
+  describe('reset operation', () => {
+    it('should clear all configs', async () => {
+      await cache.add('server1', mockConfig1);
+      await cache.add('server2', mockConfig2);
+
+      expect(Object.keys(await cache.getAll()).length).toBe(2);
+
+      await cache.reset();
+
+      const result = await cache.getAll();
+      expect(Object.keys(result).length).toBe(0);
+    });
+  });
+});