Mirror of https://github.com/danny-avila/LibreChat.git, synced 2026-02-05 09:11:51 +01:00
✈️ refactor: Single-Flight Deduplication for MCP Server Configs and Optimize Redis Batch Fetching (#11628)
* fix: Implement single-flight deduplication for getAllServerConfigs and optimize Redis getAll method

  - Added a pending-promises map in MCPServersRegistry to handle concurrent calls to getAllServerConfigs, ensuring that multiple requests for the same userId are deduplicated.
  - Introduced a new fetchAllServerConfigs method to streamline the fetching process and improve performance.
  - Enhanced the getAll method in ServerConfigsCacheRedis to utilize MGET for batch fetching, significantly reducing Redis roundtrips and improving efficiency.
  - Added comprehensive tests for deduplication and performance optimizations, ensuring consistent results across concurrent calls and validating the new implementation.

* refactor: Enhance logging in ServerConfigsCacheRedis for getAll method

  - Added debug logging to track the execution time and key retrieval in the getAll method of ServerConfigsCacheRedis.
  - Improved import organization by consolidating related imports for better clarity and maintainability.

* test: Update MCPServersRegistry and ServerConfigsCacheRedis tests for call-count assertions

  - Modified MCPServersRegistry integration tests to assert specific call counts for cache retrieval, ensuring accurate tracking of Redis interactions.
  - Refactored ServerConfigsCacheRedis integration tests, renaming the test suite for clarity and focusing on parallel-fetching optimizations.
  - Enhanced the getAll method in ServerConfigsCacheRedis to use batching for improved performance during key retrieval.

* chore: Simplify key extraction in ServerConfigsCacheRedis

  - Streamlined the key-extraction logic in the getAll method of ServerConfigsCacheRedis by consolidating the mapping function into a single line, enhancing code readability and maintainability.
parent db84ec681a
commit 5dc5799fc0

4 changed files with 310 additions and 40 deletions
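The core pattern of the change, distilled as a minimal standalone TypeScript sketch (the names singleFlight and inFlight are illustrative, not the PR's API): concurrent callers asking for the same key share one in-flight promise, and the map entry is cleared in a finally block so a failed fetch is never cached as pending.

// Minimal single-flight sketch; names are illustrative.
const inFlight = new Map<string, Promise<unknown>>();

async function singleFlight<T>(key: string, fetch: () => Promise<T>): Promise<T> {
  const pending = inFlight.get(key) as Promise<T> | undefined;
  if (pending) {
    return pending; // join the in-flight fetch instead of starting another
  }
  const promise = fetch();
  inFlight.set(key, promise);
  try {
    return await promise;
  } finally {
    inFlight.delete(key); // success or failure, the next call fetches fresh
  }
}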
MCPServersRegistry.ts

@@ -26,6 +26,10 @@ export class MCPServersRegistry {
   private readonly allowedDomains?: string[] | null;
   private readonly readThroughCache: Keyv<t.ParsedServerConfig>;
   private readonly readThroughCacheAll: Keyv<Record<string, t.ParsedServerConfig>>;
+  private readonly pendingGetAllPromises = new Map<
+    string,
+    Promise<Record<string, t.ParsedServerConfig>>
+  >();
 
   constructor(mongoose: typeof import('mongoose'), allowedDomains?: string[] | null) {
     this.dbConfigsRepo = new ServerConfigsDB(mongoose);
@@ -99,11 +103,29 @@ export class MCPServersRegistry {
   public async getAllServerConfigs(userId?: string): Promise<Record<string, t.ParsedServerConfig>> {
     const cacheKey = userId ?? '__no_user__';
 
     // Check if key exists in read-through cache
     if (await this.readThroughCacheAll.has(cacheKey)) {
       return (await this.readThroughCacheAll.get(cacheKey)) ?? {};
     }
 
+    const pending = this.pendingGetAllPromises.get(cacheKey);
+    if (pending) {
+      return pending;
+    }
+
+    const fetchPromise = this.fetchAllServerConfigs(cacheKey, userId);
+    this.pendingGetAllPromises.set(cacheKey, fetchPromise);
+
+    try {
+      return await fetchPromise;
+    } finally {
+      this.pendingGetAllPromises.delete(cacheKey);
+    }
+  }
+
+  private async fetchAllServerConfigs(
+    cacheKey: string,
+    userId?: string,
+  ): Promise<Record<string, t.ParsedServerConfig>> {
     const result = {
       ...(await this.cacheConfigsRepo.getAll()),
       ...(await this.dbConfigsRepo.getAll(userId)),
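As an illustrative usage sketch (the import path is assumed from the ~/mcp/registry alias visible in this diff, and demo is a hypothetical helper): concurrent calls for the same userId now resolve from a single underlying fetch.

import mongoose from 'mongoose';
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';

// Illustrative: all ten concurrent calls below share one
// cacheConfigsRepo/dbConfigsRepo fetch; every caller gets the same result map.
async function demo(): Promise<void> {
  const registry = new MCPServersRegistry(mongoose);
  const results = await Promise.all(
    Array.from({ length: 10 }, () => registry.getAllServerConfigs('user1')),
  );
  console.log(Object.keys(results[0]));
}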
MCPServersRegistry Redis integration tests
@@ -236,4 +236,106 @@ describe('MCPServersRegistry Redis Integration Tests', () => {
       expect(Object.keys(configsAfter)).toHaveLength(0);
     });
   });
+
+  describe('single-flight deduplication', () => {
+    it('should deduplicate concurrent getAllServerConfigs calls', async () => {
+      await registry.addServer('server1', testRawConfig, 'CACHE');
+      await registry.addServer('server2', testRawConfig, 'CACHE');
+      await registry.addServer('server3', testRawConfig, 'CACHE');
+
+      await registry['readThroughCacheAll'].clear();
+
+      const cacheRepoGetAllSpy = jest.spyOn(registry['cacheConfigsRepo'], 'getAll');
+
+      const concurrentCalls = 10;
+      const promises = Array.from({ length: concurrentCalls }, () =>
+        registry.getAllServerConfigs(),
+      );
+
+      const results = await Promise.all(promises);
+
+      expect(cacheRepoGetAllSpy.mock.calls.length).toBe(1);
+
+      for (const result of results) {
+        expect(Object.keys(result).length).toBe(3);
+        expect(result).toHaveProperty('server1');
+        expect(result).toHaveProperty('server2');
+        expect(result).toHaveProperty('server3');
+      }
+    });
+
+    it('should handle different userIds independently', async () => {
+      await registry.addServer('shared_server', testRawConfig, 'CACHE');
+
+      await registry['readThroughCacheAll'].clear();
+
+      const cacheRepoGetAllSpy = jest.spyOn(registry['cacheConfigsRepo'], 'getAll');
+
+      const [result1, result2, result3] = await Promise.all([
+        registry.getAllServerConfigs('user1'),
+        registry.getAllServerConfigs('user2'),
+        registry.getAllServerConfigs('user1'),
+      ]);
+
+      expect(cacheRepoGetAllSpy.mock.calls.length).toBe(2);
+
+      expect(Object.keys(result1)).toContain('shared_server');
+      expect(Object.keys(result2)).toContain('shared_server');
+      expect(Object.keys(result3)).toContain('shared_server');
+    });
+
+    it('should complete concurrent requests without timeout', async () => {
+      for (let i = 0; i < 10; i++) {
+        await registry.addServer(`stress_server_${i}`, testRawConfig, 'CACHE');
+      }
+
+      await registry['readThroughCacheAll'].clear();
+
+      const concurrentCalls = 50;
+      const startTime = Date.now();
+
+      const promises = Array.from({ length: concurrentCalls }, () =>
+        registry.getAllServerConfigs(),
+      );
+
+      const results = await Promise.all(promises);
+      const elapsed = Date.now() - startTime;
+
+      expect(elapsed).toBeLessThan(10000);
+
+      for (const result of results) {
+        expect(Object.keys(result).length).toBe(10);
+      }
+    });
+
+    it('should return consistent results across all concurrent callers', async () => {
+      await registry.addServer('consistency_server_a', testRawConfig, 'CACHE');
+      await registry.addServer(
+        'consistency_server_b',
+        {
+          ...testRawConfig,
+          command: 'python',
+        },
+        'CACHE',
+      );
+
+      await registry['readThroughCacheAll'].clear();
+
+      const results = await Promise.all([
+        registry.getAllServerConfigs(),
+        registry.getAllServerConfigs(),
+        registry.getAllServerConfigs(),
+        registry.getAllServerConfigs(),
+        registry.getAllServerConfigs(),
+      ]);
+
+      const firstResult = results[0];
+      for (const result of results) {
+        expect(Object.keys(result).sort()).toEqual(Object.keys(firstResult).sort());
+        for (const key of Object.keys(firstResult)) {
+          expect(result[key]).toMatchObject(firstResult[key]);
+        }
+      }
+    });
+  });
 });
ServerConfigsCacheRedis.ts
@@ -1,9 +1,10 @@
 import type Keyv from 'keyv';
+import { fromPairs } from 'lodash';
+import { logger } from '@librechat/data-schemas';
+import type { IServerConfigsRepositoryInterface } from '~/mcp/registry/ServerConfigsRepositoryInterface';
+import type { ParsedServerConfig, AddServerResult } from '~/mcp/types';
 import { standardCache, keyvRedisClient } from '~/cache';
-import { ParsedServerConfig, AddServerResult } from '~/mcp/types';
 import { BaseRegistryCache } from './BaseRegistryCache';
-import { IServerConfigsRepositoryInterface } from '../ServerConfigsRepositoryInterface';
 
 /**
  * Redis-backed implementation of MCP server configurations cache for distributed deployments.
@@ -12,6 +13,8 @@
  * Supports optional leader-only write operations to prevent race conditions during initialization.
  * Data persists across server restarts and is accessible from any instance in the cluster.
  */
+const BATCH_SIZE = 100;
+
 export class ServerConfigsCacheRedis
   extends BaseRegistryCache
   implements IServerConfigsRepositoryInterface
@@ -60,27 +63,50 @@ export class ServerConfigsCacheRedis
   }
 
   public async getAll(): Promise<Record<string, ParsedServerConfig>> {
-    // Use Redis SCAN iterator directly (non-blocking, production-ready)
-    // Note: Keyv uses a single colon ':' between namespace and key, even if GLOBAL_PREFIX_SEPARATOR is '::'
-    const pattern = `*${this.cache.namespace}:*`;
-    const entries: Array<[string, ParsedServerConfig]> = [];
-
-    // Use scanIterator from Redis client
-    if (keyvRedisClient && 'scanIterator' in keyvRedisClient) {
-      for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) {
-        // Extract the actual key name (last part after final colon)
-        // Full key format: "prefix::namespace:keyName"
-        const lastColonIndex = key.lastIndexOf(':');
-        const keyName = key.substring(lastColonIndex + 1);
-        const config = (await this.cache.get(keyName)) as ParsedServerConfig | undefined;
-        if (config) {
-          entries.push([keyName, config]);
-        }
-      }
-    } else {
+    if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) {
+      throw new Error('Redis client with scanIterator not available.');
+    }
+
+    const startTime = Date.now();
+    const pattern = `*${this.cache.namespace}:*`;
+
+    const keys: string[] = [];
+    for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) {
+      keys.push(key);
+    }
+
+    if (keys.length === 0) {
+      logger.debug(`[ServerConfigsCacheRedis] getAll(${this.namespace}): no keys found`);
+      return {};
+    }
+
+    /** Extract keyName from full Redis key format: "prefix::namespace:keyName" */
+    const keyNames = keys.map((key) => key.substring(key.lastIndexOf(':') + 1));
+
+    const entries: Array<[string, ParsedServerConfig]> = [];
+
+    for (let i = 0; i < keyNames.length; i += BATCH_SIZE) {
+      const batchEnd = Math.min(i + BATCH_SIZE, keyNames.length);
+      const promises: Promise<ParsedServerConfig | undefined>[] = [];
+
+      for (let j = i; j < batchEnd; j++) {
+        promises.push(this.cache.get(keyNames[j]));
+      }
+
+      const configs = await Promise.all(promises);
+
+      for (let j = 0; j < configs.length; j++) {
+        if (configs[j]) {
+          entries.push([keyNames[i + j], configs[j]!]);
+        }
+      }
+    }
+
+    const elapsed = Date.now() - startTime;
+    logger.debug(
+      `[ServerConfigsCacheRedis] getAll(${this.namespace}): fetched ${entries.length} configs in ${elapsed}ms`,
+    );
 
     return fromPairs(entries);
   }
 }
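The rewritten getAll reduces roundtrip depth: keys are collected once via SCAN, then values are fetched with Promise.all in rounds of at most BATCH_SIZE, instead of one sequential await per key. A distilled sketch of that batching pattern (generic names, not the class API):

// Batched parallel fetch: at most BATCH_SIZE GETs are in flight per round,
// so N keys cost ceil(N / BATCH_SIZE) await rounds instead of N.
const BATCH_SIZE = 100;

async function batchedFetch<T>(
  keys: string[],
  get: (key: string) => Promise<T | undefined>,
): Promise<Array<[string, T]>> {
  const entries: Array<[string, T]> = [];
  for (let i = 0; i < keys.length; i += BATCH_SIZE) {
    const batch = keys.slice(i, i + BATCH_SIZE);
    const values = await Promise.all(batch.map(get));
    values.forEach((value, j) => {
      if (value !== undefined) {
        entries.push([batch[j], value]);
      }
    });
  }
  return entries;
}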
ServerConfigsCacheRedis integration tests
@@ -7,25 +7,25 @@ describe('ServerConfigsCacheRedis Integration Tests', () => {
   let cache: InstanceType<typeof import('../ServerConfigsCacheRedis').ServerConfigsCacheRedis>;
 
   // Test data
-  const mockConfig1: ParsedServerConfig = {
+  const mockConfig1 = {
     type: 'stdio',
     command: 'node',
     args: ['server1.js'],
     env: { TEST: 'value1' },
-  };
+  } as ParsedServerConfig;
 
-  const mockConfig2: ParsedServerConfig = {
+  const mockConfig2 = {
     type: 'stdio',
     command: 'python',
     args: ['server2.py'],
     env: { TEST: 'value2' },
-  };
+  } as ParsedServerConfig;
 
-  const mockConfig3: ParsedServerConfig = {
-    command: 'node',
-    args: ['server3.js'],
-  };
+  const mockConfig3 = {
+    type: 'sse',
+    url: 'http://localhost:3000',
+    requiresOAuth: true,
+  } as ParsedServerConfig;
 
   beforeAll(async () => {
     // Set up environment variables for Redis (only if not already set)
@@ -52,9 +52,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => {
   });
 
   beforeEach(() => {
-    // Create a fresh instance for each test with leaderOnly=true
     jest.resetModules();
-    cache = new ServerConfigsCacheRedis('test-user', 'Shared', false);
+    cache = new ServerConfigsCacheRedis('test-user', false);
   });
 
   afterEach(async () => {
@@ -114,8 +113,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => {
   });
 
   it('should isolate caches by owner namespace', async () => {
-    const userCache = new ServerConfigsCacheRedis('user1', 'Private', false);
-    const globalCache = new ServerConfigsCacheRedis('global', 'Shared', false);
+    const userCache = new ServerConfigsCacheRedis('user1-private', false);
+    const globalCache = new ServerConfigsCacheRedis('global-shared', false);
 
     await userCache.add('server1', mockConfig1);
     await globalCache.add('server1', mockConfig2);
@@ -161,8 +160,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => {
   });
 
   it('should only return configs for the specific owner', async () => {
-    const userCache = new ServerConfigsCacheRedis('user1', 'Private', false);
-    const globalCache = new ServerConfigsCacheRedis('global', 'Private', false);
+    const userCache = new ServerConfigsCacheRedis('user1-owner', false);
+    const globalCache = new ServerConfigsCacheRedis('global-owner', false);
 
     await userCache.add('server1', mockConfig1);
     await userCache.add('server2', mockConfig2);
@@ -206,8 +205,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => {
   });
 
   it('should only update in the specific owner namespace', async () => {
-    const userCache = new ServerConfigsCacheRedis('user1', 'Private', false);
-    const globalCache = new ServerConfigsCacheRedis('global', 'Shared', false);
+    const userCache = new ServerConfigsCacheRedis('user1-update', false);
+    const globalCache = new ServerConfigsCacheRedis('global-update', false);
 
     await userCache.add('server1', mockConfig1);
     await globalCache.add('server1', mockConfig2);
@@ -258,8 +257,8 @@ describe('ServerConfigsCacheRedis Integration Tests', () => {
   });
 
   it('should only remove from the specific owner namespace', async () => {
-    const userCache = new ServerConfigsCacheRedis('user1', 'Private', false);
-    const globalCache = new ServerConfigsCacheRedis('global', 'Shared', false);
+    const userCache = new ServerConfigsCacheRedis('user1-remove', false);
+    const globalCache = new ServerConfigsCacheRedis('global-remove', false);
 
     await userCache.add('server1', mockConfig1);
     await globalCache.add('server1', mockConfig2);
@@ -270,4 +269,125 @@ describe('ServerConfigsCacheRedis Integration Tests', () => {
       expect(await globalCache.get('server1')).toMatchObject(mockConfig2);
     });
   });
+
+  describe('getAll parallel fetching', () => {
+    it('should handle many configs efficiently with parallel fetching', async () => {
+      const testCache = new ServerConfigsCacheRedis('parallel-test', false);
+      const configCount = 20;
+
+      for (let i = 0; i < configCount; i++) {
+        await testCache.add(`server-${i}`, {
+          type: 'stdio',
+          command: `cmd-${i}`,
+          args: [`arg-${i}`],
+        } as ParsedServerConfig);
+      }
+
+      const startTime = Date.now();
+      const result = await testCache.getAll();
+      const elapsed = Date.now() - startTime;
+
+      expect(Object.keys(result).length).toBe(configCount);
+      for (let i = 0; i < configCount; i++) {
+        expect(result[`server-${i}`]).toBeDefined();
+        const config = result[`server-${i}`] as { command?: string };
+        expect(config.command).toBe(`cmd-${i}`);
+      }
+
+      expect(elapsed).toBeLessThan(5000);
+    });
+
+    it('should handle concurrent getAll calls without timeout', async () => {
+      const testCache = new ServerConfigsCacheRedis('concurrent-test', false);
+
+      for (let i = 0; i < 10; i++) {
+        await testCache.add(`server-${i}`, {
+          type: 'stdio',
+          command: `cmd-${i}`,
+          args: [`arg-${i}`],
+        } as ParsedServerConfig);
+      }
+
+      const concurrentCalls = 50;
+      const startTime = Date.now();
+      const promises = Array.from({ length: concurrentCalls }, () => testCache.getAll());
+
+      const results = await Promise.all(promises);
+      const elapsed = Date.now() - startTime;
+
+      for (const result of results) {
+        expect(Object.keys(result).length).toBe(10);
+      }
+
+      expect(elapsed).toBeLessThan(10000);
+    });
+
+    it('should return consistent results across concurrent calls', async () => {
+      const testCache = new ServerConfigsCacheRedis('consistency-test', false);
+
+      await testCache.add('server-a', mockConfig1);
+      await testCache.add('server-b', mockConfig2);
+      await testCache.add('server-c', mockConfig3);
+
+      const results = await Promise.all([
+        testCache.getAll(),
+        testCache.getAll(),
+        testCache.getAll(),
+        testCache.getAll(),
+        testCache.getAll(),
+      ]);
+
+      const firstResult = results[0];
+      for (const result of results) {
+        expect(Object.keys(result).sort()).toEqual(Object.keys(firstResult).sort());
+        expect(result['server-a']).toMatchObject(mockConfig1);
+        expect(result['server-b']).toMatchObject(mockConfig2);
+        expect(result['server-c']).toMatchObject(mockConfig3);
+      }
+    });
+
+    /**
+     * Performance regression test for N+1 Redis fix.
+     *
+     * Before fix: getAll() used sequential GET calls inside an async loop:
+     *   for await (key of scan) { await cache.get(key); } // N sequential calls
+     *
+     * With 30 configs and 100 concurrent requests, this would cause:
+     *   - 100 × 30 = 3000 sequential Redis roundtrips
+     *   - Under load, requests would queue and timeout at 60s
+     *
+     * After fix: getAll() uses Promise.all for parallel fetching:
+     *   Promise.all(keys.map(k => cache.get(k))); // N parallel calls
+     *
+     * This test validates the fix by ensuring 100 concurrent requests
+     * complete in under 5 seconds - impossible with the old N+1 pattern.
+     */
+    it('should complete 100 concurrent requests in under 5s (regression test for N+1 fix)', async () => {
+      const testCache = new ServerConfigsCacheRedis('perf-regression-test', false);
+      const configCount = 30;
+
+      for (let i = 0; i < configCount; i++) {
+        await testCache.add(`server-${i}`, {
+          type: 'stdio',
+          command: `cmd-${i}`,
+          args: [`arg-${i}`],
+        } as ParsedServerConfig);
+      }
+
+      const concurrentRequests = 100;
+      const maxAllowedMs = 5000;
+
+      const startTime = Date.now();
+      const promises = Array.from({ length: concurrentRequests }, () => testCache.getAll());
+      const results = await Promise.all(promises);
+      const elapsed = Date.now() - startTime;
+
+      expect(results.length).toBe(concurrentRequests);
+      for (const result of results) {
+        expect(Object.keys(result).length).toBe(configCount);
+      }
+
+      expect(elapsed).toBeLessThan(maxAllowedMs);
+    });
+  });
 });