mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-04-03 22:37:20 +02:00
🌊 refactor: Local Snapshot for Aggregate Key Cache to Avoid Redundant Redis GETs (#12422)
* perf: Add local snapshot to aggregate key cache to avoid redundant Redis GETs getAll() was being called 20+ times per chat request (once per tool, per server config lookup, per connection check). Each call hit Redis even though the data doesn't change within a request cycle. Add an in-memory snapshot with 5s TTL that collapses all reads within the window into a single Redis GET. Writes (add/update/remove/reset) invalidate the snapshot immediately so mutations are never stale. Also removes the debug logger that was producing noisy per-call logs. * fix: Prevent snapshot mutation and guarantee cleanup on write failure - Never mutate the snapshot object in-place during writes. Build a new object (spread) so concurrent readers never observe uncommitted state. - Move invalidateLocalSnapshot() into withWriteLock's finally block so cleanup is guaranteed even when successCheck throws on Redis failure. - After successful writes, populate the snapshot with the committed state to avoid an unnecessary Redis GET on the next read. - Use Date.now() after the await in getAll() so the TTL window isn't shortened by Redis latency. - Strengthen tests: spy on underlying Keyv cache to verify N getAll() calls collapse into 1 Redis GET, verify snapshot reference immutability. * fix: Remove dead populateLocalSnapshot calls from write callbacks populateLocalSnapshot was called inside withWriteLock callbacks, but the finally block in withWriteLock always calls invalidateLocalSnapshot immediately after — undoing the populate on every execution path. Remove the dead method and its three call sites. The snapshot is correctly cleared by finally on both success and failure paths. The next getAll() after a write hits Redis once to fetch the committed state, which is acceptable since writes only occur during init and rare manual reinspection. * fix: Derive local snapshot TTL from MCP_REGISTRY_CACHE_TTL config Use cacheConfig.MCP_REGISTRY_CACHE_TTL (default 5000ms) instead of a hardcoded 5s constant. When TTL is 0 (operator explicitly wants no caching), the snapshot is disabled entirely — every getAll() hits Redis. * fix: Add TTL expiry test, document 2×TTL staleness, clarify comments - Add missing test for snapshot TTL expiry path (force-expire via localSnapshotExpiry mutation, verify Redis is hit again) - Document 2×TTL max cross-instance staleness in localSnapshot JSDoc - Document reset() intentionally bypasses withWriteLock - Add inline comments explaining why early invalidateLocalSnapshot() in write callbacks is distinct from the finally-block cleanup - Update cacheConfig.MCP_REGISTRY_CACHE_TTL JSDoc to reflect both use sites and the staleness implication - Rename misleading test name for snapshot reference immutability - Add epoch sentinel comment on localSnapshotExpiry initialization
This commit is contained in:
parent
8e2721011e
commit
5e3b7bcde3
3 changed files with 159 additions and 19 deletions
9
packages/api/src/cache/cacheConfig.ts
vendored
9
packages/api/src/cache/cacheConfig.ts
vendored
|
|
@ -128,8 +128,13 @@ const cacheConfig = {
|
|||
REDIS_SCAN_COUNT: math(process.env.REDIS_SCAN_COUNT, 1000),
|
||||
|
||||
/**
|
||||
* TTL in milliseconds for MCP registry read-through cache.
|
||||
* This cache reduces redundant lookups within a single request flow.
|
||||
* TTL in milliseconds for MCP registry caches. Used by both:
|
||||
* - `MCPServersRegistry` read-through caches (`readThroughCache`/`readThroughCacheAll`)
|
||||
* - `ServerConfigsCacheRedisAggregateKey` local snapshot (avoids redundant Redis GETs)
|
||||
*
|
||||
* Both layers use this value, so the effective max cross-instance staleness is up
|
||||
* to 2× this value in multi-instance deployments. Set to 0 to disable the local
|
||||
* snapshot entirely (every `getAll()` hits Redis directly).
|
||||
* @default 5000 (5 seconds)
|
||||
*/
|
||||
MCP_REGISTRY_CACHE_TTL: math(process.env.MCP_REGISTRY_CACHE_TTL, 5000),
|
||||
|
|
|
|||
|
|
@ -1,9 +1,8 @@
|
|||
import { logger } from '@librechat/data-schemas';
|
||||
import type Keyv from 'keyv';
|
||||
import type { IServerConfigsRepositoryInterface } from '~/mcp/registry/ServerConfigsRepositoryInterface';
|
||||
import type { ParsedServerConfig, AddServerResult } from '~/mcp/types';
|
||||
import { BaseRegistryCache } from './BaseRegistryCache';
|
||||
import { standardCache } from '~/cache';
|
||||
import { cacheConfig, standardCache } from '~/cache';
|
||||
|
||||
/**
|
||||
* Redis-backed MCP server configs cache that stores all entries under a single aggregate key.
|
||||
|
|
@ -37,14 +36,38 @@ export class ServerConfigsCacheRedisAggregateKey
|
|||
protected readonly cache: Keyv;
|
||||
private writeLock: Promise<void> = Promise.resolve();
|
||||
|
||||
/**
|
||||
* In-memory snapshot of the aggregate key to avoid redundant Redis GETs.
|
||||
* `getAll()` is called 20+ times per chat request (once per tool, per server
|
||||
* config lookup, per connection check) but the data doesn't change within a
|
||||
* request cycle. The snapshot collapses all reads within the TTL window into
|
||||
* a single Redis GET. Invalidated on every write (`add`, `update`, `remove`, `reset`).
|
||||
*
|
||||
* NOTE: In multi-instance deployments, the effective max staleness for cross-instance
|
||||
* writes is up to 2×MCP_REGISTRY_CACHE_TTL. This happens when readThroughCacheAll
|
||||
* (MCPServersRegistry) is populated from a snapshot that is nearly expired. For the
|
||||
* default 5000ms TTL, worst-case cross-instance propagation is ~10s. This is acceptable
|
||||
* given the single-writer invariant (leader-only initialization, rare manual reinspection).
|
||||
*/
|
||||
private localSnapshot: Record<string, ParsedServerConfig> | null = null;
|
||||
/** Milliseconds since epoch. 0 = epoch = always expired on first check. */
|
||||
private localSnapshotExpiry = 0;
|
||||
|
||||
constructor(namespace: string, leaderOnly: boolean) {
|
||||
super(leaderOnly);
|
||||
this.cache = standardCache(`${this.PREFIX}::Servers::${namespace}`);
|
||||
}
|
||||
|
||||
private invalidateLocalSnapshot(): void {
|
||||
this.localSnapshot = null;
|
||||
this.localSnapshotExpiry = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes write operations to prevent concurrent read-modify-write races.
|
||||
* Reads (`get`, `getAll`) are not serialized — they can run concurrently.
|
||||
* Always invalidates the local snapshot in `finally` to guarantee cleanup
|
||||
* even when the write callback throws (e.g., Redis SET failure).
|
||||
*/
|
||||
private async withWriteLock<T>(fn: () => Promise<T>): Promise<T> {
|
||||
const previousLock = this.writeLock;
|
||||
|
|
@ -56,20 +79,29 @@ export class ServerConfigsCacheRedisAggregateKey
|
|||
await previousLock;
|
||||
return await fn();
|
||||
} finally {
|
||||
this.invalidateLocalSnapshot();
|
||||
resolve();
|
||||
}
|
||||
}
|
||||
|
||||
public async getAll(): Promise<Record<string, ParsedServerConfig>> {
|
||||
const startTime = Date.now();
|
||||
const result = (await this.cache.get(AGGREGATE_KEY)) as
|
||||
| Record<string, ParsedServerConfig>
|
||||
| undefined;
|
||||
const elapsed = Date.now() - startTime;
|
||||
logger.debug(
|
||||
`[ServerConfigsCacheRedisAggregateKey] getAll: fetched ${result ? Object.keys(result).length : 0} configs in ${elapsed}ms`,
|
||||
);
|
||||
return result ?? {};
|
||||
const ttl = cacheConfig.MCP_REGISTRY_CACHE_TTL;
|
||||
if (ttl > 0) {
|
||||
const now = Date.now();
|
||||
if (this.localSnapshot !== null && now < this.localSnapshotExpiry) {
|
||||
return this.localSnapshot;
|
||||
}
|
||||
}
|
||||
|
||||
const result =
|
||||
((await this.cache.get(AGGREGATE_KEY)) as Record<string, ParsedServerConfig> | undefined) ??
|
||||
{};
|
||||
|
||||
if (ttl > 0) {
|
||||
this.localSnapshot = result;
|
||||
this.localSnapshotExpiry = Date.now() + ttl;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public async get(serverName: string): Promise<ParsedServerConfig | undefined> {
|
||||
|
|
@ -80,6 +112,10 @@ export class ServerConfigsCacheRedisAggregateKey
|
|||
public async add(serverName: string, config: ParsedServerConfig): Promise<AddServerResult> {
|
||||
if (this.leaderOnly) await this.leaderCheck('add MCP servers');
|
||||
return this.withWriteLock(async () => {
|
||||
// Force fresh Redis read so the read-modify-write uses current data,
|
||||
// not a snapshot that may predate this write. Distinct from the finally-block
|
||||
// invalidation which cleans up after the write completes or throws.
|
||||
this.invalidateLocalSnapshot();
|
||||
const all = await this.getAll();
|
||||
if (all[serverName]) {
|
||||
throw new Error(
|
||||
|
|
@ -87,8 +123,8 @@ export class ServerConfigsCacheRedisAggregateKey
|
|||
);
|
||||
}
|
||||
const storedConfig = { ...config, updatedAt: Date.now() };
|
||||
all[serverName] = storedConfig;
|
||||
const success = await this.cache.set(AGGREGATE_KEY, all);
|
||||
const newAll = { ...all, [serverName]: storedConfig };
|
||||
const success = await this.cache.set(AGGREGATE_KEY, newAll);
|
||||
this.successCheck(`add App server "${serverName}"`, success);
|
||||
return { serverName, config: storedConfig };
|
||||
});
|
||||
|
|
@ -97,14 +133,15 @@ export class ServerConfigsCacheRedisAggregateKey
|
|||
public async update(serverName: string, config: ParsedServerConfig): Promise<void> {
|
||||
if (this.leaderOnly) await this.leaderCheck('update MCP servers');
|
||||
return this.withWriteLock(async () => {
|
||||
this.invalidateLocalSnapshot(); // Force fresh Redis read (see add() comment)
|
||||
const all = await this.getAll();
|
||||
if (!all[serverName]) {
|
||||
throw new Error(
|
||||
`Server "${serverName}" does not exist in cache. Use add() to create new configs.`,
|
||||
);
|
||||
}
|
||||
all[serverName] = { ...config, updatedAt: Date.now() };
|
||||
const success = await this.cache.set(AGGREGATE_KEY, all);
|
||||
const newAll = { ...all, [serverName]: { ...config, updatedAt: Date.now() } };
|
||||
const success = await this.cache.set(AGGREGATE_KEY, newAll);
|
||||
this.successCheck(`update App server "${serverName}"`, success);
|
||||
});
|
||||
}
|
||||
|
|
@ -112,12 +149,13 @@ export class ServerConfigsCacheRedisAggregateKey
|
|||
public async remove(serverName: string): Promise<void> {
|
||||
if (this.leaderOnly) await this.leaderCheck('remove MCP servers');
|
||||
return this.withWriteLock(async () => {
|
||||
this.invalidateLocalSnapshot(); // Force fresh Redis read (see add() comment)
|
||||
const all = await this.getAll();
|
||||
if (!all[serverName]) {
|
||||
throw new Error(`Failed to remove server "${serverName}" in cache.`);
|
||||
}
|
||||
delete all[serverName];
|
||||
const success = await this.cache.set(AGGREGATE_KEY, all);
|
||||
const { [serverName]: _, ...newAll } = all;
|
||||
const success = await this.cache.set(AGGREGATE_KEY, newAll);
|
||||
this.successCheck(`remove App server "${serverName}"`, success);
|
||||
});
|
||||
}
|
||||
|
|
@ -126,11 +164,16 @@ export class ServerConfigsCacheRedisAggregateKey
|
|||
* Resets the aggregate key directly instead of using SCAN-based `cache.clear()`.
|
||||
* Only one key (`__all__`) ever exists in this namespace, so a targeted delete is
|
||||
* more efficient and consistent with the PR's goal of eliminating SCAN operations.
|
||||
*
|
||||
* NOTE: Intentionally not serialized via `withWriteLock`. `reset()` is only called
|
||||
* during lifecycle transitions (test teardown, full reinitialization via
|
||||
* `MCPServersInitializer`) where no concurrent writes are in flight.
|
||||
*/
|
||||
public override async reset(): Promise<void> {
|
||||
if (this.leaderOnly) {
|
||||
await this.leaderCheck('reset App MCP servers cache');
|
||||
}
|
||||
await this.cache.delete(AGGREGATE_KEY);
|
||||
this.invalidateLocalSnapshot();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -243,4 +243,96 @@ describe('ServerConfigsCacheRedisAggregateKey Integration Tests', () => {
|
|||
expect(Object.keys(result).length).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('local snapshot behavior', () => {
|
||||
it('should collapse repeated getAll calls into a single Redis GET within TTL', async () => {
|
||||
await cache.add('server1', mockConfig1);
|
||||
await cache.add('server2', mockConfig2);
|
||||
|
||||
// Prime the snapshot
|
||||
await cache.getAll();
|
||||
|
||||
// Spy on the underlying Keyv cache to count Redis calls
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const cacheGetSpy = jest.spyOn((cache as any).cache, 'get');
|
||||
|
||||
await cache.getAll();
|
||||
await cache.getAll();
|
||||
await cache.getAll();
|
||||
|
||||
// Snapshot should be served; Redis should NOT have been called
|
||||
expect(cacheGetSpy).not.toHaveBeenCalled();
|
||||
cacheGetSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should invalidate snapshot after add', async () => {
|
||||
await cache.add('server1', mockConfig1);
|
||||
const before = await cache.getAll();
|
||||
expect(Object.keys(before).length).toBe(1);
|
||||
|
||||
await cache.add('server2', mockConfig2);
|
||||
const after = await cache.getAll();
|
||||
expect(Object.keys(after).length).toBe(2);
|
||||
});
|
||||
|
||||
it('should invalidate snapshot after update and preserve other entries', async () => {
|
||||
await cache.add('server1', mockConfig1);
|
||||
await cache.add('server2', mockConfig2);
|
||||
expect((await cache.getAll()).server1).toMatchObject(mockConfig1);
|
||||
|
||||
await cache.update('server1', mockConfig3);
|
||||
const after = await cache.getAll();
|
||||
expect(after.server1).toMatchObject(mockConfig3);
|
||||
expect(after.server2).toMatchObject(mockConfig2);
|
||||
});
|
||||
|
||||
it('should invalidate snapshot after remove', async () => {
|
||||
await cache.add('server1', mockConfig1);
|
||||
await cache.add('server2', mockConfig2);
|
||||
expect(Object.keys(await cache.getAll()).length).toBe(2);
|
||||
|
||||
await cache.remove('server1');
|
||||
const after = await cache.getAll();
|
||||
expect(Object.keys(after).length).toBe(1);
|
||||
expect(after.server1).toBeUndefined();
|
||||
expect(after.server2).toMatchObject(mockConfig2);
|
||||
});
|
||||
|
||||
it('should invalidate snapshot after reset', async () => {
|
||||
await cache.add('server1', mockConfig1);
|
||||
expect(Object.keys(await cache.getAll()).length).toBe(1);
|
||||
|
||||
await cache.reset();
|
||||
expect(Object.keys(await cache.getAll()).length).toBe(0);
|
||||
});
|
||||
|
||||
it('should not retroactively modify previously returned snapshot references', async () => {
|
||||
await cache.add('server1', mockConfig1);
|
||||
|
||||
// Prime the snapshot
|
||||
const snapshot = await cache.getAll();
|
||||
expect(Object.keys(snapshot).length).toBe(1);
|
||||
|
||||
// Add a second server — the original snapshot reference should be unmodified
|
||||
await cache.add('server2', mockConfig2);
|
||||
expect(Object.keys(snapshot).length).toBe(1);
|
||||
expect(snapshot.server2).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should hit Redis again after snapshot TTL expires', async () => {
|
||||
await cache.add('server1', mockConfig1);
|
||||
await cache.getAll(); // prime snapshot
|
||||
|
||||
// Force-expire the snapshot without sleeping
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(cache as any).localSnapshotExpiry = Date.now() - 1;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const cacheGetSpy = jest.spyOn((cache as any).cache, 'get');
|
||||
const result = await cache.getAll();
|
||||
expect(cacheGetSpy).toHaveBeenCalledTimes(1);
|
||||
expect(Object.keys(result).length).toBe(1);
|
||||
cacheGetSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue