From 8f4705f683efd13b5c77417e0586bdfef8d741a6 Mon Sep 17 00:00:00 2001
From: "Theo N. Truong" <644650+nhtruong@users.noreply.github.com>
Date: Thu, 30 Oct 2025 15:08:04 -0600
Subject: [PATCH] =?UTF-8?q?=F0=9F=91=91=20feat:=20Distributed=20Leader=20E?=
 =?UTF-8?q?lection=20with=20Redis=20for=20Multi-instance=20Coordination=20?=
 =?UTF-8?q?(#10189)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* 🔧 refactor: Move GLOBAL_PREFIX_SEPARATOR to cacheConfig for consistency

* 👑 feat: Implement distributed leader election using Redis
---
 .env.example                                  |  10 +
 .github/workflows/cache-integration-tests.yml |  13 +-
 packages/api/package.json                     |   7 +-
 ...=> limiterCache.cache_integration.spec.ts} |   0
 ...=> sessionCache.cache_integration.spec.ts} |   0
 ...> standardCache.cache_integration.spec.ts} |  11 +-
 ... violationCache.cache_integration.spec.ts} |   0
 ...=> redisClients.cache_integration.spec.ts} |   0
 packages/api/src/cache/cacheConfig.ts         |   1 +
 packages/api/src/cache/cacheFactory.ts        |   4 +-
 packages/api/src/cache/redisClients.ts        |   6 +-
 packages/api/src/cluster/LeaderElection.ts    | 180 ++++++++++++++
 .../LeaderElection.cache_integration.spec.ts  | 220 ++++++++++++++++++
 packages/api/src/cluster/config.ts            |  14 ++
 packages/api/src/cluster/index.ts             |   1 +
 15 files changed, 452 insertions(+), 15 deletions(-)
 rename packages/api/src/cache/__tests__/cacheFactory/{limiterCache.integration.spec.ts => limiterCache.cache_integration.spec.ts} (100%)
 rename packages/api/src/cache/__tests__/cacheFactory/{sessionCache.integration.spec.ts => sessionCache.cache_integration.spec.ts} (100%)
 rename packages/api/src/cache/__tests__/cacheFactory/{standardCache.integration.spec.ts => standardCache.cache_integration.spec.ts} (96%)
 rename packages/api/src/cache/__tests__/cacheFactory/{violationCache.integration.spec.ts => violationCache.cache_integration.spec.ts} (100%)
 rename packages/api/src/cache/__tests__/{redisClients.integration.spec.ts => redisClients.cache_integration.spec.ts} (100%)
 create mode 100644 packages/api/src/cluster/LeaderElection.ts
 create mode 100644 packages/api/src/cluster/__tests__/LeaderElection.cache_integration.spec.ts
 create mode 100644 packages/api/src/cluster/config.ts
 create mode 100644 packages/api/src/cluster/index.ts

diff --git a/.env.example b/.env.example
index f1666fb763..24c74487aa 100644
--- a/.env.example
+++ b/.env.example
@@ -702,6 +702,16 @@ HELP_AND_FAQ_URL=https://librechat.ai
 # Comma-separated list of CacheKeys (e.g., ROLES,MESSAGES)
 # FORCED_IN_MEMORY_CACHE_NAMESPACES=ROLES,MESSAGES
 
+# Leader Election Configuration (for multi-instance deployments with Redis)
+# Duration in seconds that the leader lease is valid before it expires (default: 25)
+# LEADER_LEASE_DURATION=25
+# Interval in seconds at which the leader renews its lease (default: 10)
+# LEADER_RENEW_INTERVAL=10
+# Maximum number of retry attempts when renewing the lease fails (default: 3)
+# LEADER_RENEW_ATTEMPTS=3
+# Delay in seconds between retry attempts when renewing the lease (default: 0.5)
+# LEADER_RENEW_RETRY_DELAY=0.5
+
 #==================================================#
 #                      Others                      #
 #==================================================#
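Note on the four settings above: the renew cadence has to fit inside the lease, or a healthy leader can lose its key while it is still retrying. A minimal sanity-check sketch (illustrative TypeScript; `assertLeaderTimings` is hypothetical and not part of this patch):

```ts
interface LeaderTimings {
  leaseDuration: number; // LEADER_LEASE_DURATION, seconds
  renewInterval: number; // LEADER_RENEW_INTERVAL, seconds
  renewAttempts: number; // LEADER_RENEW_ATTEMPTS
  renewRetryDelay: number; // LEADER_RENEW_RETRY_DELAY, seconds
}

// Hypothetical startup check: one renew interval plus the full retry budget
// must stay under the lease, or the lease can expire on a healthy leader.
function assertLeaderTimings(t: LeaderTimings): void {
  const worstCaseRenewal = t.renewInterval + t.renewAttempts * t.renewRetryDelay;
  if (worstCaseRenewal >= t.leaseDuration) {
    throw new Error(
      `Renewal can take up to ${worstCaseRenewal}s but the lease lasts only ${t.leaseDuration}s`,
    );
  }
}

// Defaults from this patch: 10 + 3 * 0.5 = 11.5s, comfortably under the 25s lease.
assertLeaderTimings({ leaseDuration: 25, renewInterval: 10, renewAttempts: 3, renewRetryDelay: 0.5 });
```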
diff --git a/.github/workflows/cache-integration-tests.yml b/.github/workflows/cache-integration-tests.yml
index 2afe68287e..f7ac638282 100644
--- a/.github/workflows/cache-integration-tests.yml
+++ b/.github/workflows/cache-integration-tests.yml
@@ -8,12 +8,13 @@ on:
       - release/*
     paths:
       - 'packages/api/src/cache/**'
+      - 'packages/api/src/cluster/**'
       - 'redis-config/**'
       - '.github/workflows/cache-integration-tests.yml'
 
 jobs:
   cache_integration_tests:
-    name: Run Cache Integration Tests
+    name: Integration Tests that use actual Redis Cache
     timeout-minutes: 30
     runs-on: ubuntu-latest
 
@@ -66,7 +67,15 @@ jobs:
         USE_REDIS: true
         REDIS_URI: redis://127.0.0.1:6379
         REDIS_CLUSTER_URI: redis://127.0.0.1:7001,redis://127.0.0.1:7002,redis://127.0.0.1:7003
-      run: npm run test:cache:integration
+      run: npm run test:cache-integration:core
+
+    - name: Run cluster integration tests
+      working-directory: packages/api
+      env:
+        NODE_ENV: test
+        USE_REDIS: true
+        REDIS_URI: redis://127.0.0.1:6379
+      run: npm run test:cache-integration:cluster
 
     - name: Stop Redis Cluster
       if: always()
diff --git a/packages/api/package.json b/packages/api/package.json
index 7db4a54a21..4d333082a3 100644
--- a/packages/api/package.json
+++ b/packages/api/package.json
@@ -18,9 +18,10 @@
     "build:dev": "npm run clean && NODE_ENV=development rollup -c --bundleConfigAsCjs",
     "build:watch": "NODE_ENV=development rollup -c -w --bundleConfigAsCjs",
     "build:watch:prod": "rollup -c -w --bundleConfigAsCjs",
-    "test": "jest --coverage --watch --testPathIgnorePatterns=\"\\.integration\\.\"",
-    "test:ci": "jest --coverage --ci --testPathIgnorePatterns=\"\\.integration\\.\"",
-    "test:cache:integration": "jest --testPathPattern=\"src/cache/.*\\.integration\\.spec\\.ts$\" --coverage=false",
+    "test": "jest --coverage --watch --testPathIgnorePatterns=\"\\.*integration\\.\"",
+    "test:ci": "jest --coverage --ci --testPathIgnorePatterns=\"\\.*integration\\.\"",
+    "test:cache-integration:core": "jest --testPathPattern=\"src/cache/.*\\.cache_integration\\.spec\\.ts$\" --coverage=false",
+    "test:cache-integration:cluster": "jest --testPathPattern=\"src/cluster/.*\\.cache_integration\\.spec\\.ts$\" --coverage=false --runInBand",
     "verify": "npm run test:ci",
     "b:clean": "bun run rimraf dist",
     "b:build": "bun run b:clean && bun run rollup -c --silent --bundleConfigAsCjs",
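Worth spelling out why a single ignore pattern covers both the old `.integration.` and the new `.cache_integration.` suffixes: `\.*` matches zero or more literal dots, so the pattern effectively keys on the substring `integration.`. A quick illustrative check (not part of the patch):

```ts
// The value passed to --testPathIgnorePatterns above, as a RegExp:
const ignore = new RegExp('\\.*integration\\.');

console.log(ignore.test('limiterCache.integration.spec.ts')); // true (old suffix)
console.log(ignore.test('limiterCache.cache_integration.spec.ts')); // true (new suffix)
console.log(ignore.test('limiterCache.spec.ts')); // false - still runs in unit suites
```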
diff --git a/packages/api/src/cache/__tests__/cacheFactory/limiterCache.integration.spec.ts b/packages/api/src/cache/__tests__/cacheFactory/limiterCache.cache_integration.spec.ts
similarity index 100%
rename from packages/api/src/cache/__tests__/cacheFactory/limiterCache.integration.spec.ts
rename to packages/api/src/cache/__tests__/cacheFactory/limiterCache.cache_integration.spec.ts
diff --git a/packages/api/src/cache/__tests__/cacheFactory/sessionCache.integration.spec.ts b/packages/api/src/cache/__tests__/cacheFactory/sessionCache.cache_integration.spec.ts
similarity index 100%
rename from packages/api/src/cache/__tests__/cacheFactory/sessionCache.integration.spec.ts
rename to packages/api/src/cache/__tests__/cacheFactory/sessionCache.cache_integration.spec.ts
diff --git a/packages/api/src/cache/__tests__/cacheFactory/standardCache.integration.spec.ts b/packages/api/src/cache/__tests__/cacheFactory/standardCache.cache_integration.spec.ts
similarity index 96%
rename from packages/api/src/cache/__tests__/cacheFactory/standardCache.integration.spec.ts
rename to packages/api/src/cache/__tests__/cacheFactory/standardCache.cache_integration.spec.ts
index db40ad636c..b5fcc207da 100644
--- a/packages/api/src/cache/__tests__/cacheFactory/standardCache.integration.spec.ts
+++ b/packages/api/src/cache/__tests__/cacheFactory/standardCache.cache_integration.spec.ts
@@ -1,11 +1,14 @@
 import type { Keyv } from 'keyv';
 
-// Mock GLOBAL_PREFIX_SEPARATOR
-jest.mock('../../redisClients', () => {
-  const originalModule = jest.requireActual('../../redisClients');
+// Mock GLOBAL_PREFIX_SEPARATOR from cacheConfig
+jest.mock('../../cacheConfig', () => {
+  const originalModule = jest.requireActual('../../cacheConfig');
   return {
     ...originalModule,
-    GLOBAL_PREFIX_SEPARATOR: '>>',
+    cacheConfig: {
+      ...originalModule.cacheConfig,
+      GLOBAL_PREFIX_SEPARATOR: '>>',
+    },
   };
 });
 
diff --git a/packages/api/src/cache/__tests__/cacheFactory/violationCache.integration.spec.ts b/packages/api/src/cache/__tests__/cacheFactory/violationCache.cache_integration.spec.ts
similarity index 100%
rename from packages/api/src/cache/__tests__/cacheFactory/violationCache.integration.spec.ts
rename to packages/api/src/cache/__tests__/cacheFactory/violationCache.cache_integration.spec.ts
diff --git a/packages/api/src/cache/__tests__/redisClients.integration.spec.ts b/packages/api/src/cache/__tests__/redisClients.cache_integration.spec.ts
similarity index 100%
rename from packages/api/src/cache/__tests__/redisClients.integration.spec.ts
rename to packages/api/src/cache/__tests__/redisClients.cache_integration.spec.ts
diff --git a/packages/api/src/cache/cacheConfig.ts b/packages/api/src/cache/cacheConfig.ts
index aebfeef3bd..3e5c1646d2 100644
--- a/packages/api/src/cache/cacheConfig.ts
+++ b/packages/api/src/cache/cacheConfig.ts
@@ -65,6 +65,7 @@ const cacheConfig = {
   REDIS_PASSWORD: process.env.REDIS_PASSWORD,
   REDIS_CA: getRedisCA(),
   REDIS_KEY_PREFIX: process.env[REDIS_KEY_PREFIX_VAR ?? ''] || REDIS_KEY_PREFIX || '',
+  GLOBAL_PREFIX_SEPARATOR: '::',
   REDIS_MAX_LISTENERS: math(process.env.REDIS_MAX_LISTENERS, 40),
   REDIS_PING_INTERVAL: math(process.env.REDIS_PING_INTERVAL, 0),
   /** Max delay between reconnection attempts in ms */
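For readers tracing where the separator surfaces in Redis: a sketch of the assumed final key shape once the prefix, separator, and Keyv namespace are combined (the exact layout is Keyv's concern; treat this as an assumption to verify against a live instance):

```ts
// Assumed composition, mirroring how LeaderElection.LEADER_KEY is built below
// and how cacheFactory wires KeyvRedis; verify against a live Redis instance.
const REDIS_KEY_PREFIX = 'LibreChat'; // example value for cacheConfig.REDIS_KEY_PREFIX
const GLOBAL_PREFIX_SEPARATOR = '::'; // now defined once in cacheConfig
const namespace = 'ROLES'; // per-cache Keyv namespace
const key = 'admin';

// e.g. "LibreChat::ROLES:admin"
console.log(`${REDIS_KEY_PREFIX}${GLOBAL_PREFIX_SEPARATOR}${namespace}:${key}`);
```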
diff --git a/packages/api/src/cache/cacheFactory.ts b/packages/api/src/cache/cacheFactory.ts
index 427b1b38ad..d2244a662a 100644
--- a/packages/api/src/cache/cacheFactory.ts
+++ b/packages/api/src/cache/cacheFactory.ts
@@ -14,7 +14,7 @@ import { logger } from '@librechat/data-schemas';
 import session, { MemoryStore } from 'express-session';
 import { RedisStore as ConnectRedis } from 'connect-redis';
 import type { SendCommandFn } from 'rate-limit-redis';
-import { keyvRedisClient, ioredisClient, GLOBAL_PREFIX_SEPARATOR } from './redisClients';
+import { keyvRedisClient, ioredisClient } from './redisClients';
 import { cacheConfig } from './cacheConfig';
 import { violationFile } from './keyvFiles';
 
@@ -31,7 +31,7 @@ export const standardCache = (namespace: string, ttl?: number, fallbackStore?: o
   const keyvRedis = new KeyvRedis(keyvRedisClient);
   const cache = new Keyv(keyvRedis, { namespace, ttl });
   keyvRedis.namespace = cacheConfig.REDIS_KEY_PREFIX;
-  keyvRedis.keyPrefixSeparator = GLOBAL_PREFIX_SEPARATOR;
+  keyvRedis.keyPrefixSeparator = cacheConfig.GLOBAL_PREFIX_SEPARATOR;
 
   cache.on('error', (err) => {
     logger.error(`Cache error in namespace ${namespace}:`, err);
diff --git a/packages/api/src/cache/redisClients.ts b/packages/api/src/cache/redisClients.ts
index 6c11c1a0a8..6f0e27d772 100644
--- a/packages/api/src/cache/redisClients.ts
+++ b/packages/api/src/cache/redisClients.ts
@@ -5,8 +5,6 @@ import { createClient, createCluster } from '@keyv/redis';
 import type { RedisClientType, RedisClusterType } from '@redis/client';
 import { cacheConfig } from './cacheConfig';
 
-const GLOBAL_PREFIX_SEPARATOR = '::';
-
 const urls = cacheConfig.REDIS_URI?.split(',').map((uri) => new URL(uri)) || [];
 const username = urls?.[0]?.username || cacheConfig.REDIS_USERNAME;
 const password = urls?.[0]?.password || cacheConfig.REDIS_PASSWORD;
@@ -18,7 +16,7 @@ if (cacheConfig.USE_REDIS) {
     username: username,
     password: password,
     tls: ca ? { ca } : undefined,
-    keyPrefix: `${cacheConfig.REDIS_KEY_PREFIX}${GLOBAL_PREFIX_SEPARATOR}`,
+    keyPrefix: `${cacheConfig.REDIS_KEY_PREFIX}${cacheConfig.GLOBAL_PREFIX_SEPARATOR}`,
     maxListeners: cacheConfig.REDIS_MAX_LISTENERS,
     retryStrategy: (times: number) => {
       if (
@@ -192,4 +190,4 @@ if (cacheConfig.USE_REDIS) {
   });
 }
 
-export { ioredisClient, keyvRedisClient, GLOBAL_PREFIX_SEPARATOR };
+export { ioredisClient, keyvRedisClient };
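Before the new module itself, the primitive it builds on: `SET key value NX EX ttl` is a single atomic command, so of several racing processes exactly one creates the key. A standalone sketch using the node-redis v4 API (client setup, URL, and names are illustrative, not part of this patch):

```ts
import { createClient } from 'redis';

// SET with NX ("only if not exists") and EX (TTL in seconds) is one atomic
// command: of N racing callers, Redis lets exactly one create the key.
async function tryAcquire(key: string, owner: string, ttlSeconds: number): Promise<boolean> {
  const client = createClient({ url: 'redis://127.0.0.1:6379' }); // illustrative URL
  await client.connect();
  try {
    // node-redis returns 'OK' when the key was created, null when it already exists.
    const result = await client.set(key, owner, { NX: true, EX: ttlSeconds });
    return result === 'OK';
  } finally {
    await client.quit();
  }
}
```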
diff --git a/packages/api/src/cluster/LeaderElection.ts b/packages/api/src/cluster/LeaderElection.ts
new file mode 100644
index 0000000000..726c56b185
--- /dev/null
+++ b/packages/api/src/cluster/LeaderElection.ts
@@ -0,0 +1,180 @@
+import { keyvRedisClient } from '~/cache/redisClients';
+import { cacheConfig as cache } from '~/cache/cacheConfig';
+import { clusterConfig as cluster } from './config';
+import { logger } from '@librechat/data-schemas';
+
+/**
+ * Distributed leader election implementation using Redis for coordination across multiple server instances.
+ *
+ * Leadership election:
+ * - During bootup, every server attempts to become the leader by calling isLeader()
+ * - Uses atomic Redis SET NX (set if not exists) to ensure only ONE server can claim leadership
+ * - The first server to successfully set the key becomes the leader; the others become followers
+ * - Works with any number of servers; a single server always becomes the leader
+ *
+ * Leadership maintenance:
+ * - The leader holds a key in Redis with a 25-second lease duration (by default)
+ * - The leader renews this lease every 10 seconds (by default) to maintain leadership
+ * - If the leader crashes, the lease eventually expires and the key disappears
+ * - On shutdown, the leader deletes its key to allow immediate re-election
+ * - Followers check for leadership and attempt to claim it when the key is empty
+ */
+export class LeaderElection {
+  // We can't use a Keyv namespace here because we need direct Redis access for atomic operations
+  static readonly LEADER_KEY = `${cache.REDIS_KEY_PREFIX}${cache.GLOBAL_PREFIX_SEPARATOR}LeadingServerUUID`;
+  private static _instance = new LeaderElection();
+
+  readonly UUID: string = crypto.randomUUID();
+  private refreshTimer: NodeJS.Timeout | undefined = undefined;
+
+  // DO NOT create new instances of this class directly.
+  // Use the exported isLeader() function, which uses a singleton instance.
+  constructor() {
+    if (LeaderElection._instance) return LeaderElection._instance;
+
+    process.on('SIGTERM', () => this.resign());
+    process.on('SIGINT', () => this.resign());
+    LeaderElection._instance = this;
+  }
+
+  /**
+   * Checks if this instance is the current leader.
+   * If no leader exists, waits up to 2 seconds (randomized to avoid a thundering herd) then attempts self-election.
+   * Always returns true in non-Redis mode (single-instance deployment).
+   */
+  public async isLeader(): Promise<boolean> {
+    if (!cache.USE_REDIS) return true;
+
+    try {
+      const currentLeader = await LeaderElection.getLeaderUUID();
+      // If we own the leadership lock, return true.
+      // However, if the leadership refresh retries have been exhausted, something has gone wrong.
+      // This server is then no longer considered the leader, as after a crash, to avoid a split-brain scenario.
+      if (currentLeader === this.UUID) return this.refreshTimer != null;
+      if (currentLeader != null) return false; // someone else holds the leadership lock
+
+      const delay = Math.random() * 2000;
+      await new Promise((resolve) => setTimeout(resolve, delay));
+      return await this.electSelf();
+    } catch (error) {
+      logger.error('Failed to check leadership status:', error);
+      return false;
+    }
+  }
+
+  /**
+   * Steps down from leadership by stopping the refresh timer and releasing the leader key.
+   * Atomically deletes the leader key (only if we still own it) so another server can become leader immediately.
+   */
+  public async resign(): Promise<void> {
+    if (!cache.USE_REDIS) return;
+
+    try {
+      this.clearRefreshTimer();
+
+      // Lua script for atomic check-and-delete (only delete if we still own it)
+      const script = `
+        if redis.call("get", KEYS[1]) == ARGV[1] then
+          redis.call("del", KEYS[1])
+        end
+      `;
+
+      await keyvRedisClient!.eval(script, {
+        keys: [LeaderElection.LEADER_KEY],
+        arguments: [this.UUID],
+      });
+    } catch (error) {
+      logger.error('Failed to release leadership lock:', error);
+    }
+  }
+
+  /**
+   * Gets the UUID of the current leader from Redis.
+   * Returns null if no leader exists or in non-Redis mode.
+   * Useful for testing and observability.
+   */
+  public static async getLeaderUUID(): Promise<string | null> {
+    if (!cache.USE_REDIS) return null;
+    return await keyvRedisClient!.get(LeaderElection.LEADER_KEY);
+  }
+
+  /**
+   * Clears the refresh timer to stop leadership maintenance.
+   * Called when resigning or after failing to refresh leadership.
+   * Call this directly in tests to simulate a crash.
+   */
+  public clearRefreshTimer(): void {
+    clearInterval(this.refreshTimer);
+    this.refreshTimer = undefined;
+  }
+
+  /**
+   * Attempts to claim leadership using atomic Redis SET NX (set if not exists).
+   * If successful, starts a refresh timer to maintain leadership by extending the lease duration.
+   * The NX flag ensures only one server can become leader even if multiple attempt simultaneously.
+   */
+  private async electSelf(): Promise<boolean> {
+    try {
+      const result = await keyvRedisClient!.set(LeaderElection.LEADER_KEY, this.UUID, {
+        NX: true,
+        EX: cluster.LEADER_LEASE_DURATION,
+      });
+
+      if (result !== 'OK') return false;
+
+      this.clearRefreshTimer();
+      this.refreshTimer = setInterval(async () => {
+        await this.renewLeadership();
+      }, cluster.LEADER_RENEW_INTERVAL * 1000);
+      this.refreshTimer.unref();
+
+      return true;
+    } catch (error) {
+      logger.error('Leader election failed:', error);
+      return false;
+    }
+  }
+
+  /**
+   * Renews leadership by extending the lease duration on the leader key.
+   * Uses a Lua script to atomically verify we still own the key before renewing (prevents race conditions).
+   * If we've lost leadership (the key was taken by another server), stops the refresh timer.
+   * This is called every LEADER_RENEW_INTERVAL seconds (10 by default) by the refresh timer.
+   */
+  private async renewLeadership(attempts: number = 1): Promise<void> {
+    try {
+      // Lua script for atomic check-and-renew
+      const script = `
+        if redis.call("get", KEYS[1]) == ARGV[1] then
+          return redis.call("expire", KEYS[1], ARGV[2])
+        else
+          return 0
+        end
+      `;
+
+      const result = await keyvRedisClient!.eval(script, {
+        keys: [LeaderElection.LEADER_KEY],
+        arguments: [this.UUID, cluster.LEADER_LEASE_DURATION.toString()],
+      });
+
+      if (result === 0) {
+        logger.warn('Lost leadership, clearing refresh timer');
+        this.clearRefreshTimer();
+      }
+    } catch (error) {
+      logger.error(`Failed to renew leadership (attempt ${attempts}):`, error);
+      if (attempts <= cluster.LEADER_RENEW_ATTEMPTS) {
+        await new Promise((resolve) =>
+          setTimeout(resolve, cluster.LEADER_RENEW_RETRY_DELAY * 1000),
+        );
+        await this.renewLeadership(attempts + 1);
+      } else {
+        logger.error('Exceeded maximum attempts to renew leadership.');
+        this.clearRefreshTimer();
+      }
+    }
+  }
+}
+
+const defaultElection = new LeaderElection();
+export const isLeader = (): Promise<boolean> => defaultElection.isLeader();
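A hedged consumer-side sketch (not part of this patch; `runScheduledCleanup` is hypothetical): recurring work that must run on exactly one instance can be gated behind the exported helper, with followers re-checking on every tick in case leadership has moved.

```ts
import { isLeader } from '~/cluster';

declare function runScheduledCleanup(): Promise<void>; // hypothetical job

// Followers skip the work but keep polling, so one of them picks it up
// automatically if the current leader resigns or crashes.
setInterval(async () => {
  if (await isLeader()) {
    await runScheduledCleanup();
  }
}, 60_000);
```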
diff --git a/packages/api/src/cluster/__tests__/LeaderElection.cache_integration.spec.ts b/packages/api/src/cluster/__tests__/LeaderElection.cache_integration.spec.ts
new file mode 100644
index 0000000000..60bc1b439b
--- /dev/null
+++ b/packages/api/src/cluster/__tests__/LeaderElection.cache_integration.spec.ts
@@ -0,0 +1,220 @@
+describe('LeaderElection with Redis', () => {
+  let LeaderElection: typeof import('../LeaderElection').LeaderElection;
+  let instances: InstanceType<typeof LeaderElection>[] = [];
+  let keyvRedisClient: Awaited<typeof import('~/cache/redisClients')>['keyvRedisClient'];
+  let ioredisClient: Awaited<typeof import('~/cache/redisClients')>['ioredisClient'];
+
+  beforeAll(async () => {
+    // Set up environment variables for Redis
+    process.env.USE_REDIS = 'true';
+    process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
+    process.env.REDIS_KEY_PREFIX = 'LeaderElection-IntegrationTest';
+
+    // Import modules after setting env vars
+    const leaderElectionModule = await import('../LeaderElection');
+    const redisClients = await import('~/cache/redisClients');
+
+    LeaderElection = leaderElectionModule.LeaderElection;
+    keyvRedisClient = redisClients.keyvRedisClient;
+    ioredisClient = redisClients.ioredisClient;
+
+    // Ensure Redis is connected
+    if (!keyvRedisClient) {
+      throw new Error('Redis client is not initialized');
+    }
+
+    // Wait for Redis to be ready
+    if (!keyvRedisClient.isOpen) {
+      await keyvRedisClient.connect();
+    }
+
+    // Increase max listeners to handle many instances in tests
+    process.setMaxListeners(200);
+  });
+
+  afterEach(async () => {
+    await Promise.all(instances.map((instance) => instance.resign()));
+    instances = [];
+
+    // Clean up: clear the leader key directly from Redis
+    if (keyvRedisClient) {
+      await keyvRedisClient.del(LeaderElection.LEADER_KEY);
+    }
+  });
+
+  afterAll(async () => {
+    // Close both Redis clients to prevent hanging
+    if (keyvRedisClient?.isOpen) await keyvRedisClient.disconnect();
+    if (ioredisClient?.status === 'ready') await ioredisClient.quit();
+  });
+
+  describe('Test Case 1: Simulate shutdown of the leader', () => {
+    it('should elect a new leader after the current leader resigns', async () => {
+      // Create 100 instances
+      instances = Array.from({ length: 100 }, () => new LeaderElection());
+
+      // Call isLeader on all instances and get leadership status
+      const resultsWithInstances = await Promise.all(
+        instances.map(async (instance) => ({
+          instance,
+          isLeader: await instance.isLeader(),
+        })),
+      );
+
+      // Find leader and followers
+      const leaders = resultsWithInstances.filter((r) => r.isLeader);
+      const followers = resultsWithInstances.filter((r) => !r.isLeader);
+      const leader = leaders[0].instance;
+      const nextLeader = followers[0].instance;
+
+      // Verify only one is leader
+      expect(leaders.length).toBe(1);
+
+      // Verify getLeaderUUID matches the leader's UUID
+      expect(await LeaderElection.getLeaderUUID()).toBe(leader.UUID);
+
+      // Leader resigns
+      await leader.resign();
+
+      // Verify getLeaderUUID returns null after resignation
+      expect(await LeaderElection.getLeaderUUID()).toBeNull();
+
+      // Next instance to call isLeader should become the new leader
+      expect(await nextLeader.isLeader()).toBe(true);
+    }, 30000); // 30 second timeout for 100 instances
+  });
+
+  describe('Test Case 2: Simulate crash of the leader', () => {
+    it('should allow re-election after leader crashes (lease expires)', async () => {
+      // Mock config with short lease duration
+      const clusterConfigModule = await import('../config');
+      const originalConfig = { ...clusterConfigModule.clusterConfig };
+
+      // Override config values for this test
+      Object.assign(clusterConfigModule.clusterConfig, {
+        LEADER_LEASE_DURATION: 2,
+        LEADER_RENEW_INTERVAL: 4,
+      });
+
+      try {
+        // Create 1 instance with mocked config
+        const instance = new LeaderElection();
+        instances.push(instance);
+
+        // Become leader
+        expect(await instance.isLeader()).toBe(true);
+
+        // Verify leader UUID is set
+        expect(await LeaderElection.getLeaderUUID()).toBe(instance.UUID);
+
+        // Simulate crash by clearing refresh timer
+        instance.clearRefreshTimer();
+
+        // The instance no longer considers itself leader even though it still holds the key
+        expect(await LeaderElection.getLeaderUUID()).toBe(instance.UUID);
+        expect(await instance.isLeader()).toBe(false);
+
+        // Wait for lease to expire (3 seconds > 2 second lease)
+        await new Promise((resolve) => setTimeout(resolve, 3000));
+
+        // Verify leader UUID is null after lease expiration
+        expect(await LeaderElection.getLeaderUUID()).toBeNull();
+      } finally {
+        // Restore original config values
+        Object.assign(clusterConfigModule.clusterConfig, originalConfig);
+      }
+    }, 15000); // 15 second timeout
+  });
+
+  describe('Test Case 3: Stress testing', () => {
+    it('should ensure only one instance becomes leader even when multiple instances call electSelf() at once', async () => {
+      // Create 10 instances
+      instances = Array.from({ length: 10 }, () => new LeaderElection());
+
+      // Call electSelf on all instances in parallel
+      const results = await Promise.all(instances.map((instance) => instance['electSelf']()));
+
+      // Verify only one returned true
+      const successCount = results.filter((success) => success).length;
+      expect(successCount).toBe(1);
+
+      // Find the winning instance
+      const winnerInstance = instances.find((_, index) => results[index]);
+
+      // Verify getLeaderUUID matches the winner's UUID
+      expect(await LeaderElection.getLeaderUUID()).toBe(winnerInstance?.UUID);
+    }, 15000); // 15 second timeout
+  });
+});
+
+describe('LeaderElection without Redis', () => {
+  let LeaderElection: typeof import('../LeaderElection').LeaderElection;
+  let instances: InstanceType<typeof LeaderElection>[] = [];
+
+  beforeAll(async () => {
+    // Set up environment variables for non-Redis mode
+    process.env.USE_REDIS = 'false';
+
+    // Reset all modules to force re-evaluation with new env vars
+    jest.resetModules();
+
+    // Import modules after setting env vars and resetting modules
+    const leaderElectionModule = await import('../LeaderElection');
+    LeaderElection = leaderElectionModule.LeaderElection;
+  });
+
+  afterEach(async () => {
+    await Promise.all(instances.map((instance) => instance.resign()));
+    instances = [];
+  });
+
+  afterAll(() => {
+    // Restore environment variables
+    process.env.USE_REDIS = 'true';
+
+    // Reset all modules to ensure subsequent test runs get fresh imports
+    jest.resetModules();
+  });
+
+  it('should allow all instances to be leaders when USE_REDIS is false', async () => {
+    // Create 10 instances
+    instances = Array.from({ length: 10 }, () => new LeaderElection());
+
+    // Call isLeader on all instances
+    const results = await Promise.all(instances.map((instance) => instance.isLeader()));
+
+    // Verify all instances report themselves as leaders
+    expect(results.every((isLeader) => isLeader)).toBe(true);
+    expect(results.filter((isLeader) => isLeader).length).toBe(10);
+  });
+
+  it('should return null for getLeaderUUID when USE_REDIS is false', async () => {
+    // Create a few instances
+    instances = Array.from({ length: 3 }, () => new LeaderElection());
+
+    // Call isLeader on all instances to make them "leaders"
+    await Promise.all(instances.map((instance) => instance.isLeader()));
+
+    // Verify getLeaderUUID returns null in non-Redis mode
+    expect(await LeaderElection.getLeaderUUID()).toBeNull();
+  });
+
+  it('should allow resign() to be called without throwing errors', async () => {
+    // Create multiple instances
+    instances = Array.from({ length: 5 }, () => new LeaderElection());
+
+    // Make them all leaders
+    await Promise.all(instances.map((instance) => instance.isLeader()));
+
+    // Call resign on all instances - should not throw
+    await expect(
+      Promise.all(instances.map((instance) => instance.resign())),
+    ).resolves.not.toThrow();
+
+    // Verify they're still leaders after resigning (since there's no shared state)
+    const results = await Promise.all(instances.map((instance) => instance.isLeader()));
+    expect(results.every((isLeader) => isLeader)).toBe(true);
+  });
+});
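Test Case 2 above sleeps a fixed 3 seconds to outlast the 2-second lease. Where lease timing is less deterministic, a polling helper is a common alternative; a sketch of one (hypothetical test utility, not part of this patch):

```ts
// Hypothetical test utility: poll until the leader key disappears instead of
// sleeping for a fixed period.
async function waitForLeaderExpiry(
  getLeaderUUID: () => Promise<string | null>,
  timeoutMs = 10_000,
  pollMs = 100,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if ((await getLeaderUUID()) === null) return; // lease expired
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
  throw new Error('Leader key did not expire within the timeout');
}
```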
diff --git a/packages/api/src/cluster/config.ts b/packages/api/src/cluster/config.ts
new file mode 100644
index 0000000000..d4a99b3217
--- /dev/null
+++ b/packages/api/src/cluster/config.ts
@@ -0,0 +1,14 @@
+import { math } from '~/utils';
+
+const clusterConfig = {
+  /** Duration in seconds that the leader lease is valid before it expires */
+  LEADER_LEASE_DURATION: math(process.env.LEADER_LEASE_DURATION, 25),
+  /** Interval in seconds at which the leader renews its lease */
+  LEADER_RENEW_INTERVAL: math(process.env.LEADER_RENEW_INTERVAL, 10),
+  /** Maximum number of retry attempts when renewing the lease fails */
+  LEADER_RENEW_ATTEMPTS: math(process.env.LEADER_RENEW_ATTEMPTS, 3),
+  /** Delay in seconds between retry attempts when renewing the lease */
+  LEADER_RENEW_RETRY_DELAY: math(process.env.LEADER_RENEW_RETRY_DELAY, 0.5),
+};
+
+export { clusterConfig };
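With the defaults above, the worst-case windows are easy to bound; a worked sketch of the arithmetic (illustrative only):

```ts
// Illustrative arithmetic with the default settings above.
const LEASE = 25; // LEADER_LEASE_DURATION
const INTERVAL = 10; // LEADER_RENEW_INTERVAL
const ATTEMPTS = 3; // LEADER_RENEW_ATTEMPTS
const RETRY_DELAY = 0.5; // LEADER_RENEW_RETRY_DELAY

// After a hard crash the key lingers for at most the lease, and a follower
// then waits its randomized 0-2s election delay before claiming it.
const worstCaseFailover = LEASE + 2; // ~27s without a graceful resign

// A leader hitting renewal errors exhausts its retries in:
const retryBudget = ATTEMPTS * RETRY_DELAY; // 1.5s, well inside one 10s interval
console.log({ worstCaseFailover, retryBudget, INTERVAL });
```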
diff --git a/packages/api/src/cluster/index.ts b/packages/api/src/cluster/index.ts
new file mode 100644
index 0000000000..71925a87ef
--- /dev/null
+++ b/packages/api/src/cluster/index.ts
@@ -0,0 +1 @@
+export { isLeader } from './LeaderElection';