👑 feat: Distributed Leader Election with Redis for Multi-instance Coordination (#10189)

* 🔧 refactor: Move GLOBAL_PREFIX_SEPARATOR to cacheConfig for consistency

* 👑 feat: Implement distributed leader election using Redis
Theo N. Truong 2025-10-30 15:08:04 -06:00 committed by GitHub
parent 1e53ffa7ea
commit 8f4705f683
15 changed files with 452 additions and 15 deletions


@@ -702,6 +702,16 @@ HELP_AND_FAQ_URL=https://librechat.ai
# Comma-separated list of CacheKeys (e.g., ROLES,MESSAGES)
# FORCED_IN_MEMORY_CACHE_NAMESPACES=ROLES,MESSAGES
# Leader Election Configuration (for multi-instance deployments with Redis)
# Duration in seconds that the leader lease is valid before it expires (default: 25)
# LEADER_LEASE_DURATION=25
# Interval in seconds at which the leader renews its lease (default: 10)
# LEADER_RENEW_INTERVAL=10
# Maximum number of retry attempts when renewing the lease fails (default: 3)
# LEADER_RENEW_ATTEMPTS=3
# Delay in seconds between retry attempts when renewing the lease (default: 0.5)
# LEADER_RENEW_RETRY_DELAY=0.5
#==================================================#
# Others #
#==================================================#
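For orientation, a small worked example (not part of this commit) of how the four LEADER_* defaults above relate to each other: the 25-second lease survives roughly two missed 10-second renewal cycles, and the retry loop adds at most 1.5 seconds to a failed cycle, so a brief Redis hiccup does not cost the leader its lease, while a genuine crash frees the key within about 25 seconds.

// Illustrative arithmetic only, using the documented defaults.
const LEADER_LEASE_DURATION = 25; // seconds
const LEADER_RENEW_INTERVAL = 10; // seconds
const LEADER_RENEW_ATTEMPTS = 3;
const LEADER_RENEW_RETRY_DELAY = 0.5; // seconds

// Renewal cycles that can fail outright before the lease lapses.
const missedCyclesTolerated = Math.floor(LEADER_LEASE_DURATION / LEADER_RENEW_INTERVAL); // 2

// Extra time the retry loop adds to a single failed cycle.
const retryOverhead = LEADER_RENEW_ATTEMPTS * LEADER_RENEW_RETRY_DELAY; // 1.5 seconds

console.log({ missedCyclesTolerated, retryOverhead });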


@@ -8,12 +8,13 @@ on:
- release/*
paths:
- 'packages/api/src/cache/**'
- 'packages/api/src/cluster/**'
- 'redis-config/**'
- '.github/workflows/cache-integration-tests.yml'
jobs:
cache_integration_tests:
name: Run Cache Integration Tests
name: Integration Tests that use actual Redis Cache
timeout-minutes: 30
runs-on: ubuntu-latest
@@ -66,7 +67,15 @@ jobs:
USE_REDIS: true
REDIS_URI: redis://127.0.0.1:6379
REDIS_CLUSTER_URI: redis://127.0.0.1:7001,redis://127.0.0.1:7002,redis://127.0.0.1:7003
run: npm run test:cache:integration
run: npm run test:cache-integration:core
- name: Run cluster integration tests
working-directory: packages/api
env:
NODE_ENV: test
USE_REDIS: true
REDIS_URI: redis://127.0.0.1:6379
run: npm run test:cache-integration:cluster
- name: Stop Redis Cluster
if: always()


@@ -18,9 +18,10 @@
"build:dev": "npm run clean && NODE_ENV=development rollup -c --bundleConfigAsCjs",
"build:watch": "NODE_ENV=development rollup -c -w --bundleConfigAsCjs",
"build:watch:prod": "rollup -c -w --bundleConfigAsCjs",
"test": "jest --coverage --watch --testPathIgnorePatterns=\"\\.integration\\.\"",
"test:ci": "jest --coverage --ci --testPathIgnorePatterns=\"\\.integration\\.\"",
"test:cache:integration": "jest --testPathPattern=\"src/cache/.*\\.integration\\.spec\\.ts$\" --coverage=false",
"test": "jest --coverage --watch --testPathIgnorePatterns=\"\\.*integration\\.\"",
"test:ci": "jest --coverage --ci --testPathIgnorePatterns=\"\\.*integration\\.\"",
"test:cache-integration:core": "jest --testPathPattern=\"src/cache/.*\\.cache_integration\\.spec\\.ts$\" --coverage=false",
"test:cache-integration:cluster": "jest --testPathPattern=\"src/cluster/.*\\.cache_integration\\.spec\\.ts$\" --coverage=false --runInBand",
"verify": "npm run test:ci",
"b:clean": "bun run rimraf dist",
"b:build": "bun run b:clean && bun run rollup -c --silent --bundleConfigAsCjs",


@@ -1,11 +1,14 @@
import type { Keyv } from 'keyv';
// Mock GLOBAL_PREFIX_SEPARATOR
jest.mock('../../redisClients', () => {
const originalModule = jest.requireActual('../../redisClients');
// Mock GLOBAL_PREFIX_SEPARATOR from cacheConfig
jest.mock('../../cacheConfig', () => {
const originalModule = jest.requireActual('../../cacheConfig');
return {
...originalModule,
cacheConfig: {
...originalModule.cacheConfig,
GLOBAL_PREFIX_SEPARATOR: '>>',
},
};
});


@@ -65,6 +65,7 @@ const cacheConfig = {
REDIS_PASSWORD: process.env.REDIS_PASSWORD,
REDIS_CA: getRedisCA(),
REDIS_KEY_PREFIX: process.env[REDIS_KEY_PREFIX_VAR ?? ''] || REDIS_KEY_PREFIX || '',
GLOBAL_PREFIX_SEPARATOR: '::',
REDIS_MAX_LISTENERS: math(process.env.REDIS_MAX_LISTENERS, 40),
REDIS_PING_INTERVAL: math(process.env.REDIS_PING_INTERVAL, 0),
/** Max delay between reconnection attempts in ms */


@@ -14,7 +14,7 @@ import { logger } from '@librechat/data-schemas';
import session, { MemoryStore } from 'express-session';
import { RedisStore as ConnectRedis } from 'connect-redis';
import type { SendCommandFn } from 'rate-limit-redis';
import { keyvRedisClient, ioredisClient, GLOBAL_PREFIX_SEPARATOR } from './redisClients';
import { keyvRedisClient, ioredisClient } from './redisClients';
import { cacheConfig } from './cacheConfig';
import { violationFile } from './keyvFiles';
@@ -31,7 +31,7 @@ export const standardCache = (namespace: string, ttl?: number, fallbackStore?: o
const keyvRedis = new KeyvRedis(keyvRedisClient);
const cache = new Keyv(keyvRedis, { namespace, ttl });
keyvRedis.namespace = cacheConfig.REDIS_KEY_PREFIX;
keyvRedis.keyPrefixSeparator = GLOBAL_PREFIX_SEPARATOR;
keyvRedis.keyPrefixSeparator = cacheConfig.GLOBAL_PREFIX_SEPARATOR;
cache.on('error', (err) => {
logger.error(`Cache error in namespace ${namespace}:`, err);
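A minimal usage sketch for the factory above (the import path is assumed for illustration; Keyv TTLs are in milliseconds):

import { standardCache } from './cacheFactory'; // path assumed for illustration

async function example() {
  // Namespaced cache with a 60-second TTL; when Redis is enabled, keys end up
  // under the configured REDIS_KEY_PREFIX and GLOBAL_PREFIX_SEPARATOR.
  const rolesCache = standardCache('ROLES', 60_000);
  await rolesCache.set('admin', { permissions: ['*'] });
  return rolesCache.get('admin');
}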


@@ -5,8 +5,6 @@ import { createClient, createCluster } from '@keyv/redis';
import type { RedisClientType, RedisClusterType } from '@redis/client';
import { cacheConfig } from './cacheConfig';
const GLOBAL_PREFIX_SEPARATOR = '::';
const urls = cacheConfig.REDIS_URI?.split(',').map((uri) => new URL(uri)) || [];
const username = urls?.[0]?.username || cacheConfig.REDIS_USERNAME;
const password = urls?.[0]?.password || cacheConfig.REDIS_PASSWORD;
@@ -18,7 +16,7 @@ if (cacheConfig.USE_REDIS) {
username: username,
password: password,
tls: ca ? { ca } : undefined,
keyPrefix: `${cacheConfig.REDIS_KEY_PREFIX}${GLOBAL_PREFIX_SEPARATOR}`,
keyPrefix: `${cacheConfig.REDIS_KEY_PREFIX}${cacheConfig.GLOBAL_PREFIX_SEPARATOR}`,
maxListeners: cacheConfig.REDIS_MAX_LISTENERS,
retryStrategy: (times: number) => {
if (
@@ -192,4 +190,4 @@ if (cacheConfig.USE_REDIS) {
});
}
export { ioredisClient, keyvRedisClient, GLOBAL_PREFIX_SEPARATOR };
export { ioredisClient, keyvRedisClient };
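As a reminder of what the keyPrefix option buys, a sketch that assumes ioredisClient is non-null whenever USE_REDIS is true: ioredis prepends the configured prefix to every key transparently, so callers never spell it out themselves.

import { ioredisClient } from './redisClients'; // path assumed for illustration

async function heartbeat() {
  // With REDIS_KEY_PREFIX = 'LibreChat' and the '::' separator, this write lands
  // under the Redis key 'LibreChat::health-check'; ioredis adds the prefix for us.
  await ioredisClient?.set('health-check', Date.now().toString(), 'EX', 30);
}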


@@ -0,0 +1,180 @@
import { keyvRedisClient } from '~/cache/redisClients';
import { cacheConfig as cache } from '~/cache/cacheConfig';
import { clusterConfig as cluster } from './config';
import { logger } from '@librechat/data-schemas';
/**
* Distributed leader election implementation using Redis for coordination across multiple server instances.
*
* Leadership election:
* - During bootup, every server attempts to become the leader by calling isLeader()
* - Uses atomic Redis SET NX (set if not exists) to ensure only ONE server can claim leadership
* - The first server to successfully set the key becomes the leader; others become followers
* - Works with any number of servers; with a single server, that server always becomes the leader
*
* Leadership maintenance:
* - The leader holds a key in Redis with a 25-second lease duration
* - The leader renews this lease every 10 seconds to maintain leadership
* - If the leader crashes, the lease eventually expires and the key disappears
* - On shutdown, the leader deletes its key to allow immediate re-election
* - Followers check for leadership and attempt to claim it when the key is absent
*/
export class LeaderElection {
// We can't use Keyv namespace here because we need direct Redis access for atomic operations
static readonly LEADER_KEY = `${cache.REDIS_KEY_PREFIX}${cache.GLOBAL_PREFIX_SEPARATOR}LeadingServerUUID`;
private static _instance = new LeaderElection();
readonly UUID: string = crypto.randomUUID();
private refreshTimer: NodeJS.Timeout | undefined = undefined;
// DO NOT create new instances of this class directly.
// Use the exported isLeader() function which uses a singleton instance.
constructor() {
if (LeaderElection._instance) return LeaderElection._instance;
process.on('SIGTERM', () => this.resign());
process.on('SIGINT', () => this.resign());
LeaderElection._instance = this;
}
/**
* Checks if this instance is the current leader.
* If no leader exists, waits up to 2 seconds (randomized to avoid a thundering herd), then attempts self-election.
* Always returns true in non-Redis mode (single-instance deployment).
*/
public async isLeader(): Promise<boolean> {
if (!cache.USE_REDIS) return true;
try {
const currentLeader = await LeaderElection.getLeaderUUID();
// If we own the leadership lock, return true.
// However, if the leadership refresh retries have been exhausted, something has gone wrong.
// This server is then no longer considered the leader (as if it had crashed) to avoid a split-brain scenario.
if (currentLeader === this.UUID) return this.refreshTimer != null;
if (currentLeader != null) return false; // someone holds leadership lock
const delay = Math.random() * 2000;
await new Promise((resolve) => setTimeout(resolve, delay));
return await this.electSelf();
} catch (error) {
logger.error('Failed to check leadership status:', error);
return false;
}
}
/**
* Steps down from leadership by stopping the refresh timer and releasing the leader key.
* Atomically deletes the leader key (only if we still own it) so another server can become leader immediately.
*/
public async resign(): Promise<void> {
if (!cache.USE_REDIS) return;
try {
this.clearRefreshTimer();
// Lua script for atomic check-and-delete (only delete if we still own it)
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
redis.call("del", KEYS[1])
end
`;
await keyvRedisClient!.eval(script, {
keys: [LeaderElection.LEADER_KEY],
arguments: [this.UUID],
});
} catch (error) {
logger.error('Failed to release leadership lock:', error);
}
}
/**
* Gets the UUID of the current leader from Redis.
* Returns null if no leader exists or in non-Redis mode.
* Useful for testing and observability.
*/
public static async getLeaderUUID(): Promise<string | null> {
if (!cache.USE_REDIS) return null;
return await keyvRedisClient!.get(LeaderElection.LEADER_KEY);
}
/**
* Clears the refresh timer to stop leadership maintenance.
* Called when resigning or failing to refresh leadership.
* Call this directly to simulate a crash in tests.
*/
public clearRefreshTimer(): void {
clearInterval(this.refreshTimer);
this.refreshTimer = undefined;
}
/**
* Attempts to claim leadership using atomic Redis SET NX (set if not exists).
* If successful, starts a refresh timer to maintain leadership by extending the lease duration.
* The NX flag ensures only one server can become leader even if multiple attempt simultaneously.
*/
private async electSelf(): Promise<boolean> {
try {
const result = await keyvRedisClient!.set(LeaderElection.LEADER_KEY, this.UUID, {
NX: true,
EX: cluster.LEADER_LEASE_DURATION,
});
if (result !== 'OK') return false;
this.clearRefreshTimer();
this.refreshTimer = setInterval(async () => {
await this.renewLeadership();
}, cluster.LEADER_RENEW_INTERVAL * 1000);
this.refreshTimer.unref();
return true;
} catch (error) {
logger.error('Leader election failed:', error);
return false;
}
}
/**
* Renews leadership by extending the lease duration on the leader key.
* Uses a Lua script to atomically verify that we still own the key before renewing (prevents race conditions).
* If we've lost leadership (key was taken by another server), stops the refresh timer.
* This is called every 10 seconds by the refresh timer.
*/
private async renewLeadership(attempts: number = 1): Promise<void> {
try {
// Lua script for atomic check-and-renew
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
`;
const result = await keyvRedisClient!.eval(script, {
keys: [LeaderElection.LEADER_KEY],
arguments: [this.UUID, cluster.LEADER_LEASE_DURATION.toString()],
});
if (result === 0) {
logger.warn('Lost leadership, clearing refresh timer');
this.clearRefreshTimer();
}
} catch (error) {
logger.error(`Failed to renew leadership (attempt #${attempts}):`, error);
if (attempts <= cluster.LEADER_RENEW_ATTEMPTS) {
await new Promise((resolve) =>
setTimeout(resolve, cluster.LEADER_RENEW_RETRY_DELAY * 1000),
);
await this.renewLeadership(attempts + 1);
} else {
logger.error('Exceeded maximum attempts to renew leadership.');
this.clearRefreshTimer();
}
}
}
}
const defaultElection = new LeaderElection();
export const isLeader = (): Promise<boolean> => defaultElection.isLeader();
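A usage sketch for the exported helper (the job and its interval are hypothetical; only isLeader() comes from this commit). Work that must run on exactly one instance checks leadership first, and followers simply skip the cycle:

import { isLeader } from '~/cluster'; // path assumed; the cluster index re-exports isLeader
import { logger } from '@librechat/data-schemas';

// Hypothetical periodic job: in a multi-instance deployment only the current
// leader performs the work, so it happens at most once per cycle.
setInterval(async () => {
  if (!(await isLeader())) return; // followers skip this cycle
  logger.info('Running scheduled cleanup as the leader');
  // ...singleton work goes here...
}, 60_000);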


@@ -0,0 +1,220 @@
import { expect } from '@playwright/test';
describe('LeaderElection with Redis', () => {
let LeaderElection: typeof import('../LeaderElection').LeaderElection;
let instances: InstanceType<typeof import('../LeaderElection').LeaderElection>[] = [];
let keyvRedisClient: Awaited<typeof import('~/cache/redisClients')>['keyvRedisClient'];
let ioredisClient: Awaited<typeof import('~/cache/redisClients')>['ioredisClient'];
beforeAll(async () => {
// Set up environment variables for Redis
process.env.USE_REDIS = 'true';
process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
process.env.REDIS_KEY_PREFIX = 'LeaderElection-IntegrationTest';
// Import modules after setting env vars
const leaderElectionModule = await import('../LeaderElection');
const redisClients = await import('~/cache/redisClients');
LeaderElection = leaderElectionModule.LeaderElection;
keyvRedisClient = redisClients.keyvRedisClient;
ioredisClient = redisClients.ioredisClient;
// Ensure Redis is connected
if (!keyvRedisClient) {
throw new Error('Redis client is not initialized');
}
// Wait for Redis to be ready
if (!keyvRedisClient.isOpen) {
await keyvRedisClient.connect();
}
// Increase max listeners to handle many instances in tests
process.setMaxListeners(200);
});
afterEach(async () => {
await Promise.all(instances.map((instance) => instance.resign()));
instances = [];
// Clean up: clear the leader key directly from Redis
if (keyvRedisClient) {
await keyvRedisClient.del(LeaderElection.LEADER_KEY);
}
});
afterAll(async () => {
// Close both Redis clients to prevent hanging
if (keyvRedisClient?.isOpen) await keyvRedisClient.disconnect();
if (ioredisClient?.status === 'ready') await ioredisClient.quit();
});
describe('Test Case 1: Simulate shutdown of the leader', () => {
it('should elect a new leader after the current leader resigns', async () => {
// Create 100 instances
instances = Array.from({ length: 100 }, () => new LeaderElection());
// Call isLeader on all instances and get leadership status
const resultsWithInstances = await Promise.all(
instances.map(async (instance) => ({
instance,
isLeader: await instance.isLeader(),
})),
);
// Find leader and followers
const leaders = resultsWithInstances.filter((r) => r.isLeader);
const followers = resultsWithInstances.filter((r) => !r.isLeader);
const leader = leaders[0].instance;
const nextLeader = followers[0].instance;
// Verify only one is leader
expect(leaders.length).toBe(1);
// Verify getLeaderUUID matches the leader's UUID
expect(await LeaderElection.getLeaderUUID()).toBe(leader.UUID);
// Leader resigns
await leader.resign();
// Verify getLeaderUUID returns null after resignation
expect(await LeaderElection.getLeaderUUID()).toBeNull();
// Next instance to call isLeader should become the new leader
expect(await nextLeader.isLeader()).toBe(true);
}, 30000); // 30 second timeout for 100 instances
});
describe('Test Case 2: Simulate crash of the leader', () => {
it('should allow re-election after leader crashes (lease expires)', async () => {
// Mock config with short lease duration
const clusterConfigModule = await import('../config');
const originalConfig = { ...clusterConfigModule.clusterConfig };
// Override config values for this test
Object.assign(clusterConfigModule.clusterConfig, {
LEADER_LEASE_DURATION: 2,
LEADER_RENEW_INTERVAL: 4,
});
try {
// Create 1 instance with mocked config
const instance = new LeaderElection();
instances.push(instance);
// Become leader
expect(await instance.isLeader()).toBe(true);
// Verify leader UUID is set
expect(await LeaderElection.getLeaderUUID()).toBe(instance.UUID);
// Simulate crash by clearing refresh timer
instance.clearRefreshTimer();
// The instance no longer considers itself leader even though it still holds the key
expect(await LeaderElection.getLeaderUUID()).toBe(instance.UUID);
expect(await instance.isLeader()).toBe(false);
// Wait for lease to expire (3 seconds > 2 second lease)
await new Promise((resolve) => setTimeout(resolve, 3000));
// Verify leader UUID is null after lease expiration
expect(await LeaderElection.getLeaderUUID()).toBeNull();
} finally {
// Restore original config values
Object.assign(clusterConfigModule.clusterConfig, originalConfig);
}
}, 15000); // 15 second timeout
});
describe('Test Case 3: Stress testing', () => {
it('should ensure only one instance becomes leader even when multiple instances call electSelf() at once', async () => {
// Create 10 instances
instances = Array.from({ length: 10 }, () => new LeaderElection());
// Call electSelf on all instances in parallel
const results = await Promise.all(instances.map((instance) => instance['electSelf']()));
// Verify only one returned true
const successCount = results.filter((success) => success).length;
expect(successCount).toBe(1);
// Find the winning instance
const winnerInstance = instances.find((_, index) => results[index]);
// Verify getLeaderUUID matches the winner's UUID
expect(await LeaderElection.getLeaderUUID()).toBe(winnerInstance?.UUID);
}, 15000); // 15 second timeout
});
});
describe('LeaderElection without Redis', () => {
let LeaderElection: typeof import('../LeaderElection').LeaderElection;
let instances: InstanceType<typeof import('../LeaderElection').LeaderElection>[] = [];
beforeAll(async () => {
// Set up environment variables for non-Redis mode
process.env.USE_REDIS = 'false';
// Reset all modules to force re-evaluation with new env vars
jest.resetModules();
// Import modules after setting env vars and resetting modules
const leaderElectionModule = await import('../LeaderElection');
LeaderElection = leaderElectionModule.LeaderElection;
});
afterEach(async () => {
await Promise.all(instances.map((instance) => instance.resign()));
instances = [];
});
afterAll(() => {
// Restore environment variables
process.env.USE_REDIS = 'true';
// Reset all modules to ensure next test runs get fresh imports
jest.resetModules();
});
it('should allow all instances to be leaders when USE_REDIS is false', async () => {
// Create 10 instances
instances = Array.from({ length: 10 }, () => new LeaderElection());
// Call isLeader on all instances
const results = await Promise.all(instances.map((instance) => instance.isLeader()));
// Verify all instances report themselves as leaders
expect(results.every((isLeader) => isLeader)).toBe(true);
expect(results.filter((isLeader) => isLeader).length).toBe(10);
});
it('should return null for getLeaderUUID when USE_REDIS is false', async () => {
// Create a few instances
instances = Array.from({ length: 3 }, () => new LeaderElection());
// Call isLeader on all instances to make them "leaders"
await Promise.all(instances.map((instance) => instance.isLeader()));
// Verify getLeaderUUID returns null in non-Redis mode
expect(await LeaderElection.getLeaderUUID()).toBeNull();
});
it('should allow resign() to be called without throwing errors', async () => {
// Create multiple instances
instances = Array.from({ length: 5 }, () => new LeaderElection());
// Make them all leaders
await Promise.all(instances.map((instance) => instance.isLeader()));
// Call resign on all instances - should not throw
await expect(
Promise.all(instances.map((instance) => instance.resign())),
).resolves.not.toThrow();
// Verify they're still leaders after resigning (since there's no shared state)
const results = await Promise.all(instances.map((instance) => instance.isLeader()));
expect(results.every((isLeader) => isLeader)).toBe(true);
});
});


@@ -0,0 +1,14 @@
import { math } from '~/utils';
const clusterConfig = {
/** Duration in seconds that the leader lease is valid before it expires */
LEADER_LEASE_DURATION: math(process.env.LEADER_LEASE_DURATION, 25),
/** Interval in seconds at which the leader renews its lease */
LEADER_RENEW_INTERVAL: math(process.env.LEADER_RENEW_INTERVAL, 10),
/** Maximum number of retry attempts when renewing the lease fails */
LEADER_RENEW_ATTEMPTS: math(process.env.LEADER_RENEW_ATTEMPTS, 3),
/** Delay in seconds between retry attempts when renewing the lease */
LEADER_RENEW_RETRY_DELAY: math(process.env.LEADER_RENEW_RETRY_DELAY, 0.5),
};
export { clusterConfig };
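The config module, as shown here, does not cross-validate these values. A purely illustrative guard like the one below would flag a configuration where the renew interval is not comfortably shorter than the lease, which would let the lease lapse between renewals:

import { logger } from '@librechat/data-schemas';
import { clusterConfig } from './config'; // path assumed for illustration

// Illustrative startup check only: the lease must outlive at least one renewal cycle.
if (clusterConfig.LEADER_RENEW_INTERVAL >= clusterConfig.LEADER_LEASE_DURATION) {
  logger.warn(
    'LEADER_RENEW_INTERVAL should be well below LEADER_LEASE_DURATION; ' +
      'otherwise the leader lease can expire between renewals.',
  );
}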


@@ -0,0 +1 @@
export { isLeader } from './LeaderElection';