From 4b598808be84cdca6019ae0d7f45717f0671824e Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Thu, 18 Dec 2025 10:41:22 -0500 Subject: [PATCH] feat: Add USE_REDIS_STREAMS configuration for stream job storage - Introduced USE_REDIS_STREAMS to control Redis usage for resumable stream job storage, defaulting to true if USE_REDIS is enabled but not explicitly set. - Updated cacheConfig to include USE_REDIS_STREAMS and modified createStreamServices to utilize this new configuration. - Enhanced unit tests to validate the behavior of USE_REDIS_STREAMS under various environment settings, ensuring correct defaults and overrides. --- .../src/cache/__tests__/cacheConfig.spec.ts | 48 +++++++++++++++++++ packages/api/src/cache/cacheConfig.ts | 9 ++++ .../api/src/stream/createStreamServices.ts | 11 +++-- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/packages/api/src/cache/__tests__/cacheConfig.spec.ts b/packages/api/src/cache/__tests__/cacheConfig.spec.ts index 24f12f1d57..e24f52fee0 100644 --- a/packages/api/src/cache/__tests__/cacheConfig.spec.ts +++ b/packages/api/src/cache/__tests__/cacheConfig.spec.ts @@ -10,6 +10,7 @@ describe('cacheConfig', () => { delete process.env.REDIS_KEY_PREFIX_VAR; delete process.env.REDIS_KEY_PREFIX; delete process.env.USE_REDIS; + delete process.env.USE_REDIS_STREAMS; delete process.env.USE_REDIS_CLUSTER; delete process.env.REDIS_PING_INTERVAL; delete process.env.FORCED_IN_MEMORY_CACHE_NAMESPACES; @@ -130,6 +131,53 @@ describe('cacheConfig', () => { }); }); + describe('USE_REDIS_STREAMS configuration', () => { + test('should default to USE_REDIS value when USE_REDIS_STREAMS is not set', async () => { + process.env.USE_REDIS = 'true'; + process.env.REDIS_URI = 'redis://localhost:6379'; + + const { cacheConfig } = await import('../cacheConfig'); + expect(cacheConfig.USE_REDIS).toBe(true); + expect(cacheConfig.USE_REDIS_STREAMS).toBe(true); + }); + + test('should default to false when both USE_REDIS and USE_REDIS_STREAMS are not set', async () => { + const { cacheConfig } = await import('../cacheConfig'); + expect(cacheConfig.USE_REDIS).toBe(false); + expect(cacheConfig.USE_REDIS_STREAMS).toBe(false); + }); + + test('should be false when explicitly set to false even if USE_REDIS is true', async () => { + process.env.USE_REDIS = 'true'; + process.env.USE_REDIS_STREAMS = 'false'; + process.env.REDIS_URI = 'redis://localhost:6379'; + + const { cacheConfig } = await import('../cacheConfig'); + expect(cacheConfig.USE_REDIS).toBe(true); + expect(cacheConfig.USE_REDIS_STREAMS).toBe(false); + }); + + test('should be true when explicitly set to true', async () => { + process.env.USE_REDIS = 'true'; + process.env.USE_REDIS_STREAMS = 'true'; + process.env.REDIS_URI = 'redis://localhost:6379'; + + const { cacheConfig } = await import('../cacheConfig'); + expect(cacheConfig.USE_REDIS_STREAMS).toBe(true); + }); + + test('should allow streams without general Redis (explicit override)', async () => { + // Edge case: someone might want streams with Redis but not general caching + // This would require REDIS_URI but not USE_REDIS + process.env.USE_REDIS_STREAMS = 'true'; + process.env.REDIS_URI = 'redis://localhost:6379'; + + const { cacheConfig } = await import('../cacheConfig'); + expect(cacheConfig.USE_REDIS).toBe(false); + expect(cacheConfig.USE_REDIS_STREAMS).toBe(true); + }); + }); + describe('REDIS_CA file reading', () => { test('should be null when REDIS_CA is not set', async () => { const { cacheConfig } = await import('../cacheConfig'); diff --git a/packages/api/src/cache/cacheConfig.ts b/packages/api/src/cache/cacheConfig.ts index db4cc21921..32ea2cddd1 100644 --- a/packages/api/src/cache/cacheConfig.ts +++ b/packages/api/src/cache/cacheConfig.ts @@ -17,6 +17,14 @@ if (USE_REDIS && !process.env.REDIS_URI) { throw new Error('USE_REDIS is enabled but REDIS_URI is not set.'); } +// USE_REDIS_STREAMS controls whether Redis is used for resumable stream job storage. +// Defaults to true if USE_REDIS is enabled but USE_REDIS_STREAMS is not explicitly set. +// Set to 'false' to use in-memory storage for streams while keeping Redis for other caches. +const USE_REDIS_STREAMS = + process.env.USE_REDIS_STREAMS !== undefined + ? isEnabled(process.env.USE_REDIS_STREAMS) + : USE_REDIS; + // Comma-separated list of cache namespaces that should be forced to use in-memory storage // even when Redis is enabled. This allows selective performance optimization for specific caches. const FORCED_IN_MEMORY_CACHE_NAMESPACES = process.env.FORCED_IN_MEMORY_CACHE_NAMESPACES @@ -60,6 +68,7 @@ const getRedisCA = (): string | null => { const cacheConfig = { FORCED_IN_MEMORY_CACHE_NAMESPACES, USE_REDIS, + USE_REDIS_STREAMS, REDIS_URI: process.env.REDIS_URI, REDIS_USERNAME: process.env.REDIS_USERNAME, REDIS_PASSWORD: process.env.REDIS_PASSWORD, diff --git a/packages/api/src/stream/createStreamServices.ts b/packages/api/src/stream/createStreamServices.ts index 6c8090c187..ebf3055f8d 100644 --- a/packages/api/src/stream/createStreamServices.ts +++ b/packages/api/src/stream/createStreamServices.ts @@ -49,14 +49,17 @@ export interface StreamServices { /** * Create stream services (job store + event transport). * - * Automatically detects Redis from cacheConfig.USE_REDIS and uses + * Automatically detects Redis from cacheConfig.USE_REDIS_STREAMS and uses * the existing ioredisClient. Falls back to in-memory if Redis * is not configured or not available. * + * USE_REDIS_STREAMS defaults to USE_REDIS if not explicitly set, + * allowing users to disable Redis for streams while keeping it for other caches. + * * @example Auto-detect (uses cacheConfig) * ```ts * const services = createStreamServices(); - * // Uses Redis if USE_REDIS=true, otherwise in-memory + * // Uses Redis if USE_REDIS_STREAMS=true (defaults to USE_REDIS), otherwise in-memory * ``` * * @example Force in-memory @@ -65,8 +68,8 @@ export interface StreamServices { * ``` */ export function createStreamServices(config: StreamServicesConfig = {}): StreamServices { - // Use provided config or fall back to cache config - const useRedis = config.useRedis ?? cacheConfig.USE_REDIS; + // Use provided config or fall back to cache config (USE_REDIS_STREAMS for stream-specific override) + const useRedis = config.useRedis ?? cacheConfig.USE_REDIS_STREAMS; const redisClient = config.redisClient ?? ioredisClient; const { redisSubscriber, inMemoryOptions } = config;