From e646a3615e5b4dbbc0ccdf5c50cb147d58843e39 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Tue, 10 Feb 2026 13:16:29 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=8C=8A=20fix:=20Prevent=20Truncations=20W?= =?UTF-8?q?hen=20Redis=20Resumable=20Streams=20Are=20Enabled=20(#11710)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: prevent truncated responses when Redis resumable streams are enabled Race condition in RedisEventTransport.subscribe() caused early events (seq 0+) to be lost. The Redis SUBSCRIBE command was fired as fire-and-forget, but GenerationJobManager immediately set hasSubscriber=true, disabling the earlyEventBuffer. Events published during the gap between subscribe() returning and the Redis subscription actually taking effect were neither buffered nor received — they were silently dropped by Pub/Sub. This manifested as "timeout waiting for seq 0, force-flushing N messages" warnings followed by truncated or missing response text in the UI. The fix: - IEventTransport.subscribe() now returns an optional `ready` promise that resolves once the transport can actually receive messages - RedisEventTransport returns the Redis SUBSCRIBE acknowledgment as the `ready` promise instead of firing it as fire-and-forget - GenerationJobManager.subscribe() awaits `ready` before setting hasSubscriber=true, keeping the earlyEventBuffer active during the subscription window so no events are lost - GenerationJobManager.emitChunk() early-returns after buffering when no subscriber is connected, avoiding wasteful Redis PUBLISHes that nobody would receive Adds 5 regression tests covering the race condition for both in-memory and Redis transports, verifying that events emitted before subscribe are buffered and replayed, that the ready promise contract is correct for both transport implementations, and that no events are lost across the subscribe boundary. * refactor: Update import paths in GenerationJobManager integration tests - Refactored import statements in the GenerationJobManager integration test file to use absolute paths instead of relative paths, improving code readability and maintainability. - Removed redundant imports and ensured consistent usage of the updated import structure across the test cases. * chore: Remove redundant await from GenerationJobManager initialization in tests - Updated multiple test cases to call GenerationJobManager.initialize() without awaiting, improving test performance and clarity. - Ensured consistent initialization across various scenarios in the CollectedUsage and AbortJob test suites. * refactor: Enhance GenerationJobManager integration tests and RedisEventTransport cleanup - Updated GenerationJobManager integration tests to utilize dynamic Redis clients and removed unnecessary awaits from initialization calls, improving test performance. - Refactored RedisEventTransport's destroy method to safely disconnect the subscriber, enhancing resource management and preventing potential errors during cleanup. * feat: Enhance GenerationJobManager and RedisEventTransport for improved event handling - Added a resetSequence method to IEventTransport and implemented it in RedisEventTransport to manage publish sequence counters effectively. - Updated GenerationJobManager to utilize the new resetSequence method, ensuring proper event handling during stream operations. - Introduced integration tests for GenerationJobManager to validate cross-replica event publishing and subscriber readiness in Redis, enhancing test coverage and reliability. * test: Add integration tests for GenerationJobManager sequence reset and error recovery with Redis - Introduced new tests to validate the behavior of GenerationJobManager during sequence resets, ensuring no stale events are received after a reset. - Added tests to confirm that the sequence is not reset when a second subscriber joins mid-stream, maintaining event integrity. - Implemented a test for resubscription after a Redis subscribe failure, verifying that events can still be received post-error. - Enhanced overall test coverage for Redis-related functionalities in GenerationJobManager. * fix: Update GenerationJobManager and RedisEventTransport for improved event synchronization - Replaced the resetSequence method with syncReorderBuffer in GenerationJobManager to enhance cross-replica event handling without resetting the publisher sequence. - Added a new syncReorderBuffer method in RedisEventTransport to advance the subscriber reorder buffer safely, ensuring no data loss during subscriber transitions. - Introduced a new integration test to validate that local subscribers joining do not cause data loss for cross-replica subscribers, enhancing the reliability of event delivery. - Updated existing tests to reflect changes in event handling logic, improving overall test coverage and robustness. * fix: Clear flushTimeout in RedisEventTransport to prevent potential memory leaks - Added logic to clear the flushTimeout in the reorderBuffer when resetting the sequence counters, ensuring proper resource management and preventing memory leaks during state transitions in RedisEventTransport. --- .../api/src/stream/GenerationJobManager.ts | 15 +- ...ationJobManager.stream_integration.spec.ts | 831 +++++++++++++++--- .../stream/__tests__/collectedUsage.spec.ts | 20 +- .../implementations/InMemoryEventTransport.ts | 2 +- .../implementations/RedisEventTransport.ts | 92 +- .../api/src/stream/interfaces/IJobStore.ts | 10 +- 6 files changed, 782 insertions(+), 188 deletions(-) diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index fefb0dd207..815133d616 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -745,7 +745,6 @@ class GenerationJobManagerClass { const subscription = this.eventTransport.subscribe(streamId, { onChunk: (event) => { const e = event as t.ServerSentEvent; - // Filter out internal events if (!(e as Record)._internal) { onChunk(e); } @@ -754,14 +753,15 @@ class GenerationJobManagerClass { onError, }); - // Check if this is the first subscriber + if (subscription.ready) { + await subscription.ready; + } + const isFirst = this.eventTransport.isFirstSubscriber(streamId); - // First subscriber: replay buffered events and mark as connected if (!runtime.hasSubscriber) { runtime.hasSubscriber = true; - // Replay any events that were emitted before subscriber connected if (runtime.earlyEventBuffer.length > 0) { logger.debug( `[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`, @@ -771,6 +771,8 @@ class GenerationJobManagerClass { } runtime.earlyEventBuffer = []; } + + this.eventTransport.syncReorderBuffer?.(streamId); } if (isFirst) { @@ -823,12 +825,13 @@ class GenerationJobManagerClass { } } - // Buffer early events if no subscriber yet (replay when first subscriber connects) if (!runtime.hasSubscriber) { runtime.earlyEventBuffer.push(event); + if (!this._isRedis) { + return; + } } - // Await the transport emit - critical for Redis mode to maintain event order await this.eventTransport.emitChunk(streamId, event); } diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index 8723f3f000..59fe32e4e5 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -1,4 +1,17 @@ import type { Redis, Cluster } from 'ioredis'; +import type { ServerSentEvent } from '~/types/events'; +import { InMemoryEventTransport } from '~/stream/implementations/InMemoryEventTransport'; +import { RedisEventTransport } from '~/stream/implementations/RedisEventTransport'; +import { InMemoryJobStore } from '~/stream/implementations/InMemoryJobStore'; +import { GenerationJobManagerClass } from '~/stream/GenerationJobManager'; +import { RedisJobStore } from '~/stream/implementations/RedisJobStore'; +import { createStreamServices } from '~/stream/createStreamServices'; +import { GenerationJobManager } from '~/stream/GenerationJobManager'; +import { + ioredisClient as staticRedisClient, + keyvRedisClient as staticKeyvClient, + keyvRedisClientReady, +} from '~/cache/redisClients'; /** * Integration tests for GenerationJobManager. @@ -11,20 +24,23 @@ import type { Redis, Cluster } from 'ioredis'; describe('GenerationJobManager Integration Tests', () => { let originalEnv: NodeJS.ProcessEnv; let ioredisClient: Redis | Cluster | null = null; + let dynamicKeyvClient: unknown = null; + let dynamicKeyvReady: Promise | null = null; const testPrefix = 'JobManager-Integration-Test'; beforeAll(async () => { originalEnv = { ...process.env }; - // Set up test environment process.env.USE_REDIS = process.env.USE_REDIS ?? 'true'; process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379'; process.env.REDIS_KEY_PREFIX = testPrefix; jest.resetModules(); - const { ioredisClient: client } = await import('../../cache/redisClients'); - ioredisClient = client; + const redisModule = await import('~/cache/redisClients'); + ioredisClient = redisModule.ioredisClient; + dynamicKeyvClient = redisModule.keyvRedisClient; + dynamicKeyvReady = redisModule.keyvRedisClientReady; }); afterEach(async () => { @@ -45,28 +61,29 @@ describe('GenerationJobManager Integration Tests', () => { }); afterAll(async () => { - if (ioredisClient) { - try { - // Use quit() to gracefully close - waits for pending commands - await ioredisClient.quit(); - } catch { - // Fall back to disconnect if quit fails - try { - ioredisClient.disconnect(); - } catch { - // Ignore - } + for (const ready of [keyvRedisClientReady, dynamicKeyvReady]) { + if (ready) { + await ready.catch(() => {}); } } + + const clients = [ioredisClient, staticRedisClient, staticKeyvClient, dynamicKeyvClient]; + for (const client of clients) { + if (!client) { + continue; + } + try { + await (client as { disconnect: () => void | Promise }).disconnect(); + } catch { + /* ignore */ + } + } + process.env = originalEnv; }); describe('In-Memory Mode', () => { test('should create and manage jobs', async () => { - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); - // Configure with in-memory // cleanupOnComplete: false so we can verify completed status GenerationJobManager.configure({ @@ -76,7 +93,7 @@ describe('GenerationJobManager Integration Tests', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `inmem-job-${Date.now()}`; const userId = 'test-user-1'; @@ -108,17 +125,13 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should handle event streaming', async () => { - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); - GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), isRedis: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `inmem-events-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -165,9 +178,6 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); - // Create Redis services const services = createStreamServices({ useRedis: true, @@ -177,7 +187,7 @@ describe('GenerationJobManager Integration Tests', () => { expect(services.isRedis).toBe(true); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `redis-job-${Date.now()}`; const userId = 'test-user-redis'; @@ -204,16 +214,13 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `redis-chunks-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -262,16 +269,13 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `redis-abort-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -314,10 +318,7 @@ describe('GenerationJobManager Integration Tests', () => { const runTestWithMode = async (isRedis: boolean) => { jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); - if (isRedis && ioredisClient) { - const { createStreamServices } = await import('../createStreamServices'); GenerationJobManager.configure({ ...createStreamServices({ useRedis: true, @@ -326,10 +327,6 @@ describe('GenerationJobManager Integration Tests', () => { cleanupOnComplete: false, // Keep job for verification }); } else { - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import( - '../implementations/InMemoryEventTransport' - ); GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), @@ -338,7 +335,7 @@ describe('GenerationJobManager Integration Tests', () => { }); } - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `consistency-${isRedis ? 'redis' : 'inmem'}-${Date.now()}`; @@ -395,8 +392,6 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { RedisJobStore } = await import('../implementations/RedisJobStore'); - // === REPLICA A: Creates the job === // Simulate Replica A creating the job directly in Redis // (In real scenario, this happens via GenerationJobManager.createJob on Replica A) @@ -412,8 +407,6 @@ describe('GenerationJobManager Integration Tests', () => { // === REPLICA B: Receives the stream request === // Fresh GenerationJobManager that does NOT have this job in its local runtimeState jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: true, @@ -421,7 +414,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); // This is what the stream endpoint does: // const job = await GenerationJobManager.getJob(streamId); @@ -464,10 +457,6 @@ describe('GenerationJobManager Integration Tests', () => { return; } - // Simulate two instances - one creates job, other tries to get it - const { createStreamServices } = await import('../createStreamServices'); - const { RedisJobStore } = await import('../implementations/RedisJobStore'); - // Instance 1: Create the job directly in Redis (simulating another replica) const jobStore = new RedisJobStore(ioredisClient); await jobStore.initialize(); @@ -480,7 +469,6 @@ describe('GenerationJobManager Integration Tests', () => { // Instance 2: Fresh GenerationJobManager that doesn't have this job in memory jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); const services = createStreamServices({ useRedis: true, @@ -488,7 +476,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); // This should work even though the job was created by "another instance" // The manager should lazily create runtime state from Redis data @@ -517,16 +505,13 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `sync-sent-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -559,9 +544,6 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, @@ -571,7 +553,7 @@ describe('GenerationJobManager Integration Tests', () => { ...services, cleanupOnComplete: false, // Keep job for verification }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `final-event-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -604,16 +586,13 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `abort-signal-${Date.now()}`; const job = await GenerationJobManager.createJob(streamId, 'user-1'); @@ -649,9 +628,6 @@ describe('GenerationJobManager Integration Tests', () => { // This test validates that jobs created on Replica A and lazily-initialized // on Replica B can still receive and handle abort signals. - const { createStreamServices } = await import('../createStreamServices'); - const { RedisJobStore } = await import('../implementations/RedisJobStore'); - // === Replica A: Create job directly in Redis === const replicaAJobStore = new RedisJobStore(ioredisClient); await replicaAJobStore.initialize(); @@ -661,7 +637,6 @@ describe('GenerationJobManager Integration Tests', () => { // === Replica B: Fresh manager that lazily initializes the job === jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); const services = createStreamServices({ useRedis: true, @@ -669,7 +644,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); // Get job triggers lazy initialization of runtime state const job = await GenerationJobManager.getJob(streamId); @@ -710,19 +685,14 @@ describe('GenerationJobManager Integration Tests', () => { // 2. Replica B receives abort request and emits abort signal // 3. Replica A receives signal and aborts its AbortController - const { createStreamServices } = await import('../createStreamServices'); - const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); - // Create the job on "Replica A" - const { GenerationJobManager } = await import('../GenerationJobManager'); - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `cross-abort-${Date.now()}`; const job = await GenerationJobManager.createJob(streamId, 'user-1'); @@ -764,9 +734,6 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { createStreamServices } = await import('../createStreamServices'); - const { RedisJobStore } = await import('../implementations/RedisJobStore'); - // Create job directly in Redis with syncSent: true const jobStore = new RedisJobStore(ioredisClient); await jobStore.initialize(); @@ -777,7 +744,6 @@ describe('GenerationJobManager Integration Tests', () => { // Fresh manager that doesn't have this job locally jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); const services = createStreamServices({ useRedis: true, @@ -785,7 +751,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); // wasSyncSent should check Redis even without local runtime const wasSent = await GenerationJobManager.wasSyncSent(streamId); @@ -813,8 +779,6 @@ describe('GenerationJobManager Integration Tests', () => { } jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: true, @@ -822,7 +786,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `order-rapid-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -865,8 +829,6 @@ describe('GenerationJobManager Integration Tests', () => { } jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: true, @@ -874,7 +836,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `tool-args-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -926,8 +888,6 @@ describe('GenerationJobManager Integration Tests', () => { } jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: true, @@ -935,7 +895,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `step-order-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -991,8 +951,6 @@ describe('GenerationJobManager Integration Tests', () => { } jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: true, @@ -1000,7 +958,7 @@ describe('GenerationJobManager Integration Tests', () => { }); GenerationJobManager.configure(services); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId1 = `concurrent-1-${Date.now()}`; const streamId2 = `concurrent-2-${Date.now()}`; @@ -1057,6 +1015,202 @@ describe('GenerationJobManager Integration Tests', () => { }); }); + describe('Race Condition: Events Before Subscriber Ready', () => { + /** + * These tests verify the fix for the race condition where early events + * (like the 'created' event at seq 0) are lost because the Redis SUBSCRIBE + * command hasn't completed when events are published. + * + * Symptom: "[RedisEventTransport] Stream : timeout waiting for seq 0" + * followed by truncated responses in the UI. + * + * Root cause: RedisEventTransport.subscribe() fired Redis SUBSCRIBE as + * fire-and-forget. GenerationJobManager set hasSubscriber=true immediately, + * disabling the earlyEventBuffer before Redis was actually listening. + * + * Fix: subscribe() now returns a `ready` promise that resolves when the + * Redis subscription is confirmed. earlyEventBuffer stays active until then. + */ + + test('should buffer and replay events emitted before subscribe (in-memory)', async () => { + const manager = new GenerationJobManagerClass(); + manager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + }); + + manager.initialize(); + + const streamId = `early-buf-inmem-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + await manager.emitChunk(streamId, { + created: true, + message: { text: 'hello' }, + streamId, + } as unknown as ServerSentEvent); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'First chunk' } } }, + }); + + const receivedEvents: unknown[] = []; + const subscription = await manager.subscribe(streamId, (event: unknown) => + receivedEvents.push(event), + ); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(receivedEvents.length).toBe(2); + expect((receivedEvents[0] as Record).created).toBe(true); + + subscription?.unsubscribe(); + await manager.destroy(); + }); + + test('should buffer and replay events emitted before subscribe (Redis)', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const manager = new GenerationJobManagerClass(); + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + manager.configure(services); + manager.initialize(); + + const streamId = `early-buf-redis-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + await manager.emitChunk(streamId, { + created: true, + message: { text: 'hello' }, + streamId, + } as unknown as ServerSentEvent); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'First' } } }, + }); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: ' chunk' } } }, + }); + + const receivedEvents: unknown[] = []; + const subscription = await manager.subscribe(streamId, (event: unknown) => + receivedEvents.push(event), + ); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(receivedEvents.length).toBe(3); + expect((receivedEvents[0] as Record).created).toBe(true); + expect( + ((receivedEvents[1] as Record).data as Record).delta, + ).toBeDefined(); + + subscription?.unsubscribe(); + await manager.destroy(); + }); + + test('should not lose events when emitting before and after subscribe (Redis)', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const manager = new GenerationJobManagerClass(); + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + manager.configure(services); + manager.initialize(); + + const streamId = `no-loss-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + await manager.emitChunk(streamId, { + created: true, + message: { text: 'hello' }, + streamId, + } as unknown as ServerSentEvent); + await manager.emitChunk(streamId, { + event: 'on_run_step', + data: { id: 'step-1', type: 'message_creation', index: 0 }, + }); + + const receivedEvents: unknown[] = []; + const subscription = await manager.subscribe(streamId, (event: unknown) => + receivedEvents.push(event), + ); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + for (let i = 0; i < 10; i++) { + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `word${i} ` } }, index: i }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(receivedEvents.length).toBe(12); + expect((receivedEvents[0] as Record).created).toBe(true); + expect((receivedEvents[1] as Record).event).toBe('on_run_step'); + for (let i = 0; i < 10; i++) { + expect((receivedEvents[i + 2] as Record).event).toBe('on_message_delta'); + } + + subscription?.unsubscribe(); + await manager.destroy(); + }); + + test('RedisEventTransport.subscribe() should return a ready promise', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const subscriber = (ioredisClient as unknown as { duplicate: () => unknown }).duplicate(); + const transport = new RedisEventTransport(ioredisClient as never, subscriber as never); + + const streamId = `ready-promise-${Date.now()}`; + const result = transport.subscribe(streamId, { + onChunk: () => {}, + }); + + expect(result.ready).toBeDefined(); + expect(result.ready).toBeInstanceOf(Promise); + + await result.ready; + + result.unsubscribe(); + transport.destroy(); + (subscriber as { disconnect: () => void }).disconnect(); + }); + + test('InMemoryEventTransport.subscribe() should not have a ready promise', () => { + const transport = new InMemoryEventTransport(); + const streamId = `no-ready-${Date.now()}`; + const result = transport.subscribe(streamId, { + onChunk: () => {}, + }); + + expect(result.ready).toBeUndefined(); + + result.unsubscribe(); + transport.destroy(); + }); + }); + describe('Error Preservation for Late Subscribers', () => { /** * These tests verify the fix for the race condition where errors @@ -1067,10 +1221,6 @@ describe('GenerationJobManager Integration Tests', () => { */ test('should store error in emitError for late-connecting subscribers', async () => { - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); - GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), @@ -1078,7 +1228,7 @@ describe('GenerationJobManager Integration Tests', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `error-store-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -1099,10 +1249,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should NOT delete job immediately when completeJob is called with error', async () => { - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); - GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), @@ -1110,7 +1256,7 @@ describe('GenerationJobManager Integration Tests', () => { cleanupOnComplete: true, // Default behavior }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `error-no-delete-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -1133,10 +1279,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should send stored error to late-connecting subscriber', async () => { - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); - GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), @@ -1144,7 +1286,7 @@ describe('GenerationJobManager Integration Tests', () => { cleanupOnComplete: true, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `error-late-sub-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -1182,10 +1324,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should prioritize error status over finalEvent in subscribe', async () => { - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); - GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), @@ -1193,7 +1331,7 @@ describe('GenerationJobManager Integration Tests', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `error-priority-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -1237,9 +1375,6 @@ describe('GenerationJobManager Integration Tests', () => { return; } - const { createStreamServices } = await import('../createStreamServices'); - const { RedisJobStore } = await import('../implementations/RedisJobStore'); - // === Replica A: Creates job and emits error === const replicaAJobStore = new RedisJobStore(ioredisClient); await replicaAJobStore.initialize(); @@ -1256,7 +1391,6 @@ describe('GenerationJobManager Integration Tests', () => { // === Replica B: Fresh manager receives client connection === jest.resetModules(); - const { GenerationJobManager } = await import('../GenerationJobManager'); const services = createStreamServices({ useRedis: true, @@ -1267,7 +1401,7 @@ describe('GenerationJobManager Integration Tests', () => { ...services, cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); // Client connects to Replica B (job created on Replica A) let receivedError: string | undefined; @@ -1293,10 +1427,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('error jobs should be cleaned up by periodic cleanup after TTL', async () => { - const { GenerationJobManager } = await import('../GenerationJobManager'); - const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); - const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); - // Use a very short TTL for testing const jobStore = new InMemoryJobStore({ ttlAfterComplete: 100 }); @@ -1307,7 +1437,7 @@ describe('GenerationJobManager Integration Tests', () => { cleanupOnComplete: true, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `error-cleanup-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -1333,36 +1463,457 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('createStreamServices Auto-Detection', () => { - test('should auto-detect Redis when USE_REDIS is true', async () => { + describe('Cross-Replica Live Streaming (Redis)', () => { + test('should publish events to Redis even when no local subscriber exists', async () => { if (!ioredisClient) { console.warn('Redis not available, skipping test'); return; } - // Force USE_REDIS to true - process.env.USE_REDIS = 'true'; - jest.resetModules(); + const replicaA = new GenerationJobManagerClass(); + const servicesA = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + replicaA.configure(servicesA); + replicaA.initialize(); - const { createStreamServices } = await import('../createStreamServices'); - const services = createStreamServices(); + const replicaB = new GenerationJobManagerClass(); + const servicesB = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + replicaB.configure(servicesB); + replicaB.initialize(); - // Should detect Redis - expect(services.isRedis).toBe(true); + const streamId = `cross-live-${Date.now()}`; + await replicaA.createJob(streamId, 'user-1'); + + const replicaBJobStore = new RedisJobStore(ioredisClient); + await replicaBJobStore.initialize(); + await replicaBJobStore.createJob(streamId, 'user-1'); + + const receivedOnB: unknown[] = []; + const subB = await replicaB.subscribe(streamId, (event: unknown) => receivedOnB.push(event)); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + for (let i = 0; i < 5; i++) { + await replicaA.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `token${i} ` } }, index: i }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(receivedOnB.length).toBe(5); + for (let i = 0; i < 5; i++) { + expect((receivedOnB[i] as Record).event).toBe('on_message_delta'); + } + + subB?.unsubscribe(); + replicaBJobStore.destroy(); + await replicaA.destroy(); + await replicaB.destroy(); }); - test('should fall back to in-memory when USE_REDIS is false', async () => { - process.env.USE_REDIS = 'false'; - jest.resetModules(); + test('should not cause data loss on cross-replica subscribers when local subscriber joins', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } - const { createStreamServices } = await import('../createStreamServices'); - const services = createStreamServices(); + const replicaA = new GenerationJobManagerClass(); + const servicesA = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + replicaA.configure(servicesA); + replicaA.initialize(); + + const replicaB = new GenerationJobManagerClass(); + const servicesB = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + replicaB.configure(servicesB); + replicaB.initialize(); + + const streamId = `cross-seq-safe-${Date.now()}`; + + await replicaA.createJob(streamId, 'user-1'); + const replicaBJobStore = new RedisJobStore(ioredisClient); + await replicaBJobStore.initialize(); + await replicaBJobStore.createJob(streamId, 'user-1'); + + const receivedOnB: unknown[] = []; + const subB = await replicaB.subscribe(streamId, (event: unknown) => receivedOnB.push(event)); + await new Promise((resolve) => setTimeout(resolve, 100)); + + for (let i = 0; i < 3; i++) { + await replicaA.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `pre-local-${i}` } }, index: i }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + expect(receivedOnB.length).toBe(3); + + const receivedOnA: unknown[] = []; + const subA = await replicaA.subscribe(streamId, (event: unknown) => receivedOnA.push(event)); + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(receivedOnA.length).toBe(3); + + for (let i = 0; i < 3; i++) { + await replicaA.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `post-local-${i}` } }, index: i + 3 }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(receivedOnB.length).toBe(6); + expect(receivedOnA.length).toBe(6); + + for (let i = 0; i < 3; i++) { + const data = (receivedOnB[i] as Record).data as Record; + const delta = data.delta as Record; + const content = delta.content as Record; + expect(content.text).toBe(`pre-local-${i}`); + } + for (let i = 0; i < 3; i++) { + const data = (receivedOnB[i + 3] as Record).data as Record< + string, + unknown + >; + const delta = data.delta as Record; + const content = delta.content as Record; + expect(content.text).toBe(`post-local-${i}`); + } + + subA?.unsubscribe(); + subB?.unsubscribe(); + replicaBJobStore.destroy(); + await replicaA.destroy(); + await replicaB.destroy(); + }); + + test('should deliver buffered events locally AND publish live events cross-replica', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const replicaA = new GenerationJobManagerClass(); + const servicesA = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + replicaA.configure(servicesA); + replicaA.initialize(); + + const streamId = `cross-buf-live-${Date.now()}`; + await replicaA.createJob(streamId, 'user-1'); + + await replicaA.emitChunk(streamId, { + created: true, + message: { text: 'hello' }, + streamId, + } as unknown as ServerSentEvent); + + const receivedOnA: unknown[] = []; + const subA = await replicaA.subscribe(streamId, (event: unknown) => receivedOnA.push(event)); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(receivedOnA.length).toBe(1); + expect((receivedOnA[0] as Record).created).toBe(true); + + const replicaB = new GenerationJobManagerClass(); + const servicesB = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + replicaB.configure(servicesB); + replicaB.initialize(); + + const replicaBJobStore = new RedisJobStore(ioredisClient); + await replicaBJobStore.initialize(); + await replicaBJobStore.createJob(streamId, 'user-1'); + + const receivedOnB: unknown[] = []; + const subB = await replicaB.subscribe(streamId, (event: unknown) => receivedOnB.push(event)); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + for (let i = 0; i < 3; i++) { + await replicaA.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `word${i} ` } }, index: i }, + }); + } + + /** B joined after A published seq 0, so B's reorder buffer force-flushes after REORDER_TIMEOUT_MS (500ms) */ + await new Promise((resolve) => setTimeout(resolve, 700)); + + expect(receivedOnA.length).toBe(4); + expect(receivedOnB.length).toBe(3); + + subA?.unsubscribe(); + subB?.unsubscribe(); + replicaBJobStore.destroy(); + await replicaA.destroy(); + await replicaB.destroy(); + }); + }); + + describe('Concurrent Subscriber Readiness (Redis)', () => { + test('should return ready promise to all concurrent subscribers for same stream', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const subscriber = ( + ioredisClient as unknown as { duplicate: () => typeof ioredisClient } + ).duplicate()!; + const transport = new RedisEventTransport(ioredisClient as never, subscriber as never); + + const streamId = `concurrent-sub-${Date.now()}`; + + const sub1 = transport.subscribe(streamId, { + onChunk: () => {}, + onDone: () => {}, + }); + const sub2 = transport.subscribe(streamId, { + onChunk: () => {}, + onDone: () => {}, + }); + + expect(sub1.ready).toBeDefined(); + expect(sub2.ready).toBeDefined(); + + await Promise.all([sub1.ready, sub2.ready]); + + sub1.unsubscribe(); + sub2.unsubscribe(); + transport.destroy(); + subscriber.disconnect(); + }); + }); + + describe('Sequence Reset Safety (Redis)', () => { + test('should not receive stale pre-subscribe events via Redis after sequence reset', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const manager = new GenerationJobManagerClass(); + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + manager.configure(services); + manager.initialize(); + + const streamId = `seq-stale-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'pre-sub-0' } }, index: 0 }, + }); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'pre-sub-1' } }, index: 1 }, + }); + + const receivedEvents: unknown[] = []; + const sub = await manager.subscribe(streamId, (event: unknown) => receivedEvents.push(event)); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(receivedEvents.length).toBe(2); + expect( + ((receivedEvents[0] as Record).data as Record).delta, + ).toBeDefined(); + + for (let i = 0; i < 5; i++) { + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `post-sub-${i}` } }, index: i + 2 }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(receivedEvents.length).toBe(7); + + const texts = receivedEvents.map( + (e) => + ( + ((e as Record).data as Record).delta as Record< + string, + unknown + > + ).content as Record, + ); + expect((texts[0] as Record).text).toBe('pre-sub-0'); + expect((texts[1] as Record).text).toBe('pre-sub-1'); + for (let i = 0; i < 5; i++) { + expect((texts[i + 2] as Record).text).toBe(`post-sub-${i}`); + } + + sub?.unsubscribe(); + await manager.destroy(); + }); + + test('should not reset sequence when second subscriber joins mid-stream', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const manager = new GenerationJobManagerClass(); + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + manager.configure({ ...services, cleanupOnComplete: false }); + manager.initialize(); + + const streamId = `seq-2nd-sub-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const eventsA: unknown[] = []; + const subA = await manager.subscribe(streamId, (event: unknown) => eventsA.push(event)); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + for (let i = 0; i < 3; i++) { + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `chunk-${i}` } }, index: i }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(eventsA.length).toBe(3); + + const eventsB: unknown[] = []; + const subB = await manager.subscribe(streamId, (event: unknown) => eventsB.push(event)); + + for (let i = 3; i < 6; i++) { + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `chunk-${i}` } }, index: i }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(eventsA.length).toBe(6); + expect(eventsB.length).toBe(3); + + for (let i = 0; i < 6; i++) { + const text = ( + ( + ((eventsA[i] as Record).data as Record) + .delta as Record + ).content as Record + ).text; + expect(text).toBe(`chunk-${i}`); + } + + subA?.unsubscribe(); + subB?.unsubscribe(); + await manager.destroy(); + }); + }); + + describe('Subscribe Error Recovery (Redis)', () => { + test('should allow resubscription after Redis subscribe failure', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const subscriber = ( + ioredisClient as unknown as { duplicate: () => typeof ioredisClient } + ).duplicate()!; + + const realSubscribe = subscriber.subscribe.bind(subscriber); + let callCount = 0; + subscriber.subscribe = ((...args: Parameters) => { + callCount++; + if (callCount === 1) { + return Promise.reject(new Error('Simulated Redis SUBSCRIBE failure')); + } + return realSubscribe(...args); + }) as typeof subscriber.subscribe; + + const transport = new RedisEventTransport(ioredisClient as never, subscriber as never); + + const streamId = `err-retry-${Date.now()}`; + + const sub1 = transport.subscribe(streamId, { + onChunk: () => {}, + onDone: () => {}, + }); + + await sub1.ready; + + const receivedEvents: unknown[] = []; + sub1.unsubscribe(); + + const sub2 = transport.subscribe(streamId, { + onChunk: (event: unknown) => receivedEvents.push(event), + onDone: () => {}, + }); + + expect(sub2.ready).toBeDefined(); + await sub2.ready; + + await transport.emitChunk(streamId, { event: 'test', data: { value: 'hello' } }); + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(receivedEvents.length).toBe(1); + + sub2.unsubscribe(); + transport.destroy(); + subscriber.disconnect(); + }); + }); + + describe('createStreamServices Auto-Detection', () => { + test('should use Redis when useRedis is true and client is available', () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + expect(services.isRedis).toBe(true); + services.eventTransport.destroy(); + }); + + test('should fall back to in-memory when useRedis is false', () => { + const services = createStreamServices({ useRedis: false }); expect(services.isRedis).toBe(false); }); test('should allow forcing in-memory via config override', async () => { - const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: false }); expect(services.isRedis).toBe(false); diff --git a/packages/api/src/stream/__tests__/collectedUsage.spec.ts b/packages/api/src/stream/__tests__/collectedUsage.spec.ts index 3e534b537a..d9a9ab95fe 100644 --- a/packages/api/src/stream/__tests__/collectedUsage.spec.ts +++ b/packages/api/src/stream/__tests__/collectedUsage.spec.ts @@ -146,7 +146,7 @@ describe('CollectedUsage - GenerationJobManager', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `manager-test-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -179,7 +179,7 @@ describe('CollectedUsage - GenerationJobManager', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `no-usage-test-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -202,7 +202,7 @@ describe('CollectedUsage - GenerationJobManager', () => { isRedis: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const collectedUsage: UsageMetadata[] = [ { input_tokens: 100, output_tokens: 50, model: 'gpt-4' }, @@ -235,7 +235,7 @@ describe('AbortJob - Text and CollectedUsage', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `text-extract-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -267,7 +267,7 @@ describe('AbortJob - Text and CollectedUsage', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `empty-text-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -291,7 +291,7 @@ describe('AbortJob - Text and CollectedUsage', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `full-abort-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -328,7 +328,7 @@ describe('AbortJob - Text and CollectedUsage', () => { isRedis: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const abortResult = await GenerationJobManager.abortJob('non-existent-job'); @@ -365,7 +365,7 @@ describe('Real-world Scenarios', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `parallel-abort-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -419,7 +419,7 @@ describe('Real-world Scenarios', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `cache-abort-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); @@ -459,7 +459,7 @@ describe('Real-world Scenarios', () => { cleanupOnComplete: false, }); - await GenerationJobManager.initialize(); + GenerationJobManager.initialize(); const streamId = `sequential-abort-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); diff --git a/packages/api/src/stream/implementations/InMemoryEventTransport.ts b/packages/api/src/stream/implementations/InMemoryEventTransport.ts index 39b3d6029d..d0a815ce45 100644 --- a/packages/api/src/stream/implementations/InMemoryEventTransport.ts +++ b/packages/api/src/stream/implementations/InMemoryEventTransport.ts @@ -32,7 +32,7 @@ export class InMemoryEventTransport implements IEventTransport { onDone?: (event: unknown) => void; onError?: (error: string) => void; }, - ): { unsubscribe: () => void } { + ): { unsubscribe: () => void; ready?: Promise } { const state = this.getOrCreateStream(streamId); const chunkHandler = (event: unknown) => handlers.onChunk(event); diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts index 78f545c18e..2362afe647 100644 --- a/packages/api/src/stream/implementations/RedisEventTransport.ts +++ b/packages/api/src/stream/implementations/RedisEventTransport.ts @@ -92,8 +92,8 @@ export class RedisEventTransport implements IEventTransport { private subscriber: Redis | Cluster; /** Track subscribers per stream */ private streams = new Map(); - /** Track which channels we're subscribed to */ - private subscribedChannels = new Set(); + /** Track channel subscription state: resolved promise = active, pending = in-flight */ + private channelSubscriptions = new Map>(); /** Counter for generating unique subscriber IDs */ private subscriberIdCounter = 0; /** Sequence counters per stream for publishing (ensures ordered delivery in cluster mode) */ @@ -122,9 +122,32 @@ export class RedisEventTransport implements IEventTransport { return current; } - /** Reset sequence counter for a stream */ - private resetSequence(streamId: string): void { + /** Reset publish sequence counter and subscriber reorder state for a stream (full cleanup only) */ + resetSequence(streamId: string): void { this.sequenceCounters.delete(streamId); + const state = this.streams.get(streamId); + if (state) { + if (state.reorderBuffer.flushTimeout) { + clearTimeout(state.reorderBuffer.flushTimeout); + state.reorderBuffer.flushTimeout = null; + } + state.reorderBuffer.nextSeq = 0; + state.reorderBuffer.pending.clear(); + } + } + + /** Advance subscriber reorder buffer to current publisher sequence without resetting publisher (cross-replica safe) */ + syncReorderBuffer(streamId: string): void { + const currentSeq = this.sequenceCounters.get(streamId) ?? 0; + const state = this.streams.get(streamId); + if (state) { + if (state.reorderBuffer.flushTimeout) { + clearTimeout(state.reorderBuffer.flushTimeout); + state.reorderBuffer.flushTimeout = null; + } + state.reorderBuffer.nextSeq = currentSeq; + state.reorderBuffer.pending.clear(); + } } /** @@ -331,7 +354,7 @@ export class RedisEventTransport implements IEventTransport { onDone?: (event: unknown) => void; onError?: (error: string) => void; }, - ): { unsubscribe: () => void } { + ): { unsubscribe: () => void; ready?: Promise } { const channel = CHANNELS.events(streamId); const subscriberId = `sub_${++this.subscriberIdCounter}`; @@ -354,16 +377,23 @@ export class RedisEventTransport implements IEventTransport { streamState.count++; streamState.handlers.set(subscriberId, handlers); - // Subscribe to Redis channel if this is first subscriber - if (!this.subscribedChannels.has(channel)) { - this.subscribedChannels.add(channel); - this.subscriber.subscribe(channel).catch((err) => { - logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err); - }); + let readyPromise = this.channelSubscriptions.get(channel); + + if (!readyPromise) { + readyPromise = this.subscriber + .subscribe(channel) + .then(() => { + logger.debug(`[RedisEventTransport] Subscription active for channel ${channel}`); + }) + .catch((err) => { + this.channelSubscriptions.delete(channel); + logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err); + }); + this.channelSubscriptions.set(channel, readyPromise); } - // Return unsubscribe function return { + ready: readyPromise, unsubscribe: () => { const state = this.streams.get(streamId); if (!state) { @@ -385,7 +415,7 @@ export class RedisEventTransport implements IEventTransport { this.subscriber.unsubscribe(channel).catch((err) => { logger.error(`[RedisEventTransport] Failed to unsubscribe from ${channel}:`, err); }); - this.subscribedChannels.delete(channel); + this.channelSubscriptions.delete(channel); // Call all-subscribers-left callbacks for (const callback of state.allSubscribersLeftCallbacks) { @@ -532,12 +562,15 @@ export class RedisEventTransport implements IEventTransport { state.abortCallbacks.push(callback); - // Subscribe to Redis channel if not already subscribed - if (!this.subscribedChannels.has(channel)) { - this.subscribedChannels.add(channel); - this.subscriber.subscribe(channel).catch((err) => { - logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err); - }); + if (!this.channelSubscriptions.has(channel)) { + const ready = this.subscriber + .subscribe(channel) + .then(() => {}) + .catch((err) => { + this.channelSubscriptions.delete(channel); + logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err); + }); + this.channelSubscriptions.set(channel, ready); } } @@ -571,12 +604,11 @@ export class RedisEventTransport implements IEventTransport { // Reset sequence counter for this stream this.resetSequence(streamId); - // Unsubscribe from Redis channel - if (this.subscribedChannels.has(channel)) { + if (this.channelSubscriptions.has(channel)) { this.subscriber.unsubscribe(channel).catch((err) => { logger.error(`[RedisEventTransport] Failed to cleanup ${channel}:`, err); }); - this.subscribedChannels.delete(channel); + this.channelSubscriptions.delete(channel); } this.streams.delete(streamId); @@ -595,18 +627,20 @@ export class RedisEventTransport implements IEventTransport { state.reorderBuffer.pending.clear(); } - // Unsubscribe from all channels - for (const channel of this.subscribedChannels) { - this.subscriber.unsubscribe(channel).catch(() => { - // Ignore errors during shutdown - }); + for (const channel of this.channelSubscriptions.keys()) { + this.subscriber.unsubscribe(channel).catch(() => {}); } - this.subscribedChannels.clear(); + this.channelSubscriptions.clear(); this.streams.clear(); this.sequenceCounters.clear(); - // Note: Don't close Redis connections - they may be shared + try { + this.subscriber.disconnect(); + } catch { + /* ignore */ + } + logger.info('[RedisEventTransport] Destroyed'); } } diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index d990283925..5486b941eb 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -286,7 +286,7 @@ export interface IJobStore { * Implementations can use EventEmitter, Redis Pub/Sub, etc. */ export interface IEventTransport { - /** Subscribe to events for a stream */ + /** Subscribe to events for a stream. `ready` resolves once the transport can receive messages. */ subscribe( streamId: string, handlers: { @@ -294,7 +294,7 @@ export interface IEventTransport { onDone?: (event: unknown) => void; onError?: (error: string) => void; }, - ): { unsubscribe: () => void }; + ): { unsubscribe: () => void; ready?: Promise }; /** Publish a chunk event - returns Promise in Redis mode for ordered delivery */ emitChunk(streamId: string, event: unknown): void | Promise; @@ -329,6 +329,12 @@ export interface IEventTransport { /** Listen for all subscribers leaving */ onAllSubscribersLeft(streamId: string, callback: () => void): void; + /** Reset publish sequence counter for a stream (used during full stream cleanup) */ + resetSequence?(streamId: string): void; + + /** Advance subscriber reorder buffer to match publisher sequence (cross-replica safe: doesn't reset publisher counter) */ + syncReorderBuffer?(streamId: string): void; + /** Cleanup transport resources for a specific stream */ cleanup(streamId: string): void;