diff --git a/packages/api/jest.config.mjs b/packages/api/jest.config.mjs
index 10fa4554e4..5506d6e483 100644
--- a/packages/api/jest.config.mjs
+++ b/packages/api/jest.config.mjs
@@ -10,6 +10,17 @@ export default {
   ],
   coverageReporters: ['text', 'cobertura'],
   testResultsProcessor: 'jest-junit',
+  transform: {
+    '\\.[jt]sx?$': [
+      'babel-jest',
+      {
+        presets: [
+          ['@babel/preset-env', { targets: { node: 'current' } }],
+          '@babel/preset-typescript',
+        ],
+      },
+    ],
+  },
   moduleNameMapper: {
     '^@src/(.*)$': '<rootDir>/src/$1',
     '~/(.*)': '<rootDir>/src/$1',
diff --git a/packages/api/src/cache/cacheFactory.ts b/packages/api/src/cache/cacheFactory.ts
index 9b59afe554..2d7817c2ad 100644
--- a/packages/api/src/cache/cacheFactory.ts
+++ b/packages/api/src/cache/cacheFactory.ts
@@ -120,7 +120,9 @@ export const limiterCache = (prefix: string): RedisStore | undefined => {
   if (!cacheConfig.USE_REDIS) {
     return undefined;
   }
-  // TODO: The prefix is not actually applied. Also needs to account for global prefix.
+  // Note: The `prefix` is applied by RedisStore internally to its key operations.
+  // The global REDIS_KEY_PREFIX is applied by ioredisClient's keyPrefix setting.
+  // Combined key format: `{REDIS_KEY_PREFIX}::{prefix}{identifier}`
   prefix = prefix.endsWith(':') ? prefix : `${prefix}:`;

   try {
diff --git a/packages/api/src/cache/redisClients.ts b/packages/api/src/cache/redisClients.ts
index 79489336c4..fca4365f7f 100644
--- a/packages/api/src/cache/redisClients.ts
+++ b/packages/api/src/cache/redisClients.ts
@@ -29,7 +29,9 @@ if (cacheConfig.USE_REDIS) {
         );
         return null;
       }
-      const delay = Math.min(times * 50, cacheConfig.REDIS_RETRY_MAX_DELAY);
+      const base = Math.min(Math.pow(2, times) * 50, cacheConfig.REDIS_RETRY_MAX_DELAY);
+      const jitter = Math.floor(Math.random() * Math.min(base, 1000));
+      const delay = Math.min(base + jitter, cacheConfig.REDIS_RETRY_MAX_DELAY);
       logger.info(`ioredis reconnecting... attempt ${times}, delay ${delay}ms`);
       return delay;
     },
@@ -71,7 +73,9 @@ if (cacheConfig.USE_REDIS) {
         );
         return null;
       }
-      const delay = Math.min(times * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
+      const base = Math.min(Math.pow(2, times) * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
+      const jitter = Math.floor(Math.random() * Math.min(base, 1000));
+      const delay = Math.min(base + jitter, cacheConfig.REDIS_RETRY_MAX_DELAY);
       logger.info(`ioredis cluster reconnecting... attempt ${times}, delay ${delay}ms`);
       return delay;
     },
@@ -149,7 +153,9 @@ if (cacheConfig.USE_REDIS) {
         );
         return new Error('Max reconnection attempts reached');
       }
-      const delay = Math.min(retries * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
+      const base = Math.min(Math.pow(2, retries) * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
+      const jitter = Math.floor(Math.random() * Math.min(base, 1000));
+      const delay = Math.min(base + jitter, cacheConfig.REDIS_RETRY_MAX_DELAY);
       logger.info(`@keyv/redis reconnecting... attempt ${retries}, delay ${delay}ms`);
       return delay;
     },
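The three retry hunks above all apply the same policy: capped exponential backoff plus additive jitter, so reconnecting clients do not stampede Redis in lockstep after an outage. A minimal standalone sketch of that policy in TypeScript (an illustrative helper, not code from this patch; `baseMs` corresponds to the 50ms/100ms multipliers above):

    /** Capped exponential backoff with up to 1s of additive jitter. */
    function retryDelayMs(attempt: number, baseMs: number, maxDelayMs: number): number {
      // Exponential growth, capped at the configured ceiling
      const base = Math.min(Math.pow(2, attempt) * baseMs, maxDelayMs);
      // Random jitter: at most 1s, and never more than the base itself
      const jitter = Math.floor(Math.random() * Math.min(base, 1000));
      // Re-apply the cap so base + jitter cannot exceed the ceiling
      return Math.min(base + jitter, maxDelayMs);
    }

    // e.g. retryDelayMs(3, 100, 10000) → 800ms plus up to 800ms of jitter
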
diff --git a/packages/api/src/middleware/__tests__/concurrency.cache_integration.spec.ts b/packages/api/src/middleware/__tests__/concurrency.cache_integration.spec.ts
new file mode 100644
index 0000000000..4c29fdad55
--- /dev/null
+++ b/packages/api/src/middleware/__tests__/concurrency.cache_integration.spec.ts
@@ -0,0 +1,258 @@
+import type { Redis, Cluster } from 'ioredis';
+
+/**
+ * Integration tests for concurrency middleware atomic Lua scripts.
+ *
+ * Tests that the Lua-based check-and-increment / decrement operations
+ * are truly atomic and eliminate the INCR+check+DECR race window.
+ *
+ * Run with: USE_REDIS=true npx jest --config packages/api/jest.config.mjs concurrency.cache_integration
+ */
+describe('Concurrency Middleware Integration Tests', () => {
+  let originalEnv: NodeJS.ProcessEnv;
+  let ioredisClient: Redis | Cluster | null = null;
+  let checkAndIncrementPendingRequest: (
+    userId: string,
+  ) => Promise<{ allowed: boolean; pendingRequests: number; limit: number }>;
+  let decrementPendingRequest: (userId: string) => Promise<void>;
+  const testPrefix = 'Concurrency-Integration-Test';
+
+  beforeAll(async () => {
+    originalEnv = { ...process.env };
+
+    process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
+    process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'false';
+    process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
+    process.env.REDIS_KEY_PREFIX = testPrefix;
+    process.env.REDIS_PING_INTERVAL = '0';
+    process.env.REDIS_RETRY_MAX_ATTEMPTS = '5';
+    process.env.LIMIT_CONCURRENT_MESSAGES = 'true';
+    process.env.CONCURRENT_MESSAGE_MAX = '2';
+
+    jest.resetModules();
+
+    const { ioredisClient: client } = await import('../../cache/redisClients');
+    ioredisClient = client;
+
+    if (!ioredisClient) {
+      console.warn('Redis not available, skipping integration tests');
+      return;
+    }
+
+    // Import concurrency module after Redis client is available
+    const concurrency = await import('../concurrency');
+    checkAndIncrementPendingRequest = concurrency.checkAndIncrementPendingRequest;
+    decrementPendingRequest = concurrency.decrementPendingRequest;
+  });
+
+  afterEach(async () => {
+    if (!ioredisClient) {
+      return;
+    }
+
+    try {
+      const keys = await ioredisClient.keys(`${testPrefix}*`);
+      if (keys.length > 0) {
+        await Promise.all(keys.map((key) => ioredisClient!.del(key)));
+      }
+    } catch (error) {
+      console.warn('Error cleaning up test keys:', error);
+    }
+  });
+
+  afterAll(async () => {
+    if (ioredisClient) {
+      try {
+        await ioredisClient.quit();
+      } catch {
+        try {
+          ioredisClient.disconnect();
+        } catch {
+          // Ignore
+        }
+      }
+    }
+    process.env = originalEnv;
+  });
+
+  describe('Atomic Check and Increment', () => {
+    test('should allow requests within the concurrency limit', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-allow-${Date.now()}`;
+
+      // First request - should be allowed (count = 1, limit = 2)
+      const result1 = await checkAndIncrementPendingRequest(userId);
+      expect(result1.allowed).toBe(true);
+      expect(result1.pendingRequests).toBe(1);
+      expect(result1.limit).toBe(2);
+
+      // Second request - should be allowed (count = 2, limit = 2)
+      const result2 = await checkAndIncrementPendingRequest(userId);
+      expect(result2.allowed).toBe(true);
+      expect(result2.pendingRequests).toBe(2);
+    });
+
+    test('should reject requests over the concurrency limit', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-reject-${Date.now()}`;
+
+      // Fill up to the limit
+      await checkAndIncrementPendingRequest(userId);
+      await checkAndIncrementPendingRequest(userId);
+
+      // Third request - should be rejected (count would be 3, limit = 2)
+      const result = await checkAndIncrementPendingRequest(userId);
+      expect(result.allowed).toBe(false);
+      expect(result.pendingRequests).toBe(3); // Reports the count that was over-limit
+    });
+
+    test('should not leave stale counter after rejection (atomic rollback)', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-rollback-${Date.now()}`;
+
+      // Fill up to the limit
+      await checkAndIncrementPendingRequest(userId);
+      await checkAndIncrementPendingRequest(userId);
+
+      // Attempt over-limit (should be rejected and atomically rolled back)
+      const rejected = await checkAndIncrementPendingRequest(userId);
+      expect(rejected.allowed).toBe(false);
+
+      // The key value should still be 2, not 3 — verify the Lua script decremented back
+      const key = `PENDING_REQ:${userId}`;
+      const rawValue = await ioredisClient.get(key);
+      expect(rawValue).toBe('2');
+    });
+
+    test('should handle concurrent requests atomically (no over-admission)', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-concurrent-${Date.now()}`;
+
+      // Fire 20 concurrent requests for the same user (limit = 2)
+      const results = await Promise.all(
+        Array.from({ length: 20 }, () => checkAndIncrementPendingRequest(userId)),
+      );
+
+      const allowed = results.filter((r) => r.allowed);
+      const rejected = results.filter((r) => !r.allowed);
+
+      // Exactly 2 should be allowed (the concurrency limit)
+      expect(allowed.length).toBe(2);
+      expect(rejected.length).toBe(18);
+
+      // The key value should be exactly 2 after all atomic operations
+      const key = `PENDING_REQ:${userId}`;
+      const rawValue = await ioredisClient.get(key);
+      expect(rawValue).toBe('2');
+
+      // Clean up
+      await decrementPendingRequest(userId);
+      await decrementPendingRequest(userId);
+    });
+  });
+
+  describe('Atomic Decrement', () => {
+    test('should decrement pending requests', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-decrement-${Date.now()}`;
+
+      await checkAndIncrementPendingRequest(userId);
+      await checkAndIncrementPendingRequest(userId);
+
+      // Decrement once
+      await decrementPendingRequest(userId);
+
+      const key = `PENDING_REQ:${userId}`;
+      const rawValue = await ioredisClient.get(key);
+      expect(rawValue).toBe('1');
+    });
+
+    test('should clean up key when count reaches zero', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-cleanup-${Date.now()}`;
+
+      await checkAndIncrementPendingRequest(userId);
+      await decrementPendingRequest(userId);
+
+      // Key should be deleted (not left as "0")
+      const key = `PENDING_REQ:${userId}`;
+      const exists = await ioredisClient.exists(key);
+      expect(exists).toBe(0);
+    });
+
+    test('should clean up key on double-decrement (negative protection)', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-double-decr-${Date.now()}`;
+
+      await checkAndIncrementPendingRequest(userId);
+      await decrementPendingRequest(userId);
+      await decrementPendingRequest(userId); // Double-decrement
+
+      // Key should be deleted, not negative
+      const key = `PENDING_REQ:${userId}`;
+      const exists = await ioredisClient.exists(key);
+      expect(exists).toBe(0);
+    });
+
+    test('should allow new requests after decrement frees a slot', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-free-slot-${Date.now()}`;
+
+      // Fill to limit
+      await checkAndIncrementPendingRequest(userId);
+      await checkAndIncrementPendingRequest(userId);
+
+      // Verify at limit
+      const atLimit = await checkAndIncrementPendingRequest(userId);
+      expect(atLimit.allowed).toBe(false);
+
+      // Free a slot
+      await decrementPendingRequest(userId);
+
+      // Should now be allowed again
+      const allowed = await checkAndIncrementPendingRequest(userId);
+      expect(allowed.allowed).toBe(true);
+      expect(allowed.pendingRequests).toBe(2);
+    });
+  });
+
+  describe('TTL Behavior', () => {
+    test('should set TTL on the concurrency key', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const userId = `user-ttl-${Date.now()}`;
+      await checkAndIncrementPendingRequest(userId);
+
+      const key = `PENDING_REQ:${userId}`;
+      const ttl = await ioredisClient.ttl(key);
+      expect(ttl).toBeGreaterThan(0);
+      expect(ttl).toBeLessThanOrEqual(60);
+    });
+  });
+});
diff --git a/packages/api/src/middleware/concurrency.ts b/packages/api/src/middleware/concurrency.ts
index 92ac8b7d46..22302e79d0 100644
--- a/packages/api/src/middleware/concurrency.ts
+++ b/packages/api/src/middleware/concurrency.ts
@@ -9,6 +9,40 @@ const LIMIT_CONCURRENT_MESSAGES = process.env.LIMIT_CONCURRENT_MESSAGES;
 const CONCURRENT_MESSAGE_MAX = math(process.env.CONCURRENT_MESSAGE_MAX, 2);
 const CONCURRENT_VIOLATION_SCORE = math(process.env.CONCURRENT_VIOLATION_SCORE, 1);

+/**
+ * Lua script for atomic check-and-increment.
+ * Increments the key, sets TTL, and if over limit decrements back.
+ * Returns positive count if allowed, negative count if rejected.
+ * Single round-trip, fully atomic — eliminates the INCR/check/DECR race window.
+ */
+const CHECK_AND_INCREMENT_SCRIPT = `
+local key = KEYS[1]
+local limit = tonumber(ARGV[1])
+local ttl = tonumber(ARGV[2])
+local current = redis.call('INCR', key)
+redis.call('EXPIRE', key, ttl)
+if current > limit then
+  redis.call('DECR', key)
+  return -current
+end
+return current
+`;
+
+/**
+ * Lua script for atomic decrement-and-cleanup.
+ * Decrements the key and deletes it if the count reaches zero or below.
+ * Eliminates the DECR-then-DEL race window.
+ */
+const DECREMENT_SCRIPT = `
+local key = KEYS[1]
+local current = redis.call('DECR', key)
+if current <= 0 then
+  redis.call('DEL', key)
+  return 0
+end
+return current
+`;
+
 /** Lazily initialized cache for pending requests (used only for in-memory fallback) */
 let pendingReqCache: ReturnType | null = null;
@@ -80,36 +114,28 @@ export async function checkAndIncrementPendingRequest(
     return { allowed: true, pendingRequests: 0, limit };
   }

-  // Use atomic Redis INCR when available to prevent race conditions
+  // Use atomic Lua script when Redis is available to prevent race conditions.
+  // A single EVAL round-trip atomically increments, checks, and decrements if over-limit.
   if (USE_REDIS && ioredisClient) {
     const key = buildKey(userId);
     try {
-      // Pipeline ensures INCR and EXPIRE execute atomically in one round-trip
-      // This prevents edge cases where crash between operations leaves key without TTL
-      const pipeline = ioredisClient.pipeline();
-      pipeline.incr(key);
-      pipeline.expire(key, 60);
-      const results = await pipeline.exec();
+      const result = (await ioredisClient.eval(
+        CHECK_AND_INCREMENT_SCRIPT,
+        1,
+        key,
+        limit,
+        60,
+      )) as number;

-      if (!results || results[0][0]) {
-        throw new Error('Pipeline execution failed');
+      if (result < 0) {
+        // Negative return means over-limit (absolute value is the count before decrement)
+        const count = -result;
+        logger.debug(`[concurrency] User ${userId} exceeded concurrent limit: ${count}/${limit}`);
+        return { allowed: false, pendingRequests: count, limit };
       }

-      const newCount = results[0][1] as number;
-
-      if (newCount > limit) {
-        // Over limit - decrement back and reject
-        await ioredisClient.decr(key);
-        logger.debug(
-          `[concurrency] User ${userId} exceeded concurrent limit: ${newCount}/${limit}`,
-        );
-        return { allowed: false, pendingRequests: newCount, limit };
-      }
-
-      logger.debug(
-        `[concurrency] User ${userId} incremented pending requests: ${newCount}/${limit}`,
-      );
-      return { allowed: true, pendingRequests: newCount, limit };
+      logger.debug(`[concurrency] User ${userId} incremented pending requests: ${result}/${limit}`);
+      return { allowed: true, pendingRequests: result, limit };
     } catch (error) {
       logger.error('[concurrency] Redis atomic increment failed:', error);
       // On Redis error, allow the request to proceed (fail-open)
@@ -164,18 +190,12 @@ export async function decrementPendingRequest(userId: string): Promise<void> {
     return;
   }

-  // Use atomic Redis DECR when available
+  // Use atomic Lua script to decrement and clean up zero/negative keys in one round-trip
   if (USE_REDIS && ioredisClient) {
     const key = buildKey(userId);
     try {
-      const newCount = await ioredisClient.decr(key);
-      if (newCount < 0) {
-        // Counter went negative - reset to 0 and delete
-        await ioredisClient.del(key);
-        logger.debug(`[concurrency] User ${userId} pending requests cleared (was negative)`);
-      } else if (newCount === 0) {
-        // Clean up zero-value keys
-        await ioredisClient.del(key);
+      const newCount = (await ioredisClient.eval(DECREMENT_SCRIPT, 1, key)) as number;
+      if (newCount === 0) {
         logger.debug(`[concurrency] User ${userId} pending requests cleared`);
       } else {
         logger.debug(`[concurrency] User ${userId} decremented pending requests: ${newCount}`);
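For context, here is how the same script-based flow looks when driven directly with ioredis. This is a sketch, not part of the patch: it assumes a plain `Redis` client and that `CHECK_AND_INCREMENT_SCRIPT` from the hunk above is in scope. It uses ioredis's real `defineCommand` API, which loads the script once and then calls it via EVALSHA instead of re-sending the script body on every request:

    import Redis from 'ioredis';

    const redis = new Redis();

    // Register the script as a custom command (cached server-side via SCRIPT LOAD)
    redis.defineCommand('checkAndIncrement', {
      numberOfKeys: 1,
      lua: CHECK_AND_INCREMENT_SCRIPT,
    });

    // ioredis cannot type dynamically defined commands, so widen the client type:
    type WithScript = Redis & {
      checkAndIncrement(key: string, limit: number, ttl: number): Promise<number>;
    };

    // Positive reply → admitted; negative reply → rejected and already rolled back
    const result = await (redis as WithScript).checkAndIncrement('PENDING_REQ:user-1', 2, 60);
    const allowed = result > 0; // e.g. 1 or 2 when admitted, -3 when rejected at limit 2
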
diff --git a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts
index d1f9467cd0..b5e53dfbff 100644
--- a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts
+++ b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts
@@ -19,8 +19,11 @@ describe('RedisEventTransport Integration Tests', () => {
     originalEnv = { ...process.env };

     process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
+    process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'false';
     process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
     process.env.REDIS_KEY_PREFIX = testPrefix;
+    process.env.REDIS_PING_INTERVAL = '0';
+    process.env.REDIS_RETRY_MAX_ATTEMPTS = '5';

     jest.resetModules();

@@ -890,4 +893,121 @@ describe('RedisEventTransport Integration Tests', () => {
       subscriber.disconnect();
     });
   });
+
+  describe('Publish Error Propagation', () => {
+    test('should swallow emitChunk publish errors (callers fire-and-forget)', async () => {
+      const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
+
+      const mockPublisher = {
+        publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
+      };
+      const mockSubscriber = {
+        on: jest.fn(),
+        subscribe: jest.fn().mockResolvedValue(undefined),
+        unsubscribe: jest.fn().mockResolvedValue(undefined),
+      };
+
+      const transport = new RedisEventTransport(
+        mockPublisher as unknown as Redis,
+        mockSubscriber as unknown as Redis,
+      );
+
+      const streamId = `error-prop-chunk-${Date.now()}`;
+
+      // emitChunk swallows errors because callers often fire-and-forget (no await).
+      // Throwing would cause unhandled promise rejections.
+      await expect(transport.emitChunk(streamId, { data: 'test' })).resolves.toBeUndefined();
+
+      transport.destroy();
+    });
+
+    test('should throw when emitDone publish fails', async () => {
+      const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
+
+      const mockPublisher = {
+        publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
+      };
+      const mockSubscriber = {
+        on: jest.fn(),
+        subscribe: jest.fn().mockResolvedValue(undefined),
+        unsubscribe: jest.fn().mockResolvedValue(undefined),
+      };
+
+      const transport = new RedisEventTransport(
+        mockPublisher as unknown as Redis,
+        mockSubscriber as unknown as Redis,
+      );
+
+      const streamId = `error-prop-done-${Date.now()}`;
+
+      await expect(transport.emitDone(streamId, { finished: true })).rejects.toThrow(
+        'Redis connection lost',
+      );
+
+      transport.destroy();
+    });
+
+    test('should throw when emitError publish fails', async () => {
+      const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
+
+      const mockPublisher = {
+        publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
+      };
+      const mockSubscriber = {
+        on: jest.fn(),
+        subscribe: jest.fn().mockResolvedValue(undefined),
+        unsubscribe: jest.fn().mockResolvedValue(undefined),
+      };
+
+      const transport = new RedisEventTransport(
+        mockPublisher as unknown as Redis,
+        mockSubscriber as unknown as Redis,
+      );
+
+      const streamId = `error-prop-error-${Date.now()}`;
+
+      await expect(transport.emitError(streamId, 'some error')).rejects.toThrow(
+        'Redis connection lost',
+      );
+
+      transport.destroy();
+    });
+
+    test('should still deliver events successfully when publish succeeds', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
+
+      const subscriber = (ioredisClient as Redis).duplicate();
+      const transport = new RedisEventTransport(ioredisClient, subscriber);
+
+      const streamId = `error-prop-success-${Date.now()}`;
+      const receivedChunks: unknown[] = [];
+      let doneEvent: unknown = null;
+
+      transport.subscribe(streamId, {
+        onChunk: (event) => receivedChunks.push(event),
+        onDone: (event) => {
+          doneEvent = event;
+        },
+      });
+
+      await new Promise((resolve) => setTimeout(resolve, 200));
+
+      // These should NOT throw
+      await transport.emitChunk(streamId, { text: 'hello' });
+      await transport.emitDone(streamId, { finished: true });
+
+      await new Promise((resolve) => setTimeout(resolve, 200));
+
+      expect(receivedChunks.length).toBe(1);
+      expect(doneEvent).toEqual({ finished: true });
+
+      transport.destroy();
+      subscriber.disconnect();
+    });
+  });
 });
diff --git a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts
index d00321a77d..a64ba11f26 100644
--- a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts
+++ b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts
@@ -24,8 +24,11 @@ describe('RedisJobStore Integration Tests', () => {
     // Set up test environment
     process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
+    process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'false';
     process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
     process.env.REDIS_KEY_PREFIX = testPrefix;
+    process.env.REDIS_PING_INTERVAL = '0';
+    process.env.REDIS_RETRY_MAX_ATTEMPTS = '5';

     jest.resetModules();

@@ -1033,4 +1036,196 @@ describe('RedisJobStore Integration Tests', () => {
     await instance2.destroy();
   });
+
+  describe('Batched Cleanup', () => {
+    test('should clean up many stale jobs in parallel batches', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      // Very short TTL so jobs are immediately stale
+      const store = new RedisJobStore(ioredisClient, { runningTtl: 1 });
+      await store.initialize();
+
+      const jobCount = 75; // More than one batch of 50
+      const veryOldTimestamp = Date.now() - 10000; // 10 seconds ago
+
+      // Create many stale jobs directly in Redis
+      for (let i = 0; i < jobCount; i++) {
+        const streamId = `batch-cleanup-${Date.now()}-${i}`;
+        const jobKey = `stream:{${streamId}}:job`;
+        await ioredisClient.hmset(jobKey, {
+          streamId,
+          userId: 'batch-user',
+          status: 'running',
+          createdAt: veryOldTimestamp.toString(),
+          syncSent: '0',
+        });
+        await ioredisClient.sadd('stream:running', streamId);
+      }
+
+      // Verify jobs are in the running set
+      const runningBefore = await ioredisClient.scard('stream:running');
+      expect(runningBefore).toBeGreaterThanOrEqual(jobCount);
+
+      // Run cleanup - should process in batches of 50
+      const cleaned = await store.cleanup();
+      expect(cleaned).toBeGreaterThanOrEqual(jobCount);
+
+      await store.destroy();
+    });
+
+    test('should not clean up valid running jobs during batch cleanup', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient, { runningTtl: 1200 });
+      await store.initialize();
+
+      // Create a mix of valid and stale jobs
+      const validStreamId = `valid-job-${Date.now()}`;
+      await store.createJob(validStreamId, 'user-1', validStreamId);
+
+      const staleStreamId = `stale-job-${Date.now()}`;
+      const jobKey = `stream:{${staleStreamId}}:job`;
+      await ioredisClient.hmset(jobKey, {
+        streamId: staleStreamId,
+        userId: 'user-1',
+        status: 'running',
+        createdAt: (Date.now() - 2000000).toString(), // Very old
+        syncSent: '0',
+      });
+      await ioredisClient.sadd('stream:running', staleStreamId);
+
+      const cleaned = await store.cleanup();
+      expect(cleaned).toBeGreaterThanOrEqual(1);
+
+      // Valid job should still exist
+      const validJob = await store.getJob(validStreamId);
+      expect(validJob).not.toBeNull();
+      expect(validJob?.status).toBe('running');
+
+      await store.destroy();
+    });
+  });
+
+  describe('appendChunk TTL Refresh', () => {
+    test('should set TTL on the chunk stream', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient, { runningTtl: 120 });
+      await store.initialize();
+
+      const streamId = `append-ttl-${Date.now()}`;
+      await store.createJob(streamId, 'user-1', streamId);
+
+      await store.appendChunk(streamId, {
+        event: 'on_message_delta',
+        data: { id: 'step-1', type: 'text', text: 'first' },
+      });
+
+      const chunkKey = `stream:{${streamId}}:chunks`;
+      const ttl = await ioredisClient.ttl(chunkKey);
+      expect(ttl).toBeGreaterThan(0);
+      expect(ttl).toBeLessThanOrEqual(120);
+
+      await store.destroy();
+    });
+
+    test('should refresh TTL on subsequent chunks (not just first)', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient, { runningTtl: 120 });
+      await store.initialize();
+
+      const streamId = `append-refresh-${Date.now()}`;
+      await store.createJob(streamId, 'user-1', streamId);
+
+      // Append first chunk
+      await store.appendChunk(streamId, {
+        event: 'on_message_delta',
+        data: { id: 'step-1', type: 'text', text: 'first' },
+      });
+
+      const chunkKey = `stream:{${streamId}}:chunks`;
+      const ttl1 = await ioredisClient.ttl(chunkKey);
+      expect(ttl1).toBeGreaterThan(0);
+
+      // Manually reduce TTL to simulate time passing
+      await ioredisClient.expire(chunkKey, 30);
+      const reducedTtl = await ioredisClient.ttl(chunkKey);
+      expect(reducedTtl).toBeLessThanOrEqual(30);
+
+      // Append another chunk - TTL should be refreshed back to running TTL
+      await store.appendChunk(streamId, {
+        event: 'on_message_delta',
+        data: { id: 'step-1', type: 'text', text: 'second' },
+      });
+
+      const ttl2 = await ioredisClient.ttl(chunkKey);
+      // Should be refreshed to ~120, not still ~30
+      expect(ttl2).toBeGreaterThan(30);
+      expect(ttl2).toBeLessThanOrEqual(120);
+
+      await store.destroy();
+    });
+
+    test('should store chunks correctly via pipeline', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const streamId = `append-pipeline-${Date.now()}`;
+      await store.createJob(streamId, 'user-1', streamId);
+
+      const chunks = [
+        {
+          event: 'on_run_step',
+          data: {
+            id: 'step-1',
+            runId: 'run-1',
+            index: 0,
+            stepDetails: { type: 'message_creation' },
+          },
+        },
+        {
+          event: 'on_message_delta',
+          data: { id: 'step-1', delta: { content: { type: 'text', text: 'Hello ' } } },
+        },
+        {
+          event: 'on_message_delta',
+          data: { id: 'step-1', delta: { content: { type: 'text', text: 'world!' } } },
+        },
+      ];
+
+      for (const chunk of chunks) {
+        await store.appendChunk(streamId, chunk);
+      }
+
+      // Verify all chunks were stored
+      const chunkKey = `stream:{${streamId}}:chunks`;
+      const len = await ioredisClient.xlen(chunkKey);
+      expect(len).toBe(3);
+
+      // Verify content can be reconstructed
+      const content = await store.getContentParts(streamId);
+      expect(content).not.toBeNull();
+      expect(content!.content.length).toBeGreaterThan(0);
+
+      await store.destroy();
+    });
+  });
 });
diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts
index 2362afe647..230727cca2 100644
--- a/packages/api/src/stream/implementations/RedisEventTransport.ts
+++ b/packages/api/src/stream/implementations/RedisEventTransport.ts
@@ -461,6 +461,7 @@ export class RedisEventTransport implements IEventTransport {
       await this.publisher.publish(channel, JSON.stringify(message));
     } catch (err) {
       logger.error(`[RedisEventTransport] Failed to publish done:`, err);
+      throw err;
     }
   }

@@ -477,6 +478,7 @@ export class RedisEventTransport implements IEventTransport {
       await this.publisher.publish(channel, JSON.stringify(message));
     } catch (err) {
       logger.error(`[RedisEventTransport] Failed to publish error:`, err);
+      throw err;
    }
  }
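The asymmetry introduced above is deliberate: `emitDone` and `emitError` now rethrow after logging because callers await them at stream teardown, while `emitChunk` keeps swallowing errors because it sits on the hot path and is typically not awaited. A sketch of the two calling patterns this implies (illustrative structural types; `failJob` is a hypothetical recovery hook, not an API from this patch):

    // Illustrative caller, assuming a transport and job store are in scope.
    async function finishStream(
      transport: {
        emitChunk(id: string, e: unknown): Promise<void>;
        emitDone(id: string, e: unknown): Promise<void>;
      },
      jobStore: { failJob(id: string, err: unknown): Promise<void> },
      streamId: string,
    ): Promise<void> {
      // Fire-and-forget on the hot path: emitChunk catches internally, otherwise
      // every dropped chunk would surface as an unhandled promise rejection.
      void transport.emitChunk(streamId, { text: 'partial delta' });

      // Terminal events are awaited, so the rethrown error reaches the caller,
      // which can mark the job failed instead of leaving consumers waiting forever.
      try {
        await transport.emitDone(streamId, { finished: true });
      } catch (err) {
        await jobStore.failJob(streamId, err);
      }
    }
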
diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts
index a0d26d087f..727fe066eb 100644
--- a/packages/api/src/stream/implementations/RedisJobStore.ts
+++ b/packages/api/src/stream/implementations/RedisJobStore.ts
@@ -302,32 +302,46 @@ export class RedisJobStore implements IJobStore {
       }
     }

-    for (const streamId of streamIds) {
-      const job = await this.getJob(streamId);
+    // Process in batches of 50 to avoid sequential per-job round-trips
+    const BATCH_SIZE = 50;
+    for (let i = 0; i < streamIds.length; i += BATCH_SIZE) {
+      const batch = streamIds.slice(i, i + BATCH_SIZE);
+      const results = await Promise.allSettled(
+        batch.map(async (streamId) => {
+          const job = await this.getJob(streamId);

-      // Job no longer exists (TTL expired) - remove from set
-      if (!job) {
-        await this.redis.srem(KEYS.runningJobs, streamId);
-        this.localGraphCache.delete(streamId);
-        this.localCollectedUsageCache.delete(streamId);
-        cleaned++;
-        continue;
-      }
+          // Job no longer exists (TTL expired) - remove from set
+          if (!job) {
+            await this.redis.srem(KEYS.runningJobs, streamId);
+            this.localGraphCache.delete(streamId);
+            this.localCollectedUsageCache.delete(streamId);
+            return 1;
+          }

-      // Job completed but still in running set (shouldn't happen, but handle it)
-      if (job.status !== 'running') {
-        await this.redis.srem(KEYS.runningJobs, streamId);
-        this.localGraphCache.delete(streamId);
-        this.localCollectedUsageCache.delete(streamId);
-        cleaned++;
-        continue;
-      }
+          // Job completed but still in running set (shouldn't happen, but handle it)
+          if (job.status !== 'running') {
+            await this.redis.srem(KEYS.runningJobs, streamId);
+            this.localGraphCache.delete(streamId);
+            this.localCollectedUsageCache.delete(streamId);
+            return 1;
+          }

-      // Stale running job (failsafe - running for > configured TTL)
-      if (now - job.createdAt > this.ttl.running * 1000) {
-        logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
-        await this.deleteJob(streamId);
-        cleaned++;
+          // Stale running job (failsafe - running for > configured TTL)
+          if (now - job.createdAt > this.ttl.running * 1000) {
+            logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
+            await this.deleteJob(streamId);
+            return 1;
+          }
+
+          return 0;
+        }),
+      );
+      for (const result of results) {
+        if (result.status === 'fulfilled') {
+          cleaned += result.value;
+        } else {
+          logger.warn(`[RedisJobStore] Cleanup failed for a job:`, result.reason);
+        }
       }
     }

@@ -592,16 +606,14 @@ export class RedisJobStore implements IJobStore {
    */
   async appendChunk(streamId: string, event: unknown): Promise<void> {
     const key = KEYS.chunks(streamId);
-    const added = await this.redis.xadd(key, '*', 'event', JSON.stringify(event));
-
-    // Set TTL on first chunk (when stream is created)
-    // Subsequent chunks inherit the stream's TTL
-    if (added) {
-      const len = await this.redis.xlen(key);
-      if (len === 1) {
-        await this.redis.expire(key, this.ttl.running);
-      }
-    }
+    // Pipeline XADD + EXPIRE in a single round-trip.
+    // EXPIRE is O(1) and idempotent — refreshing TTL on every chunk is better than
+    // only setting it once, since the original approach could let the TTL expire
+    // during long-running streams.
+    const pipeline = this.redis.pipeline();
+    pipeline.xadd(key, '*', 'event', JSON.stringify(event));
+    pipeline.expire(key, this.ttl.running);
+    await pipeline.exec();
   }

   /**
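The cleanup rewrite above is an instance of a general pattern: slice a large id list into fixed-size batches, run each batch concurrently with Promise.allSettled so one failure cannot abort the sweep, and sum the fulfilled results. A generic sketch of that pattern (illustrative helper, not part of the patch; `cleanupOne` in the usage line is hypothetical):

    /** Runs `work` over `items` in concurrent batches; failures are logged, not thrown. */
    async function sumInBatches<T>(
      items: T[],
      batchSize: number,
      work: (item: T) => Promise<number>,
    ): Promise<number> {
      let total = 0;
      for (let i = 0; i < items.length; i += batchSize) {
        // Each batch runs concurrently; allSettled isolates per-item failures
        const results = await Promise.allSettled(items.slice(i, i + batchSize).map(work));
        for (const r of results) {
          if (r.status === 'fulfilled') {
            total += r.value;
          } else {
            console.warn('batch item failed:', r.reason);
          }
        }
      }
      return total;
    }

    // e.g. const cleaned = await sumInBatches(streamIds, 50, (id) => cleanupOne(id));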