mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-24 12:20:14 +01:00
📡 refactor: MCP Runtime Config Sync with Redis Distributed Locking (#10352)
* 🔄 Refactoring: MCP Runtime Configuration Reload
- PrivateServerConfigs own cache classes (inMemory and Redis).
- Connections staleness detection by comparing (connection.createdAt and config.LastUpdatedAt)
- ConnectionsRepo access Registry instead of in memory config dict and renew stale connections
- MCPManager: adjusted init of ConnectionsRepo (app level)
- UserConnectionManager: renew stale connections
- skipped test, to test "should only clear keys in its own namespace"
- MCPPrivateServerLoader: new component to manage logic of loading / editing private servers on runtime
- PrivateServersLoadStatusCache to track private server cache status
- New unit and integration tests.
Misc:
- add es lint rule to enforce line between class methods
* Fix cluster mode batch update and delete workarround. Fixed unit tests for cluster mode.
* Fix Keyv redis clear cache namespace awareness issue + Integration tests fixes
* chore: address copilot comments
* Fixing rebase issue: removed the mcp config fallback in single getServerConfig method:
- to not to interfere with the logic of the right Tier (APP/USER/Private)
- If userId is null, the getServerConfig should not return configs that are a SharedUser tier and not APP tier
* chore: add dev-staging branch to workflow triggers for backend, cache integration, and ESLint checks
---------
Co-authored-by: Atef Bellaaj <slalom.bellaaj@external.daimlertruck.com>
This commit is contained in:
parent
52e6796635
commit
ac68e629e6
49 changed files with 5244 additions and 257 deletions
|
|
@ -138,6 +138,39 @@ describe('standardCache', () => {
|
|||
await cache2.clear();
|
||||
});
|
||||
|
||||
test('clear() should only clear keys in its own namespace', async () => {
|
||||
const cacheFactory = await import('../../cacheFactory');
|
||||
|
||||
const cache1 = cacheFactory.standardCache('namespace-clear-test-1');
|
||||
const cache2 = cacheFactory.standardCache('namespace-clear-test-2');
|
||||
|
||||
// Add data to both caches
|
||||
await cache1.set('key1', 'value1-cache1');
|
||||
await cache1.set('key2', 'value2-cache1');
|
||||
await cache2.set('key1', 'value1-cache2');
|
||||
await cache2.set('key2', 'value2-cache2');
|
||||
|
||||
// Verify both caches have their data
|
||||
expect(await cache1.get('key1')).toBe('value1-cache1');
|
||||
expect(await cache1.get('key2')).toBe('value2-cache1');
|
||||
expect(await cache2.get('key1')).toBe('value1-cache2');
|
||||
expect(await cache2.get('key2')).toBe('value2-cache2');
|
||||
|
||||
// Clear cache1 only
|
||||
await cache1.clear();
|
||||
|
||||
// cache1 should be empty
|
||||
expect(await cache1.get('key1')).toBeUndefined();
|
||||
expect(await cache1.get('key2')).toBeUndefined();
|
||||
|
||||
// cache2 should still have its data
|
||||
expect(await cache2.get('key1')).toBe('value1-cache2');
|
||||
expect(await cache2.get('key2')).toBe('value2-cache2');
|
||||
|
||||
// Cleanup
|
||||
await cache2.clear();
|
||||
});
|
||||
|
||||
test('should respect FORCED_IN_MEMORY_CACHE_NAMESPACES', async () => {
|
||||
process.env.FORCED_IN_MEMORY_CACHE_NAMESPACES = 'ROLES'; // Use a valid cache key
|
||||
|
||||
|
|
|
|||
211
packages/api/src/cache/__tests__/redisUtils.cache_integration.spec.ts
vendored
Normal file
211
packages/api/src/cache/__tests__/redisUtils.cache_integration.spec.ts
vendored
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
import { batchDeleteKeys, scanKeys } from '../redisUtils';
|
||||
|
||||
describe('redisUtils Integration Tests', () => {
|
||||
let keyvRedisClient: Awaited<typeof import('../redisClients')>['keyvRedisClient'];
|
||||
const testPrefix = 'RedisUtils-Integration-Test';
|
||||
|
||||
beforeAll(async () => {
|
||||
// Set up environment variables for Redis (only if not already set)
|
||||
process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
|
||||
process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
|
||||
process.env.REDIS_KEY_PREFIX = process.env.REDIS_KEY_PREFIX ?? testPrefix;
|
||||
process.env.REDIS_DELETE_CHUNK_SIZE = '100';
|
||||
|
||||
// Clear module cache to ensure fresh initialization with new env vars
|
||||
jest.resetModules();
|
||||
|
||||
// Import modules after setting env vars and clearing cache
|
||||
const redisClients = await import('../redisClients');
|
||||
keyvRedisClient = redisClients.keyvRedisClient;
|
||||
|
||||
// Ensure Redis is connected
|
||||
if (!keyvRedisClient) throw new Error('Redis client is not initialized');
|
||||
|
||||
// Wait for connection and topology discovery to complete
|
||||
await redisClients.keyvRedisClientReady;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
// Clean up: clear all test keys from Redis
|
||||
if (keyvRedisClient && 'scanIterator' in keyvRedisClient) {
|
||||
const pattern = `*${testPrefix}*`;
|
||||
const keysToDelete: string[] = [];
|
||||
|
||||
// Collect all keys first
|
||||
for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) {
|
||||
keysToDelete.push(key);
|
||||
}
|
||||
|
||||
// Delete in parallel for cluster mode efficiency
|
||||
if (keysToDelete.length > 0) {
|
||||
await Promise.all(keysToDelete.map((key) => keyvRedisClient!.del(key)));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
// Close Redis connection
|
||||
if (keyvRedisClient?.isOpen) await keyvRedisClient.disconnect();
|
||||
});
|
||||
|
||||
describe('batchDeleteKeys', () => {
|
||||
test('should delete multiple keys successfully', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
// Setup: Create test keys
|
||||
const keys = [
|
||||
`${testPrefix}::key1`,
|
||||
`${testPrefix}::key2`,
|
||||
`${testPrefix}::key3`,
|
||||
`${testPrefix}::key4`,
|
||||
`${testPrefix}::key5`,
|
||||
];
|
||||
|
||||
for (const key of keys) {
|
||||
await keyvRedisClient.set(key, 'test-value');
|
||||
}
|
||||
|
||||
// Verify keys exist
|
||||
for (const key of keys) {
|
||||
const exists = await keyvRedisClient.exists(key);
|
||||
expect(exists).toBe(1);
|
||||
}
|
||||
|
||||
// Execute: Delete keys
|
||||
const deletedCount = await batchDeleteKeys(keyvRedisClient, keys);
|
||||
|
||||
// Verify: All keys deleted
|
||||
expect(deletedCount).toBe(5);
|
||||
|
||||
for (const key of keys) {
|
||||
const exists = await keyvRedisClient.exists(key);
|
||||
expect(exists).toBe(0);
|
||||
}
|
||||
});
|
||||
|
||||
test('should handle large batch deletions (>1000 keys)', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
// Create 1500 test keys
|
||||
const keys: string[] = [];
|
||||
for (let i = 0; i < 1500; i++) {
|
||||
keys.push(`${testPrefix}::large-batch::${i}`);
|
||||
}
|
||||
|
||||
// Set all keys in batches to avoid overwhelming cluster
|
||||
const setBatchSize = 100;
|
||||
for (let i = 0; i < keys.length; i += setBatchSize) {
|
||||
const batch = keys.slice(i, i + setBatchSize);
|
||||
await Promise.all(batch.map((key) => keyvRedisClient!.set(key, 'value')));
|
||||
}
|
||||
|
||||
// Delete in batches
|
||||
const deletedCount = await batchDeleteKeys(keyvRedisClient, keys, 500);
|
||||
|
||||
// Verify all deleted
|
||||
expect(deletedCount).toBe(1500);
|
||||
|
||||
const existsResults = await Promise.all(keys.map((key) => keyvRedisClient!.exists(key)));
|
||||
const totalExists = existsResults.reduce((sum, exists) => sum + exists, 0);
|
||||
expect(totalExists).toBe(0);
|
||||
});
|
||||
|
||||
test('should handle mixed existing and non-existing keys', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
const existingKeys = [`${testPrefix}::exists1`, `${testPrefix}::exists2`];
|
||||
const nonExistingKeys = [`${testPrefix}::noexist1`, `${testPrefix}::noexist2`];
|
||||
|
||||
// Create only some keys
|
||||
for (const key of existingKeys) {
|
||||
await keyvRedisClient.set(key, 'value');
|
||||
}
|
||||
|
||||
// Try to delete both existing and non-existing
|
||||
const allKeys = [...existingKeys, ...nonExistingKeys];
|
||||
const deletedCount = await batchDeleteKeys(keyvRedisClient, allKeys);
|
||||
|
||||
// Should only delete the existing ones
|
||||
expect(deletedCount).toBe(2);
|
||||
});
|
||||
|
||||
test('should work with custom chunk sizes', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
const keys = Array.from({ length: 75 }, (_, i) => `${testPrefix}::chunk::${i}`);
|
||||
|
||||
// Set all keys
|
||||
await Promise.all(keys.map((key) => keyvRedisClient!.set(key, 'value')));
|
||||
|
||||
// Delete with small chunk size (25)
|
||||
const deletedCount = await batchDeleteKeys(keyvRedisClient, keys, 25);
|
||||
|
||||
expect(deletedCount).toBe(75);
|
||||
});
|
||||
|
||||
test('should return 0 for empty keys array', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
const deletedCount = await batchDeleteKeys(keyvRedisClient, []);
|
||||
expect(deletedCount).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('scanKeys', () => {
|
||||
test('should scan and find all matching keys', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
// Create test keys with a specific pattern
|
||||
const userKeys = [
|
||||
`${testPrefix}::user::1`,
|
||||
`${testPrefix}::user::2`,
|
||||
`${testPrefix}::user::3`,
|
||||
];
|
||||
const sessionKeys = [`${testPrefix}::session::1`, `${testPrefix}::session::2`];
|
||||
|
||||
// Set all keys
|
||||
await Promise.all(
|
||||
[...userKeys, ...sessionKeys].map((key) => keyvRedisClient!.set(key, 'value')),
|
||||
);
|
||||
|
||||
// Scan for user keys only
|
||||
const foundKeys = await scanKeys(keyvRedisClient, `${testPrefix}::user::*`);
|
||||
|
||||
// Should find only user keys
|
||||
expect(foundKeys).toHaveLength(3);
|
||||
expect(foundKeys.sort()).toEqual(userKeys.sort());
|
||||
});
|
||||
|
||||
test('should scan large number of keys', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
// Create 2000 test keys
|
||||
const keys: string[] = [];
|
||||
for (let i = 0; i < 2000; i++) {
|
||||
keys.push(`${testPrefix}::large-scan::${i}`);
|
||||
}
|
||||
|
||||
// Set all keys in batches to avoid overwhelming cluster
|
||||
const setBatchSize = 100;
|
||||
for (let i = 0; i < keys.length; i += setBatchSize) {
|
||||
const batch = keys.slice(i, i + setBatchSize);
|
||||
await Promise.all(batch.map((key) => keyvRedisClient!.set(key, 'value')));
|
||||
}
|
||||
|
||||
// Scan with custom count
|
||||
const foundKeys = await scanKeys(keyvRedisClient, `${testPrefix}::large-scan::*`, 500);
|
||||
|
||||
// Should find all keys
|
||||
expect(foundKeys).toHaveLength(2000);
|
||||
expect(foundKeys.sort()).toEqual(keys.sort());
|
||||
});
|
||||
|
||||
test('should return empty array when no keys match pattern', async () => {
|
||||
if (!keyvRedisClient) throw new Error('Redis client not available');
|
||||
|
||||
const foundKeys = await scanKeys(keyvRedisClient, `${testPrefix}::nonexistent::*`);
|
||||
|
||||
expect(foundKeys).toEqual([]);
|
||||
});
|
||||
});
|
||||
});
|
||||
27
packages/api/src/cache/cacheConfig.ts
vendored
27
packages/api/src/cache/cacheConfig.ts
vendored
|
|
@ -85,6 +85,33 @@ const cacheConfig = {
|
|||
DEBUG_MEMORY_CACHE: isEnabled(process.env.DEBUG_MEMORY_CACHE),
|
||||
|
||||
BAN_DURATION: math(process.env.BAN_DURATION, 7200000), // 2 hours
|
||||
|
||||
/**
|
||||
* Number of keys to delete in each batch during Redis DEL operations.
|
||||
* In cluster mode, keys are deleted individually in parallel chunks to avoid CROSSSLOT errors.
|
||||
* In single-node mode, keys are deleted in batches using DEL with arrays.
|
||||
* Lower values reduce memory usage but increase number of Redis calls.
|
||||
* @default 1000
|
||||
*/
|
||||
REDIS_DELETE_CHUNK_SIZE: math(process.env.REDIS_DELETE_CHUNK_SIZE, 1000),
|
||||
|
||||
/**
|
||||
* Number of keys to update in each batch during Redis SET operations.
|
||||
* In cluster mode, keys are updated individually in parallel chunks to avoid CROSSSLOT errors.
|
||||
* In single-node mode, keys are updated in batches using transactions (multi/exec).
|
||||
* Lower values reduce memory usage but increase number of Redis calls.
|
||||
* @default 1000
|
||||
*/
|
||||
REDIS_UPDATE_CHUNK_SIZE: math(process.env.REDIS_UPDATE_CHUNK_SIZE, 1000),
|
||||
|
||||
/**
|
||||
* COUNT hint for Redis SCAN operations when scanning keys by pattern.
|
||||
* This is a hint to Redis about how many keys to scan in each iteration.
|
||||
* Higher values can reduce round trips but increase memory usage and latency per call.
|
||||
* Note: Redis may return more or fewer keys than this count depending on internal heuristics.
|
||||
* @default 1000
|
||||
*/
|
||||
REDIS_SCAN_COUNT: math(process.env.REDIS_SCAN_COUNT, 1000),
|
||||
};
|
||||
|
||||
export { cacheConfig };
|
||||
|
|
|
|||
27
packages/api/src/cache/cacheFactory.ts
vendored
27
packages/api/src/cache/cacheFactory.ts
vendored
|
|
@ -17,6 +17,7 @@ import type { SendCommandFn } from 'rate-limit-redis';
|
|||
import { keyvRedisClient, ioredisClient } from './redisClients';
|
||||
import { cacheConfig } from './cacheConfig';
|
||||
import { violationFile } from './keyvFiles';
|
||||
import { batchDeleteKeys, scanKeys } from './redisUtils';
|
||||
|
||||
/**
|
||||
* Creates a cache instance using Redis or a fallback store. Suitable for general caching needs.
|
||||
|
|
@ -37,6 +38,32 @@ export const standardCache = (namespace: string, ttl?: number, fallbackStore?: o
|
|||
logger.error(`Cache error in namespace ${namespace}:`, err);
|
||||
});
|
||||
|
||||
// Override clear() to handle namespace-aware deletion
|
||||
// The default Keyv clear() doesn't respect namespace due to the workaround above
|
||||
// Workaround for issue #10487 https://github.com/danny-avila/LibreChat/issues/10487
|
||||
cache.clear = async () => {
|
||||
// Type-safe check for Redis client with scanIterator support
|
||||
if (!keyvRedisClient || !('scanIterator' in keyvRedisClient)) {
|
||||
logger.warn(`Cannot clear namespace ${namespace}: Redis scanIterator not available`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Build pattern: globalPrefix::namespace:* or namespace:*
|
||||
const pattern = cacheConfig.REDIS_KEY_PREFIX
|
||||
? `${cacheConfig.REDIS_KEY_PREFIX}${cacheConfig.GLOBAL_PREFIX_SEPARATOR}${namespace}:*`
|
||||
: `${namespace}:*`;
|
||||
|
||||
// Use utility functions for efficient scan and parallel deletion
|
||||
const keysToDelete = await scanKeys(keyvRedisClient, pattern);
|
||||
|
||||
if (keysToDelete.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await batchDeleteKeys(keyvRedisClient, keysToDelete);
|
||||
logger.debug(`Cleared ${keysToDelete.length} keys from namespace ${namespace}`);
|
||||
};
|
||||
|
||||
return cache;
|
||||
} catch (err) {
|
||||
logger.error(`Failed to create Redis cache for namespace ${namespace}:`, err);
|
||||
|
|
|
|||
1
packages/api/src/cache/index.ts
vendored
1
packages/api/src/cache/index.ts
vendored
|
|
@ -3,3 +3,4 @@ export * from './redisClients';
|
|||
export * from './keyvFiles';
|
||||
export { default as keyvMongo } from './keyvMongo';
|
||||
export * from './cacheFactory';
|
||||
export * from './redisUtils';
|
||||
|
|
|
|||
129
packages/api/src/cache/redisUtils.ts
vendored
Normal file
129
packages/api/src/cache/redisUtils.ts
vendored
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
import type { RedisClientType, RedisClusterType } from '@redis/client';
|
||||
import { logger } from '@librechat/data-schemas';
|
||||
import { cacheConfig } from './cacheConfig';
|
||||
|
||||
/**
|
||||
* Efficiently deletes multiple Redis keys with support for both cluster and single-node modes.
|
||||
*
|
||||
* - Cluster mode: Deletes keys in parallel chunks to avoid CROSSSLOT errors
|
||||
* - Single-node mode: Uses batch DEL commands for efficiency
|
||||
*
|
||||
* @param client - Redis client (node or cluster)
|
||||
* @param keys - Array of keys to delete
|
||||
* @param chunkSize - Optional chunk size (defaults to REDIS_DELETE_CHUNK_SIZE config)
|
||||
* @returns Number of keys deleted
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const deletedCount = await batchDeleteKeys(keyvRedisClient, ['key1', 'key2', 'key3']);
|
||||
* console.log(`Deleted ${deletedCount} keys`);
|
||||
* ```
|
||||
*/
|
||||
export async function batchDeleteKeys(
|
||||
client: RedisClientType | RedisClusterType,
|
||||
keys: string[],
|
||||
chunkSize?: number,
|
||||
): Promise<number> {
|
||||
const startTime = Date.now();
|
||||
|
||||
if (keys.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const size = chunkSize ?? cacheConfig.REDIS_DELETE_CHUNK_SIZE;
|
||||
const mode = cacheConfig.USE_REDIS_CLUSTER ? 'cluster' : 'single-node';
|
||||
const deletePromises = [];
|
||||
|
||||
if (cacheConfig.USE_REDIS_CLUSTER) {
|
||||
// Cluster mode: Delete each key individually in parallel chunks to avoid CROSSSLOT errors
|
||||
for (let i = 0; i < keys.length; i += size) {
|
||||
const chunk = keys.slice(i, i + size);
|
||||
deletePromises.push(Promise.all(chunk.map((key) => client.del(key))));
|
||||
}
|
||||
} else {
|
||||
// Single-node mode: Batch delete chunks using DEL with array
|
||||
for (let i = 0; i < keys.length; i += size) {
|
||||
const chunk = keys.slice(i, i + size);
|
||||
deletePromises.push(client.del(chunk));
|
||||
}
|
||||
}
|
||||
|
||||
const results = await Promise.all(deletePromises);
|
||||
|
||||
// Sum up deleted counts (cluster returns array of individual counts, single-node returns total)
|
||||
const deletedCount = results.reduce((sum: number, count: number | number[]): number => {
|
||||
if (Array.isArray(count)) {
|
||||
return sum + count.reduce((a, b) => a + b, 0);
|
||||
}
|
||||
return sum + count;
|
||||
}, 0);
|
||||
|
||||
// Performance monitoring
|
||||
const duration = Date.now() - startTime;
|
||||
const batchCount = deletePromises.length;
|
||||
|
||||
if (duration > 1000) {
|
||||
logger.warn(
|
||||
`[Redis][batchDeleteKeys] Slow operation - Duration: ${duration}ms, Mode: ${mode}, Keys: ${keys.length}, Deleted: ${deletedCount}, Batches: ${batchCount}, Chunk size: ${size}`,
|
||||
);
|
||||
} else {
|
||||
logger.debug(
|
||||
`[Redis][batchDeleteKeys] Duration: ${duration}ms, Mode: ${mode}, Keys: ${keys.length}, Deleted: ${deletedCount}, Batches: ${batchCount}`,
|
||||
);
|
||||
}
|
||||
|
||||
return deletedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Scans Redis for keys matching a pattern and collects them into an array.
|
||||
* Uses Redis SCAN to avoid blocking the server.
|
||||
*
|
||||
* @param client - Redis client (node or cluster) with scanIterator support
|
||||
* @param pattern - Pattern to match keys (e.g., 'user:*', 'session:*:active')
|
||||
* @param count - Optional SCAN COUNT hint (defaults to REDIS_SCAN_COUNT config)
|
||||
* @returns Array of matching keys
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const userKeys = await scanKeys(keyvRedisClient, 'user:*');
|
||||
* const sessionKeys = await scanKeys(keyvRedisClient, 'session:*:active', 500);
|
||||
* ```
|
||||
*/
|
||||
export async function scanKeys(
|
||||
client: RedisClientType | RedisClusterType,
|
||||
pattern: string,
|
||||
count?: number,
|
||||
): Promise<string[]> {
|
||||
const startTime = Date.now();
|
||||
const keys: string[] = [];
|
||||
|
||||
// Type guard to check if client has scanIterator
|
||||
if (!('scanIterator' in client)) {
|
||||
throw new Error('Redis client does not support scanIterator');
|
||||
}
|
||||
|
||||
const scanCount = count ?? cacheConfig.REDIS_SCAN_COUNT;
|
||||
|
||||
for await (const key of client.scanIterator({
|
||||
MATCH: pattern,
|
||||
COUNT: scanCount,
|
||||
})) {
|
||||
keys.push(key);
|
||||
}
|
||||
|
||||
// Performance monitoring
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
if (duration > 1000) {
|
||||
logger.warn(
|
||||
`[Redis][scanKeys] Slow operation - Duration: ${duration}ms, Pattern: "${pattern}", Keys found: ${keys.length}, Scan count: ${scanCount}`,
|
||||
);
|
||||
} else {
|
||||
logger.debug(
|
||||
`[Redis][scanKeys] Duration: ${duration}ms, Pattern: "${pattern}", Keys found: ${keys.length}`,
|
||||
);
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue