feat: Add USE_REDIS_STREAMS configuration for stream job storage

- Introduced USE_REDIS_STREAMS to control Redis usage for resumable stream job storage, defaulting to true if USE_REDIS is enabled but not explicitly set.
- Updated cacheConfig to include USE_REDIS_STREAMS and modified createStreamServices to utilize this new configuration.
- Enhanced unit tests to validate the behavior of USE_REDIS_STREAMS under various environment settings, ensuring correct defaults and overrides.
This commit is contained in:
Danny Avila 2025-12-18 10:41:22 -05:00
parent cf963ca345
commit 4b598808be
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
3 changed files with 64 additions and 4 deletions

View file

@ -10,6 +10,7 @@ describe('cacheConfig', () => {
delete process.env.REDIS_KEY_PREFIX_VAR;
delete process.env.REDIS_KEY_PREFIX;
delete process.env.USE_REDIS;
delete process.env.USE_REDIS_STREAMS;
delete process.env.USE_REDIS_CLUSTER;
delete process.env.REDIS_PING_INTERVAL;
delete process.env.FORCED_IN_MEMORY_CACHE_NAMESPACES;
@ -130,6 +131,53 @@ describe('cacheConfig', () => {
});
});
describe('USE_REDIS_STREAMS configuration', () => {
test('should default to USE_REDIS value when USE_REDIS_STREAMS is not set', async () => {
process.env.USE_REDIS = 'true';
process.env.REDIS_URI = 'redis://localhost:6379';
const { cacheConfig } = await import('../cacheConfig');
expect(cacheConfig.USE_REDIS).toBe(true);
expect(cacheConfig.USE_REDIS_STREAMS).toBe(true);
});
test('should default to false when both USE_REDIS and USE_REDIS_STREAMS are not set', async () => {
const { cacheConfig } = await import('../cacheConfig');
expect(cacheConfig.USE_REDIS).toBe(false);
expect(cacheConfig.USE_REDIS_STREAMS).toBe(false);
});
test('should be false when explicitly set to false even if USE_REDIS is true', async () => {
process.env.USE_REDIS = 'true';
process.env.USE_REDIS_STREAMS = 'false';
process.env.REDIS_URI = 'redis://localhost:6379';
const { cacheConfig } = await import('../cacheConfig');
expect(cacheConfig.USE_REDIS).toBe(true);
expect(cacheConfig.USE_REDIS_STREAMS).toBe(false);
});
test('should be true when explicitly set to true', async () => {
process.env.USE_REDIS = 'true';
process.env.USE_REDIS_STREAMS = 'true';
process.env.REDIS_URI = 'redis://localhost:6379';
const { cacheConfig } = await import('../cacheConfig');
expect(cacheConfig.USE_REDIS_STREAMS).toBe(true);
});
test('should allow streams without general Redis (explicit override)', async () => {
// Edge case: someone might want streams with Redis but not general caching
// This would require REDIS_URI but not USE_REDIS
process.env.USE_REDIS_STREAMS = 'true';
process.env.REDIS_URI = 'redis://localhost:6379';
const { cacheConfig } = await import('../cacheConfig');
expect(cacheConfig.USE_REDIS).toBe(false);
expect(cacheConfig.USE_REDIS_STREAMS).toBe(true);
});
});
describe('REDIS_CA file reading', () => {
test('should be null when REDIS_CA is not set', async () => {
const { cacheConfig } = await import('../cacheConfig');

View file

@ -17,6 +17,14 @@ if (USE_REDIS && !process.env.REDIS_URI) {
throw new Error('USE_REDIS is enabled but REDIS_URI is not set.');
}
// USE_REDIS_STREAMS controls whether Redis is used for resumable stream job storage.
// Defaults to true if USE_REDIS is enabled but USE_REDIS_STREAMS is not explicitly set.
// Set to 'false' to use in-memory storage for streams while keeping Redis for other caches.
const USE_REDIS_STREAMS =
process.env.USE_REDIS_STREAMS !== undefined
? isEnabled(process.env.USE_REDIS_STREAMS)
: USE_REDIS;
// Comma-separated list of cache namespaces that should be forced to use in-memory storage
// even when Redis is enabled. This allows selective performance optimization for specific caches.
const FORCED_IN_MEMORY_CACHE_NAMESPACES = process.env.FORCED_IN_MEMORY_CACHE_NAMESPACES
@ -60,6 +68,7 @@ const getRedisCA = (): string | null => {
const cacheConfig = {
FORCED_IN_MEMORY_CACHE_NAMESPACES,
USE_REDIS,
USE_REDIS_STREAMS,
REDIS_URI: process.env.REDIS_URI,
REDIS_USERNAME: process.env.REDIS_USERNAME,
REDIS_PASSWORD: process.env.REDIS_PASSWORD,

View file

@ -49,14 +49,17 @@ export interface StreamServices {
/**
* Create stream services (job store + event transport).
*
* Automatically detects Redis from cacheConfig.USE_REDIS and uses
* Automatically detects Redis from cacheConfig.USE_REDIS_STREAMS and uses
* the existing ioredisClient. Falls back to in-memory if Redis
* is not configured or not available.
*
* USE_REDIS_STREAMS defaults to USE_REDIS if not explicitly set,
* allowing users to disable Redis for streams while keeping it for other caches.
*
* @example Auto-detect (uses cacheConfig)
* ```ts
* const services = createStreamServices();
* // Uses Redis if USE_REDIS=true, otherwise in-memory
* // Uses Redis if USE_REDIS_STREAMS=true (defaults to USE_REDIS), otherwise in-memory
* ```
*
* @example Force in-memory
@ -65,8 +68,8 @@ export interface StreamServices {
* ```
*/
export function createStreamServices(config: StreamServicesConfig = {}): StreamServices {
// Use provided config or fall back to cache config
const useRedis = config.useRedis ?? cacheConfig.USE_REDIS;
// Use provided config or fall back to cache config (USE_REDIS_STREAMS for stream-specific override)
const useRedis = config.useRedis ?? cacheConfig.USE_REDIS_STREAMS;
const redisClient = config.redisClient ?? ioredisClient;
const { redisSubscriber, inMemoryOptions } = config;