diff --git a/packages/api/src/cache/cacheConfig.ts b/packages/api/src/cache/cacheConfig.ts index 0d4304f5c3..7b4a899e98 100644 --- a/packages/api/src/cache/cacheConfig.ts +++ b/packages/api/src/cache/cacheConfig.ts @@ -128,8 +128,13 @@ const cacheConfig = { REDIS_SCAN_COUNT: math(process.env.REDIS_SCAN_COUNT, 1000), /** - * TTL in milliseconds for MCP registry read-through cache. - * This cache reduces redundant lookups within a single request flow. + * TTL in milliseconds for MCP registry caches. Used by both: + * - `MCPServersRegistry` read-through caches (`readThroughCache`/`readThroughCacheAll`) + * - `ServerConfigsCacheRedisAggregateKey` local snapshot (avoids redundant Redis GETs) + * + * Both layers use this value, so the effective max cross-instance staleness is up + * to 2× this value in multi-instance deployments. Set to 0 to disable the local + * snapshot entirely (every `getAll()` hits Redis directly). * @default 5000 (5 seconds) */ MCP_REGISTRY_CACHE_TTL: math(process.env.MCP_REGISTRY_CACHE_TTL, 5000), diff --git a/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedisAggregateKey.ts b/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedisAggregateKey.ts index 12f423a1fb..e67c1a4a84 100644 --- a/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedisAggregateKey.ts +++ b/packages/api/src/mcp/registry/cache/ServerConfigsCacheRedisAggregateKey.ts @@ -1,9 +1,8 @@ -import { logger } from '@librechat/data-schemas'; import type Keyv from 'keyv'; import type { IServerConfigsRepositoryInterface } from '~/mcp/registry/ServerConfigsRepositoryInterface'; import type { ParsedServerConfig, AddServerResult } from '~/mcp/types'; import { BaseRegistryCache } from './BaseRegistryCache'; -import { standardCache } from '~/cache'; +import { cacheConfig, standardCache } from '~/cache'; /** * Redis-backed MCP server configs cache that stores all entries under a single aggregate key. 
@@ -37,14 +36,38 @@ export class ServerConfigsCacheRedisAggregateKey protected readonly cache: Keyv; private writeLock: Promise<void> = Promise.resolve(); + /** + * In-memory snapshot of the aggregate key to avoid redundant Redis GETs. + * `getAll()` is called 20+ times per chat request (once per tool, per server + * config lookup, per connection check) but the data doesn't change within a + * request cycle. The snapshot collapses all reads within the TTL window into + * a single Redis GET. Invalidated on every write (`add`, `update`, `remove`, `reset`). + * + * NOTE: In multi-instance deployments, the effective max staleness for cross-instance + * writes is up to 2×MCP_REGISTRY_CACHE_TTL. This happens when readThroughCacheAll + * (MCPServersRegistry) is populated from a snapshot that is nearly expired. For the + * default 5000ms TTL, worst-case cross-instance propagation is ~10s. This is acceptable + * given the single-writer invariant (leader-only initialization, rare manual reinspection). + */ + private localSnapshot: Record<string, ParsedServerConfig> | null = null; + /** Milliseconds since epoch. 0 = epoch = always expired on first check. */ + private localSnapshotExpiry = 0; + constructor(namespace: string, leaderOnly: boolean) { super(leaderOnly); this.cache = standardCache(`${this.PREFIX}::Servers::${namespace}`); } + private invalidateLocalSnapshot(): void { + this.localSnapshot = null; + this.localSnapshotExpiry = 0; + } + /** * Serializes write operations to prevent concurrent read-modify-write races. * Reads (`get`, `getAll`) are not serialized — they can run concurrently. + * Always invalidates the local snapshot in `finally` to guarantee cleanup + * even when the write callback throws (e.g., Redis SET failure). 
*/ private async withWriteLock<T>(fn: () => Promise<T>): Promise<T> { const previousLock = this.writeLock; @@ -56,20 +79,29 @@ export class ServerConfigsCacheRedisAggregateKey await previousLock; return await fn(); } finally { + this.invalidateLocalSnapshot(); resolve(); } } public async getAll(): Promise<Record<string, ParsedServerConfig>> { - const startTime = Date.now(); - const result = (await this.cache.get(AGGREGATE_KEY)) as - | Record<string, ParsedServerConfig> - | undefined; - const elapsed = Date.now() - startTime; - logger.debug( - `[ServerConfigsCacheRedisAggregateKey] getAll: fetched ${result ? Object.keys(result).length : 0} configs in ${elapsed}ms`, - ); - return result ?? {}; + const ttl = cacheConfig.MCP_REGISTRY_CACHE_TTL; + if (ttl > 0) { + const now = Date.now(); + if (this.localSnapshot !== null && now < this.localSnapshotExpiry) { + return this.localSnapshot; + } + } + + const result = + ((await this.cache.get(AGGREGATE_KEY)) as Record<string, ParsedServerConfig> | undefined) ?? + {}; + + if (ttl > 0) { + this.localSnapshot = result; + this.localSnapshotExpiry = Date.now() + ttl; + } + return result; } public async get(serverName: string): Promise<ParsedServerConfig | undefined> { @@ -80,6 +112,10 @@ export class ServerConfigsCacheRedisAggregateKey public async add(serverName: string, config: ParsedServerConfig): Promise<AddServerResult> { if (this.leaderOnly) await this.leaderCheck('add MCP servers'); return this.withWriteLock(async () => { + // Force fresh Redis read so the read-modify-write uses current data, + // not a snapshot that may predate this write. Distinct from the finally-block + // invalidation which cleans up after the write completes or throws. 
+ this.invalidateLocalSnapshot(); const all = await this.getAll(); if (all[serverName]) { throw new Error( @@ -87,8 +123,8 @@ export class ServerConfigsCacheRedisAggregateKey ); } const storedConfig = { ...config, updatedAt: Date.now() }; - all[serverName] = storedConfig; - const success = await this.cache.set(AGGREGATE_KEY, all); + const newAll = { ...all, [serverName]: storedConfig }; + const success = await this.cache.set(AGGREGATE_KEY, newAll); this.successCheck(`add App server "${serverName}"`, success); return { serverName, config: storedConfig }; }); } @@ -97,14 +133,15 @@ export class ServerConfigsCacheRedisAggregateKey public async update(serverName: string, config: ParsedServerConfig): Promise<void> { if (this.leaderOnly) await this.leaderCheck('update MCP servers'); return this.withWriteLock(async () => { + this.invalidateLocalSnapshot(); // Force fresh Redis read (see add() comment) const all = await this.getAll(); if (!all[serverName]) { throw new Error( `Server "${serverName}" does not exist in cache. 
Use add() to create new configs.`, ); } - all[serverName] = { ...config, updatedAt: Date.now() }; - const success = await this.cache.set(AGGREGATE_KEY, all); + const newAll = { ...all, [serverName]: { ...config, updatedAt: Date.now() } }; + const success = await this.cache.set(AGGREGATE_KEY, newAll); this.successCheck(`update App server "${serverName}"`, success); }); } @@ -112,12 +149,13 @@ export class ServerConfigsCacheRedisAggregateKey public async remove(serverName: string): Promise<void> { if (this.leaderOnly) await this.leaderCheck('remove MCP servers'); return this.withWriteLock(async () => { + this.invalidateLocalSnapshot(); // Force fresh Redis read (see add() comment) const all = await this.getAll(); if (!all[serverName]) { throw new Error(`Failed to remove server "${serverName}" in cache.`); } - delete all[serverName]; - const success = await this.cache.set(AGGREGATE_KEY, all); + const { [serverName]: _, ...newAll } = all; + const success = await this.cache.set(AGGREGATE_KEY, newAll); this.successCheck(`remove App server "${serverName}"`, success); }); } @@ -126,11 +164,16 @@ export class ServerConfigsCacheRedisAggregateKey * Resets the aggregate key directly instead of using SCAN-based `cache.clear()`. * Only one key (`__all__`) ever exists in this namespace, so a targeted delete is * more efficient and consistent with the PR's goal of eliminating SCAN operations. + * + * NOTE: Intentionally not serialized via `withWriteLock`. `reset()` is only called + * during lifecycle transitions (test teardown, full reinitialization via + * `MCPServersInitializer`) where no concurrent writes are in flight. 
*/ public override async reset(): Promise<void> { if (this.leaderOnly) { await this.leaderCheck('reset App MCP servers cache'); } await this.cache.delete(AGGREGATE_KEY); + this.invalidateLocalSnapshot(); } } diff --git a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedisAggregateKey.cache_integration.spec.ts b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedisAggregateKey.cache_integration.spec.ts index cbb75609d1..5aeb49b206 100644 --- a/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedisAggregateKey.cache_integration.spec.ts +++ b/packages/api/src/mcp/registry/cache/__tests__/ServerConfigsCacheRedisAggregateKey.cache_integration.spec.ts @@ -243,4 +243,96 @@ describe('ServerConfigsCacheRedisAggregateKey Integration Tests', () => { expect(Object.keys(result).length).toBe(0); }); }); + + describe('local snapshot behavior', () => { + it('should collapse repeated getAll calls into a single Redis GET within TTL', async () => { + await cache.add('server1', mockConfig1); + await cache.add('server2', mockConfig2); + + // Prime the snapshot + await cache.getAll(); + + // Spy on the underlying Keyv cache to count Redis calls + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const cacheGetSpy = jest.spyOn((cache as any).cache, 'get'); + + await cache.getAll(); + await cache.getAll(); + await cache.getAll(); + + // Snapshot should be served; Redis should NOT have been called + expect(cacheGetSpy).not.toHaveBeenCalled(); + cacheGetSpy.mockRestore(); + }); + + it('should invalidate snapshot after add', async () => { + await cache.add('server1', mockConfig1); + const before = await cache.getAll(); + expect(Object.keys(before).length).toBe(1); + + await cache.add('server2', mockConfig2); + const after = await cache.getAll(); + expect(Object.keys(after).length).toBe(2); + }); + + it('should invalidate snapshot after update and preserve other entries', async () => { + await cache.add('server1', mockConfig1); + await 
cache.add('server2', mockConfig2); + expect((await cache.getAll()).server1).toMatchObject(mockConfig1); + + await cache.update('server1', mockConfig3); + const after = await cache.getAll(); + expect(after.server1).toMatchObject(mockConfig3); + expect(after.server2).toMatchObject(mockConfig2); + }); + + it('should invalidate snapshot after remove', async () => { + await cache.add('server1', mockConfig1); + await cache.add('server2', mockConfig2); + expect(Object.keys(await cache.getAll()).length).toBe(2); + + await cache.remove('server1'); + const after = await cache.getAll(); + expect(Object.keys(after).length).toBe(1); + expect(after.server1).toBeUndefined(); + expect(after.server2).toMatchObject(mockConfig2); + }); + + it('should invalidate snapshot after reset', async () => { + await cache.add('server1', mockConfig1); + expect(Object.keys(await cache.getAll()).length).toBe(1); + + await cache.reset(); + expect(Object.keys(await cache.getAll()).length).toBe(0); + }); + + it('should not retroactively modify previously returned snapshot references', async () => { + await cache.add('server1', mockConfig1); + + // Prime the snapshot + const snapshot = await cache.getAll(); + expect(Object.keys(snapshot).length).toBe(1); + + // Add a second server — the original snapshot reference should be unmodified + await cache.add('server2', mockConfig2); + expect(Object.keys(snapshot).length).toBe(1); + expect(snapshot.server2).toBeUndefined(); + }); + + it('should hit Redis again after snapshot TTL expires', async () => { + await cache.add('server1', mockConfig1); + await cache.getAll(); // prime snapshot + + // Force-expire the snapshot without sleeping + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (cache as any).localSnapshotExpiry = Date.now() - 1; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const cacheGetSpy = jest.spyOn((cache as any).cache, 'get'); + const result = await cache.getAll(); + 
expect(cacheGetSpy).toHaveBeenCalledTimes(1); + expect(Object.keys(result).length).toBe(1); + cacheGetSpy.mockRestore(); + }); + }); });