diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts
index fefb0dd207..815133d616 100644
--- a/packages/api/src/stream/GenerationJobManager.ts
+++ b/packages/api/src/stream/GenerationJobManager.ts
@@ -745,7 +745,6 @@ class GenerationJobManagerClass {
     const subscription = this.eventTransport.subscribe(streamId, {
       onChunk: (event) => {
         const e = event as t.ServerSentEvent;
-        // Filter out internal events
         if (!(e as Record<string, unknown>)._internal) {
           onChunk(e);
         }
@@ -754,14 +753,15 @@ class GenerationJobManagerClass {
       onError,
     });
 
-    // Check if this is the first subscriber
+    if (subscription.ready) {
+      await subscription.ready;
+    }
+
     const isFirst = this.eventTransport.isFirstSubscriber(streamId);
 
-    // First subscriber: replay buffered events and mark as connected
     if (!runtime.hasSubscriber) {
       runtime.hasSubscriber = true;
 
-      // Replay any events that were emitted before subscriber connected
      if (runtime.earlyEventBuffer.length > 0) {
        logger.debug(
          `[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`,
@@ -771,6 +771,8 @@ class GenerationJobManagerClass {
        }
        runtime.earlyEventBuffer = [];
      }
+
+      this.eventTransport.syncReorderBuffer?.(streamId);
     }
 
     if (isFirst) {
@@ -823,12 +825,13 @@ class GenerationJobManagerClass {
       }
     }
 
-    // Buffer early events if no subscriber yet (replay when first subscriber connects)
     if (!runtime.hasSubscriber) {
       runtime.earlyEventBuffer.push(event);
+      if (!this._isRedis) {
+        return;
+      }
     }
 
-    // Await the transport emit - critical for Redis mode to maintain event order
     await this.eventTransport.emitChunk(streamId, event);
   }
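The consumer-side contract introduced here is worth spelling out: `ready` is optional, so callers feature-detect it before awaiting. A minimal sketch of that calling pattern (the helper name is illustrative, not part of the patch):

```ts
import type { IEventTransport } from '~/stream/interfaces/IJobStore';

// Sketch: await the optional `ready` promise before treating the subscriber
// as attached. The Redis transport resolves it once SUBSCRIBE is confirmed;
// the in-memory transport omits it because delivery is synchronous.
async function subscribeWhenReady(
  transport: IEventTransport,
  streamId: string,
  onChunk: (event: unknown) => void,
): Promise<() => void> {
  const subscription = transport.subscribe(streamId, { onChunk });
  if (subscription.ready) {
    await subscription.ready;
  }
  return subscription.unsubscribe;
}
```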
diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts
index 8723f3f000..59fe32e4e5 100644
--- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts
+++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts
@@ -1,4 +1,17 @@
 import type { Redis, Cluster } from 'ioredis';
+import type { ServerSentEvent } from '~/types/events';
+import { InMemoryEventTransport } from '~/stream/implementations/InMemoryEventTransport';
+import { RedisEventTransport } from '~/stream/implementations/RedisEventTransport';
+import { InMemoryJobStore } from '~/stream/implementations/InMemoryJobStore';
+import { GenerationJobManagerClass } from '~/stream/GenerationJobManager';
+import { RedisJobStore } from '~/stream/implementations/RedisJobStore';
+import { createStreamServices } from '~/stream/createStreamServices';
+import { GenerationJobManager } from '~/stream/GenerationJobManager';
+import {
+  ioredisClient as staticRedisClient,
+  keyvRedisClient as staticKeyvClient,
+  keyvRedisClientReady,
+} from '~/cache/redisClients';
 
 /**
  * Integration tests for GenerationJobManager.
@@ -11,20 +24,23 @@ import type { Redis, Cluster } from 'ioredis';
 describe('GenerationJobManager Integration Tests', () => {
   let originalEnv: NodeJS.ProcessEnv;
   let ioredisClient: Redis | Cluster | null = null;
+  let dynamicKeyvClient: unknown = null;
+  let dynamicKeyvReady: Promise<void> | null = null;
   const testPrefix = 'JobManager-Integration-Test';
 
   beforeAll(async () => {
     originalEnv = { ...process.env };
 
-    // Set up test environment
     process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
     process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
     process.env.REDIS_KEY_PREFIX = testPrefix;
 
     jest.resetModules();
-    const { ioredisClient: client } = await import('../../cache/redisClients');
-    ioredisClient = client;
+    const redisModule = await import('~/cache/redisClients');
+    ioredisClient = redisModule.ioredisClient;
+    dynamicKeyvClient = redisModule.keyvRedisClient;
+    dynamicKeyvReady = redisModule.keyvRedisClientReady;
   });
 
   afterEach(async () => {
@@ -45,28 +61,29 @@
   });
 
   afterAll(async () => {
-    if (ioredisClient) {
-      try {
-        // Use quit() to gracefully close - waits for pending commands
-        await ioredisClient.quit();
-      } catch {
-        // Fall back to disconnect if quit fails
-        try {
-          ioredisClient.disconnect();
-        } catch {
-          // Ignore
-        }
+    for (const ready of [keyvRedisClientReady, dynamicKeyvReady]) {
+      if (ready) {
+        await ready.catch(() => {});
       }
     }
+
+    const clients = [ioredisClient, staticRedisClient, staticKeyvClient, dynamicKeyvClient];
+    for (const client of clients) {
+      if (!client) {
+        continue;
+      }
+      try {
+        await (client as { disconnect: () => void | Promise<void> }).disconnect();
+      } catch {
+        /* ignore */
+      }
+    }
+
     process.env = originalEnv;
   });
 
   describe('In-Memory Mode', () => {
     test('should create and manage jobs', async () => {
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-      const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-      const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
-
       // Configure with in-memory
       // cleanupOnComplete: false so we can verify completed status
       GenerationJobManager.configure({
@@ -76,7 +93,7 @@ describe('GenerationJobManager Integration Tests', () => {
         cleanupOnComplete: false,
       });
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `inmem-job-${Date.now()}`;
       const userId = 'test-user-1';
@@ -108,17 +125,13 @@
     });
 
     test('should handle event streaming', async () => {
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-      const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-      const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
-
       GenerationJobManager.configure({
         jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
         eventTransport: new InMemoryEventTransport(),
         isRedis: false,
       });
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `inmem-events-${Date.now()}`;
       await GenerationJobManager.createJob(streamId, 'user-1');
@@ -165,9 +178,6 @@
       return;
     }
 
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
-
     // Create Redis services
     const services = createStreamServices({
       useRedis: true,
@@ -177,7 +187,7 @@ describe('GenerationJobManager Integration Tests', () => {
     expect(services.isRedis).toBe(true);
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `redis-job-${Date.now()}`;
     const userId = 'test-user-redis';
@@ -204,16 +214,13 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
-
     const services = createStreamServices({
       useRedis: true,
       redisClient: ioredisClient,
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `redis-chunks-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -262,16 +269,13 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
-
     const services = createStreamServices({
       useRedis: true,
       redisClient: ioredisClient,
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `redis-abort-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -314,10 +318,7 @@ describe('GenerationJobManager Integration Tests', () => {
     const runTestWithMode = async (isRedis: boolean) => {
       jest.resetModules();
 
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-
       if (isRedis && ioredisClient) {
-        const { createStreamServices } = await import('../createStreamServices');
         GenerationJobManager.configure({
           ...createStreamServices({
             useRedis: true,
@@ -326,10 +327,6 @@
           cleanupOnComplete: false, // Keep job for verification
         });
       } else {
-        const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-        const { InMemoryEventTransport } = await import(
-          '../implementations/InMemoryEventTransport'
-        );
         GenerationJobManager.configure({
           jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
           eventTransport: new InMemoryEventTransport(),
@@ -338,7 +335,7 @@
         });
       }
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `consistency-${isRedis ? 'redis' : 'inmem'}-${Date.now()}`;
@@ -395,8 +392,6 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    const { RedisJobStore } = await import('../implementations/RedisJobStore');
-
     // === REPLICA A: Creates the job ===
     // Simulate Replica A creating the job directly in Redis
     // (In real scenario, this happens via GenerationJobManager.createJob on Replica A)
@@ -412,8 +407,6 @@
     // === REPLICA B: Receives the stream request ===
     // Fresh GenerationJobManager that does NOT have this job in its local runtimeState
     jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
 
     const services = createStreamServices({
       useRedis: true,
@@ -421,7 +414,7 @@
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     // This is what the stream endpoint does:
     // const job = await GenerationJobManager.getJob(streamId);
@@ -464,10 +457,6 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    // Simulate two instances - one creates job, other tries to get it
-    const { createStreamServices } = await import('../createStreamServices');
-    const { RedisJobStore } = await import('../implementations/RedisJobStore');
-
     // Instance 1: Create the job directly in Redis (simulating another replica)
     const jobStore = new RedisJobStore(ioredisClient);
     await jobStore.initialize();
@@ -480,7 +469,6 @@
 
     // Instance 2: Fresh GenerationJobManager that doesn't have this job in memory
     jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
 
     const services = createStreamServices({
       useRedis: true,
@@ -488,7 +476,7 @@
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     // This should work even though the job was created by "another instance"
     // The manager should lazily create runtime state from Redis data
@@ -517,16 +505,13 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
-
     const services = createStreamServices({
       useRedis: true,
       redisClient: ioredisClient,
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `sync-sent-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -559,9 +544,6 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
-
     const services = createStreamServices({
       useRedis: true,
       redisClient: ioredisClient,
@@ -571,7 +553,7 @@
       ...services,
       cleanupOnComplete: false, // Keep job for verification
     });
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `final-event-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -604,16 +586,13 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
-
     const services = createStreamServices({
       useRedis: true,
       redisClient: ioredisClient,
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `abort-signal-${Date.now()}`;
     const job = await GenerationJobManager.createJob(streamId, 'user-1');
@@ -649,9 +628,6 @@
     // This test validates that jobs created on Replica A and lazily-initialized
     // on Replica B can still receive and handle abort signals.
 
-    const { createStreamServices } = await import('../createStreamServices');
-    const { RedisJobStore } = await import('../implementations/RedisJobStore');
-
     // === Replica A: Create job directly in Redis ===
     const replicaAJobStore = new RedisJobStore(ioredisClient);
     await replicaAJobStore.initialize();
@@ -661,7 +637,6 @@
 
     // === Replica B: Fresh manager that lazily initializes the job ===
     jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
 
     const services = createStreamServices({
       useRedis: true,
@@ -669,7 +644,7 @@
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     // Get job triggers lazy initialization of runtime state
     const job = await GenerationJobManager.getJob(streamId);
@@ -710,19 +685,14 @@
     // 2. Replica B receives abort request and emits abort signal
     // 3. Replica A receives signal and aborts its AbortController
 
-    const { createStreamServices } = await import('../createStreamServices');
-    const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
-
     // Create the job on "Replica A"
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-
     const services = createStreamServices({
       useRedis: true,
       redisClient: ioredisClient,
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `cross-abort-${Date.now()}`;
     const job = await GenerationJobManager.createJob(streamId, 'user-1');
@@ -764,9 +734,6 @@ describe('GenerationJobManager Integration Tests', () => {
      return;
    }
 
-    const { createStreamServices } = await import('../createStreamServices');
-    const { RedisJobStore } = await import('../implementations/RedisJobStore');
-
     // Create job directly in Redis with syncSent: true
     const jobStore = new RedisJobStore(ioredisClient);
     await jobStore.initialize();
@@ -777,7 +744,6 @@
 
     // Fresh manager that doesn't have this job locally
     jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
 
     const services = createStreamServices({
       useRedis: true,
@@ -785,7 +751,7 @@
     });
 
     GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     // wasSyncSent should check Redis even without local runtime
     const wasSent = await GenerationJobManager.wasSyncSent(streamId);
@@ -813,8 +779,6 @@ describe('GenerationJobManager Integration Tests', () => {
    }
 
    jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
 
    const services = createStreamServices({
      useRedis: true,
@@ -822,7 +786,7 @@
    });
 
    GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
    const streamId = `order-rapid-${Date.now()}`;
    await GenerationJobManager.createJob(streamId, 'user-1');
@@ -865,8 +829,6 @@ describe('GenerationJobManager Integration Tests', () => {
    }
 
    jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
 
    const services = createStreamServices({
      useRedis: true,
@@ -874,7 +836,7 @@
    });
 
    GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
    const streamId = `tool-args-${Date.now()}`;
    await GenerationJobManager.createJob(streamId, 'user-1');
@@ -926,8 +888,6 @@ describe('GenerationJobManager Integration Tests', () => {
    }
 
    jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
 
    const services = createStreamServices({
      useRedis: true,
@@ -935,7 +895,7 @@
    });
 
    GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
    const streamId = `step-order-${Date.now()}`;
    await GenerationJobManager.createJob(streamId, 'user-1');
@@ -991,8 +951,6 @@ describe('GenerationJobManager Integration Tests', () => {
    }
 
    jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
-    const { createStreamServices } = await import('../createStreamServices');
 
    const services = createStreamServices({
      useRedis: true,
@@ -1000,7 +958,7 @@
    });
 
    GenerationJobManager.configure(services);
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
    const streamId1 = `concurrent-1-${Date.now()}`;
    const streamId2 = `concurrent-2-${Date.now()}`;
@@ -1057,6 +1015,202 @@ describe('GenerationJobManager Integration Tests', () => {
     });
   });
 
+  describe('Race Condition: Events Before Subscriber Ready', () => {
+    /**
+     * These tests verify the fix for the race condition where early events
+     * (like the 'created' event at seq 0) are lost because the Redis SUBSCRIBE
+     * command hasn't completed when events are published.
+     *
+     * Symptom: "[RedisEventTransport] Stream <streamId>: timeout waiting for seq 0"
+     * followed by truncated responses in the UI.
+     *
+     * Root cause: RedisEventTransport.subscribe() fired Redis SUBSCRIBE as
+     * fire-and-forget. GenerationJobManager set hasSubscriber=true immediately,
+     * disabling the earlyEventBuffer before Redis was actually listening.
+     *
+     * Fix: subscribe() now returns a `ready` promise that resolves when the
+     * Redis subscription is confirmed. earlyEventBuffer stays active until then.
+     */
+
+    test('should buffer and replay events emitted before subscribe (in-memory)', async () => {
+      const manager = new GenerationJobManagerClass();
+      manager.configure({
+        jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
+        eventTransport: new InMemoryEventTransport(),
+        isRedis: false,
+      });
+
+      manager.initialize();
+
+      const streamId = `early-buf-inmem-${Date.now()}`;
+      await manager.createJob(streamId, 'user-1');
+
+      await manager.emitChunk(streamId, {
+        created: true,
+        message: { text: 'hello' },
+        streamId,
+      } as unknown as ServerSentEvent);
+      await manager.emitChunk(streamId, {
+        event: 'on_message_delta',
+        data: { delta: { content: { type: 'text', text: 'First chunk' } } },
+      });
+
+      const receivedEvents: unknown[] = [];
+      const subscription = await manager.subscribe(streamId, (event: unknown) =>
+        receivedEvents.push(event),
+      );
+
+      await new Promise((resolve) => setTimeout(resolve, 50));
+
+      expect(receivedEvents.length).toBe(2);
+      expect((receivedEvents[0] as Record<string, unknown>).created).toBe(true);
+
+      subscription?.unsubscribe();
+      await manager.destroy();
+    });
+
+    test('should buffer and replay events emitted before subscribe (Redis)', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const manager = new GenerationJobManagerClass();
+      const services = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+
+      manager.configure(services);
+      manager.initialize();
+
+      const streamId = `early-buf-redis-${Date.now()}`;
+      await manager.createJob(streamId, 'user-1');
+
+      await manager.emitChunk(streamId, {
+        created: true,
+        message: { text: 'hello' },
+        streamId,
+      } as unknown as ServerSentEvent);
+      await manager.emitChunk(streamId, {
+        event: 'on_message_delta',
+        data: { delta: { content: { type: 'text', text: 'First' } } },
+      });
+      await manager.emitChunk(streamId, {
+        event: 'on_message_delta',
+        data: { delta: { content: { type: 'text', text: ' chunk' } } },
+      });
+
+      const receivedEvents: unknown[] = [];
+      const subscription = await manager.subscribe(streamId, (event: unknown) =>
+        receivedEvents.push(event),
+      );
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      expect(receivedEvents.length).toBe(3);
+      expect((receivedEvents[0] as Record<string, unknown>).created).toBe(true);
+      expect(
+        ((receivedEvents[1] as Record<string, unknown>).data as Record<string, unknown>).delta,
+      ).toBeDefined();
+
+      subscription?.unsubscribe();
+      await manager.destroy();
+    });
+
+    test('should not lose events when emitting before and after subscribe (Redis)', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const manager = new GenerationJobManagerClass();
+      const services = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+
+      manager.configure(services);
+      manager.initialize();
+
+      const streamId = `no-loss-${Date.now()}`;
+      await manager.createJob(streamId, 'user-1');
+
+      await manager.emitChunk(streamId, {
+        created: true,
+        message: { text: 'hello' },
+        streamId,
+      } as unknown as ServerSentEvent);
+      await manager.emitChunk(streamId, {
+        event: 'on_run_step',
+        data: { id: 'step-1', type: 'message_creation', index: 0 },
+      });
+
+      const receivedEvents: unknown[] = [];
+      const subscription = await manager.subscribe(streamId, (event: unknown) =>
+        receivedEvents.push(event),
+      );
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      for (let i = 0; i < 10; i++) {
+        await manager.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `word${i} ` } }, index: i },
+        });
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, 300));
+
+      expect(receivedEvents.length).toBe(12);
+      expect((receivedEvents[0] as Record<string, unknown>).created).toBe(true);
+      expect((receivedEvents[1] as Record<string, unknown>).event).toBe('on_run_step');
+      for (let i = 0; i < 10; i++) {
+        expect((receivedEvents[i + 2] as Record<string, unknown>).event).toBe('on_message_delta');
+      }
+
+      subscription?.unsubscribe();
+      await manager.destroy();
+    });
+
+    test('RedisEventTransport.subscribe() should return a ready promise', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const subscriber = (ioredisClient as unknown as { duplicate: () => unknown }).duplicate();
+      const transport = new RedisEventTransport(ioredisClient as never, subscriber as never);
+
+      const streamId = `ready-promise-${Date.now()}`;
+      const result = transport.subscribe(streamId, {
+        onChunk: () => {},
+      });
+
+      expect(result.ready).toBeDefined();
+      expect(result.ready).toBeInstanceOf(Promise);
+
+      await result.ready;
+
+      result.unsubscribe();
+      transport.destroy();
+      (subscriber as { disconnect: () => void }).disconnect();
+    });
+
+    test('InMemoryEventTransport.subscribe() should not have a ready promise', () => {
+      const transport = new InMemoryEventTransport();
+      const streamId = `no-ready-${Date.now()}`;
+      const result = transport.subscribe(streamId, {
+        onChunk: () => {},
+      });
+
+      expect(result.ready).toBeUndefined();
+
+      result.unsubscribe();
+      transport.destroy();
+    });
+  });
+
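The buffering rule these tests pin down can be condensed into a few lines. This is a simplified sketch of the emit path from the GenerationJobManager hunk at the top of this patch, not the full implementation: in-memory mode returns after buffering (the buffer is replayed directly to the first local subscriber), while Redis mode still publishes so subscribers on other replicas observe every sequence number.

```ts
// Sketch of the emit-path decision under test (simplified from the patch).
async function emitChunkSketch(
  runtime: { hasSubscriber: boolean; earlyEventBuffer: unknown[] },
  isRedis: boolean,
  publish: (event: unknown) => Promise<void>,
  event: unknown,
): Promise<void> {
  if (!runtime.hasSubscriber) {
    runtime.earlyEventBuffer.push(event); // replayed to the first subscriber
    if (!isRedis) {
      return; // in-memory: nothing else is listening yet
    }
  }
  await publish(event); // Redis: keep the cross-replica sequence contiguous
}
```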
   describe('Error Preservation for Late Subscribers', () => {
     /**
      * These tests verify the fix for the race condition where errors
@@ -1067,10 +1221,6 @@ describe('GenerationJobManager Integration Tests', () => {
      */
 
     test('should store error in emitError for late-connecting subscribers', async () => {
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-      const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-      const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
-
       GenerationJobManager.configure({
         jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
         eventTransport: new InMemoryEventTransport(),
@@ -1078,7 +1228,7 @@
         cleanupOnComplete: false,
       });
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `error-store-${Date.now()}`;
       await GenerationJobManager.createJob(streamId, 'user-1');
@@ -1099,10 +1249,6 @@
     });
 
     test('should NOT delete job immediately when completeJob is called with error', async () => {
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-      const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-      const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
-
       GenerationJobManager.configure({
         jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
         eventTransport: new InMemoryEventTransport(),
@@ -1110,7 +1256,7 @@
         cleanupOnComplete: true, // Default behavior
       });
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `error-no-delete-${Date.now()}`;
       await GenerationJobManager.createJob(streamId, 'user-1');
@@ -1133,10 +1279,6 @@
     });
 
     test('should send stored error to late-connecting subscriber', async () => {
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-      const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-      const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
-
       GenerationJobManager.configure({
         jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
         eventTransport: new InMemoryEventTransport(),
@@ -1144,7 +1286,7 @@
         cleanupOnComplete: true,
       });
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `error-late-sub-${Date.now()}`;
       await GenerationJobManager.createJob(streamId, 'user-1');
@@ -1182,10 +1324,6 @@
     });
 
     test('should prioritize error status over finalEvent in subscribe', async () => {
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-      const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-      const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
-
       GenerationJobManager.configure({
         jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
         eventTransport: new InMemoryEventTransport(),
@@ -1193,7 +1331,7 @@
         cleanupOnComplete: false,
       });
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `error-priority-${Date.now()}`;
       await GenerationJobManager.createJob(streamId, 'user-1');
@@ -1237,9 +1375,6 @@
       return;
     }
 
-    const { createStreamServices } = await import('../createStreamServices');
-    const { RedisJobStore } = await import('../implementations/RedisJobStore');
-
     // === Replica A: Creates job and emits error ===
     const replicaAJobStore = new RedisJobStore(ioredisClient);
     await replicaAJobStore.initialize();
@@ -1256,7 +1391,6 @@
 
     // === Replica B: Fresh manager receives client connection ===
     jest.resetModules();
-    const { GenerationJobManager } = await import('../GenerationJobManager');
 
     const services = createStreamServices({
       useRedis: true,
@@ -1267,7 +1401,7 @@
       ...services,
       cleanupOnComplete: false,
     });
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     // Client connects to Replica B (job created on Replica A)
     let receivedError: string | undefined;
@@ -1293,10 +1427,6 @@
     });
 
     test('error jobs should be cleaned up by periodic cleanup after TTL', async () => {
-      const { GenerationJobManager } = await import('../GenerationJobManager');
-      const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
-      const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
-
       // Use a very short TTL for testing
       const jobStore = new InMemoryJobStore({ ttlAfterComplete: 100 });
 
@@ -1307,7 +1437,7 @@
         cleanupOnComplete: true,
       });
 
-      await GenerationJobManager.initialize();
+      GenerationJobManager.initialize();
 
       const streamId = `error-cleanup-${Date.now()}`;
       await GenerationJobManager.createJob(streamId, 'user-1');
@@ -1333,36 +1463,457 @@ describe('GenerationJobManager Integration Tests', () => {
     });
   });
 
-  describe('createStreamServices Auto-Detection', () => {
-    test('should auto-detect Redis when USE_REDIS is true', async () => {
+  describe('Cross-Replica Live Streaming (Redis)', () => {
+    test('should publish events to Redis even when no local subscriber exists', async () => {
       if (!ioredisClient) {
         console.warn('Redis not available, skipping test');
         return;
       }
 
-      // Force USE_REDIS to true
-      process.env.USE_REDIS = 'true';
-      jest.resetModules();
+      const replicaA = new GenerationJobManagerClass();
+      const servicesA = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      replicaA.configure(servicesA);
+      replicaA.initialize();
 
-      const { createStreamServices } = await import('../createStreamServices');
-      const services = createStreamServices();
+      const replicaB = new GenerationJobManagerClass();
+      const servicesB = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      replicaB.configure(servicesB);
+      replicaB.initialize();
 
-      // Should detect Redis
-      expect(services.isRedis).toBe(true);
+      const streamId = `cross-live-${Date.now()}`;
+      await replicaA.createJob(streamId, 'user-1');
+
+      const replicaBJobStore = new RedisJobStore(ioredisClient);
+      await replicaBJobStore.initialize();
+      await replicaBJobStore.createJob(streamId, 'user-1');
+
+      const receivedOnB: unknown[] = [];
+      const subB = await replicaB.subscribe(streamId, (event: unknown) => receivedOnB.push(event));
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      for (let i = 0; i < 5; i++) {
+        await replicaA.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `token${i} ` } }, index: i },
+        });
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, 300));
+
+      expect(receivedOnB.length).toBe(5);
+      for (let i = 0; i < 5; i++) {
+        expect((receivedOnB[i] as Record<string, unknown>).event).toBe('on_message_delta');
+      }
+
+      subB?.unsubscribe();
+      replicaBJobStore.destroy();
+      await replicaA.destroy();
+      await replicaB.destroy();
     });
-    test('should fall back to in-memory when USE_REDIS is false', async () => {
-      process.env.USE_REDIS = 'false';
-      jest.resetModules();
+    test('should not cause data loss on cross-replica subscribers when local subscriber joins', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
 
-      const { createStreamServices } = await import('../createStreamServices');
-      const services = createStreamServices();
+      const replicaA = new GenerationJobManagerClass();
+      const servicesA = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      replicaA.configure(servicesA);
+      replicaA.initialize();
+
+      const replicaB = new GenerationJobManagerClass();
+      const servicesB = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      replicaB.configure(servicesB);
+      replicaB.initialize();
+
+      const streamId = `cross-seq-safe-${Date.now()}`;
+
+      await replicaA.createJob(streamId, 'user-1');
+      const replicaBJobStore = new RedisJobStore(ioredisClient);
+      await replicaBJobStore.initialize();
+      await replicaBJobStore.createJob(streamId, 'user-1');
+
+      const receivedOnB: unknown[] = [];
+      const subB = await replicaB.subscribe(streamId, (event: unknown) => receivedOnB.push(event));
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      for (let i = 0; i < 3; i++) {
+        await replicaA.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `pre-local-${i}` } }, index: i },
+        });
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, 300));
+      expect(receivedOnB.length).toBe(3);
+
+      const receivedOnA: unknown[] = [];
+      const subA = await replicaA.subscribe(streamId, (event: unknown) => receivedOnA.push(event));
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      expect(receivedOnA.length).toBe(3);
+
+      for (let i = 0; i < 3; i++) {
+        await replicaA.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `post-local-${i}` } }, index: i + 3 },
+        });
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, 300));
+
+      expect(receivedOnB.length).toBe(6);
+      expect(receivedOnA.length).toBe(6);
+
+      for (let i = 0; i < 3; i++) {
+        const data = (receivedOnB[i] as Record<string, unknown>).data as Record<string, unknown>;
+        const delta = data.delta as Record<string, unknown>;
+        const content = delta.content as Record<string, unknown>;
+        expect(content.text).toBe(`pre-local-${i}`);
+      }
+      for (let i = 0; i < 3; i++) {
+        const data = (receivedOnB[i + 3] as Record<string, unknown>).data as Record<
+          string,
+          unknown
+        >;
+        const delta = data.delta as Record<string, unknown>;
+        const content = delta.content as Record<string, unknown>;
+        expect(content.text).toBe(`post-local-${i}`);
+      }
+
+      subA?.unsubscribe();
+      subB?.unsubscribe();
+      replicaBJobStore.destroy();
+      await replicaA.destroy();
+      await replicaB.destroy();
+    });
+
+    test('should deliver buffered events locally AND publish live events cross-replica', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const replicaA = new GenerationJobManagerClass();
+      const servicesA = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      replicaA.configure(servicesA);
+      replicaA.initialize();
+
+      const streamId = `cross-buf-live-${Date.now()}`;
+      await replicaA.createJob(streamId, 'user-1');
+
+      await replicaA.emitChunk(streamId, {
+        created: true,
+        message: { text: 'hello' },
+        streamId,
+      } as unknown as ServerSentEvent);
+
+      const receivedOnA: unknown[] = [];
+      const subA = await replicaA.subscribe(streamId, (event: unknown) => receivedOnA.push(event));
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      expect(receivedOnA.length).toBe(1);
+      expect((receivedOnA[0] as Record<string, unknown>).created).toBe(true);
+
+      const replicaB = new GenerationJobManagerClass();
+      const servicesB = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      replicaB.configure(servicesB);
+      replicaB.initialize();
+
+      const replicaBJobStore = new RedisJobStore(ioredisClient);
+      await replicaBJobStore.initialize();
+      await replicaBJobStore.createJob(streamId, 'user-1');
+
+      const receivedOnB: unknown[] = [];
+      const subB = await replicaB.subscribe(streamId, (event: unknown) => receivedOnB.push(event));
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      for (let i = 0; i < 3; i++) {
+        await replicaA.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `word${i} ` } }, index: i },
+        });
+      }
+
+      /** B joined after A published seq 0, so B's reorder buffer force-flushes after REORDER_TIMEOUT_MS (500ms) */
+      await new Promise((resolve) => setTimeout(resolve, 700));
+
+      expect(receivedOnA.length).toBe(4);
+      expect(receivedOnB.length).toBe(3);
+
+      subA?.unsubscribe();
+      subB?.unsubscribe();
+      replicaBJobStore.destroy();
+      await replicaA.destroy();
+      await replicaB.destroy();
+    });
+  });
+
+  describe('Concurrent Subscriber Readiness (Redis)', () => {
+    test('should return ready promise to all concurrent subscribers for same stream', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const subscriber = (
+        ioredisClient as unknown as { duplicate: () => typeof ioredisClient }
+      ).duplicate()!;
+      const transport = new RedisEventTransport(ioredisClient as never, subscriber as never);
+
+      const streamId = `concurrent-sub-${Date.now()}`;
+
+      const sub1 = transport.subscribe(streamId, {
+        onChunk: () => {},
+        onDone: () => {},
+      });
+      const sub2 = transport.subscribe(streamId, {
+        onChunk: () => {},
+        onDone: () => {},
+      });
+
+      expect(sub1.ready).toBeDefined();
+      expect(sub2.ready).toBeDefined();
+
+      await Promise.all([sub1.ready, sub2.ready]);
+
+      sub1.unsubscribe();
+      sub2.unsubscribe();
+      transport.destroy();
+      subscriber.disconnect();
+    });
+  });
+
+  describe('Sequence Reset Safety (Redis)', () => {
+    test('should not receive stale pre-subscribe events via Redis after sequence reset', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const manager = new GenerationJobManagerClass();
+      const services = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      manager.configure(services);
+      manager.initialize();
+
+      const streamId = `seq-stale-${Date.now()}`;
+      await manager.createJob(streamId, 'user-1');
+
+      await manager.emitChunk(streamId, {
+        event: 'on_message_delta',
+        data: { delta: { content: { type: 'text', text: 'pre-sub-0' } }, index: 0 },
+      });
+      await manager.emitChunk(streamId, {
+        event: 'on_message_delta',
+        data: { delta: { content: { type: 'text', text: 'pre-sub-1' } }, index: 1 },
+      });
+
+      const receivedEvents: unknown[] = [];
+      const sub = await manager.subscribe(streamId, (event: unknown) => receivedEvents.push(event));
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      expect(receivedEvents.length).toBe(2);
+      expect(
+        ((receivedEvents[0] as Record<string, unknown>).data as Record<string, unknown>).delta,
+      ).toBeDefined();
+
+      for (let i = 0; i < 5; i++) {
+        await manager.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `post-sub-${i}` } }, index: i + 2 },
+        });
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, 300));
+
+      expect(receivedEvents.length).toBe(7);
+
+      const texts = receivedEvents.map(
+        (e) =>
+          (
+            ((e as Record<string, unknown>).data as Record<string, unknown>).delta as Record<
+              string,
+              unknown
+            >
+          ).content as Record<string, unknown>,
+      );
+      expect((texts[0] as Record<string, unknown>).text).toBe('pre-sub-0');
+      expect((texts[1] as Record<string, unknown>).text).toBe('pre-sub-1');
+      for (let i = 0; i < 5; i++) {
+        expect((texts[i + 2] as Record<string, unknown>).text).toBe(`post-sub-${i}`);
+      }
+
+      sub?.unsubscribe();
+      await manager.destroy();
+    });
+
+    test('should not reset sequence when second subscriber joins mid-stream', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const manager = new GenerationJobManagerClass();
+      const services = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+      manager.configure({ ...services, cleanupOnComplete: false });
+      manager.initialize();
+
+      const streamId = `seq-2nd-sub-${Date.now()}`;
+      await manager.createJob(streamId, 'user-1');
+
+      const eventsA: unknown[] = [];
+      const subA = await manager.subscribe(streamId, (event: unknown) => eventsA.push(event));
+
+      await new Promise((resolve) => setTimeout(resolve, 50));
+
+      for (let i = 0; i < 3; i++) {
+        await manager.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `chunk-${i}` } }, index: i },
+        });
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      expect(eventsA.length).toBe(3);
+
+      const eventsB: unknown[] = [];
+      const subB = await manager.subscribe(streamId, (event: unknown) => eventsB.push(event));
+
+      for (let i = 3; i < 6; i++) {
+        await manager.emitChunk(streamId, {
+          event: 'on_message_delta',
+          data: { delta: { content: { type: 'text', text: `chunk-${i}` } }, index: i },
+        });
+      }
+
+      await new Promise((resolve) => setTimeout(resolve, 300));
+
+      expect(eventsA.length).toBe(6);
+      expect(eventsB.length).toBe(3);
+
+      for (let i = 0; i < 6; i++) {
+        const text = (
+          (
+            ((eventsA[i] as Record<string, unknown>).data as Record<string, unknown>)
+              .delta as Record<string, unknown>
+          ).content as Record<string, unknown>
+        ).text;
+        expect(text).toBe(`chunk-${i}`);
+      }
+
+      subA?.unsubscribe();
+      subB?.unsubscribe();
+      await manager.destroy();
+    });
+  });
+
+  describe('Subscribe Error Recovery (Redis)', () => {
+    test('should allow resubscription after Redis subscribe failure', async () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const subscriber = (
+        ioredisClient as unknown as { duplicate: () => typeof ioredisClient }
+      ).duplicate()!;
+
+      const realSubscribe = subscriber.subscribe.bind(subscriber);
+      let callCount = 0;
+      subscriber.subscribe = ((...args: Parameters<typeof realSubscribe>) => {
+        callCount++;
+        if (callCount === 1) {
+          return Promise.reject(new Error('Simulated Redis SUBSCRIBE failure'));
+        }
+        return realSubscribe(...args);
+      }) as typeof subscriber.subscribe;
+
+      const transport = new RedisEventTransport(ioredisClient as never, subscriber as never);
+
+      const streamId = `err-retry-${Date.now()}`;
+
+      const sub1 = transport.subscribe(streamId, {
+        onChunk: () => {},
+        onDone: () => {},
+      });
+
+      await sub1.ready;
+
+      const receivedEvents: unknown[] = [];
+      sub1.unsubscribe();
+
+      const sub2 = transport.subscribe(streamId, {
+        onChunk: (event: unknown) => receivedEvents.push(event),
+        onDone: () => {},
+      });
+
+      expect(sub2.ready).toBeDefined();
+      await sub2.ready;
+
+      await transport.emitChunk(streamId, { event: 'test', data: { value: 'hello' } });
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      expect(receivedEvents.length).toBe(1);
+
+      sub2.unsubscribe();
+      transport.destroy();
+      subscriber.disconnect();
+    });
+  });
+
+  describe('createStreamServices Auto-Detection', () => {
+    test('should use Redis when useRedis is true and client is available', () => {
+      if (!ioredisClient) {
+        console.warn('Redis not available, skipping test');
+        return;
+      }
+
+      const services = createStreamServices({
+        useRedis: true,
+        redisClient: ioredisClient,
+      });
+
+      expect(services.isRedis).toBe(true);
+      services.eventTransport.destroy();
+    });
+
+    test('should fall back to in-memory when useRedis is false', () => {
+      const services = createStreamServices({ useRedis: false });
       expect(services.isRedis).toBe(false);
     });
 
     test('should allow forcing in-memory via config override', async () => {
-      const { createStreamServices } = await import('../createStreamServices');
       const services = createStreamServices({ useRedis: false });
 
       expect(services.isRedis).toBe(false);
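The subscribe-side fix exercised throughout this spec reduces to caching one in-flight promise per channel, so concurrent subscribers share a single SUBSCRIBE round-trip and a failure can be retried. A condensed sketch, assuming an ioredis-style subscriber whose `subscribe()` returns a promise:

```ts
import type { Redis } from 'ioredis';

// Sketch: one SUBSCRIBE per channel; the cached promise doubles as `ready`.
const channelSubscriptions = new Map<string, Promise<void>>();

function subscribeOnce(subscriber: Redis, channel: string): Promise<void> {
  let ready = channelSubscriptions.get(channel);
  if (!ready) {
    ready = subscriber
      .subscribe(channel)
      .then(() => undefined)
      .catch(() => {
        // Drop the cached promise so a later subscribe() can retry.
        channelSubscriptions.delete(channel);
      });
    channelSubscriptions.set(channel, ready);
  }
  return ready;
}
```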
diff --git a/packages/api/src/stream/__tests__/collectedUsage.spec.ts b/packages/api/src/stream/__tests__/collectedUsage.spec.ts
index 3e534b537a..d9a9ab95fe 100644
--- a/packages/api/src/stream/__tests__/collectedUsage.spec.ts
+++ b/packages/api/src/stream/__tests__/collectedUsage.spec.ts
@@ -146,7 +146,7 @@ describe('CollectedUsage - GenerationJobManager', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `manager-test-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -179,7 +179,7 @@ describe('CollectedUsage - GenerationJobManager', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `no-usage-test-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -202,7 +202,7 @@ describe('CollectedUsage - GenerationJobManager', () => {
       isRedis: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const collectedUsage: UsageMetadata[] = [
       { input_tokens: 100, output_tokens: 50, model: 'gpt-4' },
@@ -235,7 +235,7 @@ describe('AbortJob - Text and CollectedUsage', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `text-extract-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -267,7 +267,7 @@ describe('AbortJob - Text and CollectedUsage', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `empty-text-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -291,7 +291,7 @@ describe('AbortJob - Text and CollectedUsage', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `full-abort-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -328,7 +328,7 @@ describe('AbortJob - Text and CollectedUsage', () => {
       isRedis: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const abortResult = await GenerationJobManager.abortJob('non-existent-job');
 
@@ -365,7 +365,7 @@ describe('Real-world Scenarios', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `parallel-abort-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -419,7 +419,7 @@ describe('Real-world Scenarios', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `cache-abort-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
@@ -459,7 +459,7 @@ describe('Real-world Scenarios', () => {
       cleanupOnComplete: false,
     });
 
-    await GenerationJobManager.initialize();
+    GenerationJobManager.initialize();
 
     const streamId = `sequential-abort-${Date.now()}`;
     await GenerationJobManager.createJob(streamId, 'user-1');
diff --git a/packages/api/src/stream/implementations/InMemoryEventTransport.ts b/packages/api/src/stream/implementations/InMemoryEventTransport.ts
index 39b3d6029d..d0a815ce45 100644
--- a/packages/api/src/stream/implementations/InMemoryEventTransport.ts
+++ b/packages/api/src/stream/implementations/InMemoryEventTransport.ts
@@ -32,7 +32,7 @@ export class InMemoryEventTransport implements IEventTransport {
       onDone?: (event: unknown) => void;
       onError?: (error: string) => void;
     },
-  ): { unsubscribe: () => void } {
+  ): { unsubscribe: () => void; ready?: Promise<void> } {
     const state = this.getOrCreateStream(streamId);
 
     const chunkHandler = (event: unknown) => handlers.onChunk(event);
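Widening the return type with an optional property keeps this change backward compatible: a transport whose delivery is synchronous simply omits `ready`, and feature-detecting callers behave identically. A tiny illustration (shapes only, not the real implementation):

```ts
// Sketch: both shapes satisfy the widened subscription type.
type Subscription = { unsubscribe: () => void; ready?: Promise<void> };

const syncSub: Subscription = { unsubscribe: () => {} }; // in-memory style
const asyncSub: Subscription = {
  unsubscribe: () => {},
  ready: Promise.resolve(), // Redis style: resolves once SUBSCRIBE completes
};
```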
diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts
index 78f545c18e..2362afe647 100644
--- a/packages/api/src/stream/implementations/RedisEventTransport.ts
+++ b/packages/api/src/stream/implementations/RedisEventTransport.ts
@@ -92,8 +92,8 @@ export class RedisEventTransport implements IEventTransport {
   private subscriber: Redis | Cluster;
   /** Track subscribers per stream */
   private streams = new Map();
-  /** Track which channels we're subscribed to */
-  private subscribedChannels = new Set<string>();
+  /** Track channel subscription state: resolved promise = active, pending = in-flight */
+  private channelSubscriptions = new Map<string, Promise<void>>();
   /** Counter for generating unique subscriber IDs */
   private subscriberIdCounter = 0;
   /** Sequence counters per stream for publishing (ensures ordered delivery in cluster mode) */
@@ -122,9 +122,32 @@ export class RedisEventTransport implements IEventTransport {
     return current;
   }
 
-  /** Reset sequence counter for a stream */
-  private resetSequence(streamId: string): void {
+  /** Reset publish sequence counter and subscriber reorder state for a stream (full cleanup only) */
+  resetSequence(streamId: string): void {
     this.sequenceCounters.delete(streamId);
+    const state = this.streams.get(streamId);
+    if (state) {
+      if (state.reorderBuffer.flushTimeout) {
+        clearTimeout(state.reorderBuffer.flushTimeout);
+        state.reorderBuffer.flushTimeout = null;
+      }
+      state.reorderBuffer.nextSeq = 0;
+      state.reorderBuffer.pending.clear();
+    }
+  }
+
+  /** Advance subscriber reorder buffer to current publisher sequence without resetting publisher (cross-replica safe) */
+  syncReorderBuffer(streamId: string): void {
+    const currentSeq = this.sequenceCounters.get(streamId) ?? 0;
+    const state = this.streams.get(streamId);
+    if (state) {
+      if (state.reorderBuffer.flushTimeout) {
+        clearTimeout(state.reorderBuffer.flushTimeout);
+        state.reorderBuffer.flushTimeout = null;
+      }
+      state.reorderBuffer.nextSeq = currentSeq;
+      state.reorderBuffer.pending.clear();
+    }
   }
 
   /**
@@ -331,7 +354,7 @@ export class RedisEventTransport implements IEventTransport {
       onDone?: (event: unknown) => void;
       onError?: (error: string) => void;
     },
-  ): { unsubscribe: () => void } {
+  ): { unsubscribe: () => void; ready?: Promise<void> } {
     const channel = CHANNELS.events(streamId);
     const subscriberId = `sub_${++this.subscriberIdCounter}`;
@@ -354,16 +377,23 @@ export class RedisEventTransport implements IEventTransport {
     streamState.count++;
     streamState.handlers.set(subscriberId, handlers);
 
-    // Subscribe to Redis channel if this is first subscriber
-    if (!this.subscribedChannels.has(channel)) {
-      this.subscribedChannels.add(channel);
-      this.subscriber.subscribe(channel).catch((err) => {
-        logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err);
-      });
+    let readyPromise = this.channelSubscriptions.get(channel);
+
+    if (!readyPromise) {
+      readyPromise = this.subscriber
+        .subscribe(channel)
+        .then(() => {
+          logger.debug(`[RedisEventTransport] Subscription active for channel ${channel}`);
+        })
+        .catch((err) => {
+          this.channelSubscriptions.delete(channel);
+          logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err);
+        });
+      this.channelSubscriptions.set(channel, readyPromise);
     }
 
-    // Return unsubscribe function
     return {
+      ready: readyPromise,
       unsubscribe: () => {
         const state = this.streams.get(streamId);
         if (!state) {
@@ -385,7 +415,7 @@ export class RedisEventTransport implements IEventTransport {
           this.subscriber.unsubscribe(channel).catch((err) => {
             logger.error(`[RedisEventTransport] Failed to unsubscribe from ${channel}:`, err);
           });
-          this.subscribedChannels.delete(channel);
+          this.channelSubscriptions.delete(channel);
 
           // Call all-subscribers-left callbacks
           for (const callback of state.allSubscribersLeftCallbacks) {
@@ -532,12 +562,15 @@ export class RedisEventTransport implements IEventTransport {
 
     state.abortCallbacks.push(callback);
 
-    // Subscribe to Redis channel if not already subscribed
-    if (!this.subscribedChannels.has(channel)) {
-      this.subscribedChannels.add(channel);
-      this.subscriber.subscribe(channel).catch((err) => {
-        logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err);
-      });
+    if (!this.channelSubscriptions.has(channel)) {
+      const ready = this.subscriber
+        .subscribe(channel)
+        .then(() => {})
+        .catch((err) => {
+          this.channelSubscriptions.delete(channel);
+          logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err);
+        });
+      this.channelSubscriptions.set(channel, ready);
     }
   }
 
@@ -571,12 +604,11 @@ export class RedisEventTransport implements IEventTransport {
     // Reset sequence counter for this stream
     this.resetSequence(streamId);
 
-    // Unsubscribe from Redis channel
-    if (this.subscribedChannels.has(channel)) {
+    if (this.channelSubscriptions.has(channel)) {
       this.subscriber.unsubscribe(channel).catch((err) => {
         logger.error(`[RedisEventTransport] Failed to cleanup ${channel}:`, err);
       });
-      this.subscribedChannels.delete(channel);
+      this.channelSubscriptions.delete(channel);
     }
 
     this.streams.delete(streamId);
@@ -595,18 +627,20 @@ export class RedisEventTransport implements IEventTransport {
       state.reorderBuffer.pending.clear();
     }
 
-    // Unsubscribe from all channels
-    for (const channel of this.subscribedChannels) {
-      this.subscriber.unsubscribe(channel).catch(() => {
-        // Ignore errors during shutdown
-      });
+    for (const channel of this.channelSubscriptions.keys()) {
+      this.subscriber.unsubscribe(channel).catch(() => {});
     }
-    this.subscribedChannels.clear();
+    this.channelSubscriptions.clear();
 
     this.streams.clear();
     this.sequenceCounters.clear();
 
-    // Note: Don't close Redis connections - they may be shared
+    try {
+      this.subscriber.disconnect();
+    } catch {
+      /* ignore */
+    }
+
     logger.info('[RedisEventTransport] Destroyed');
   }
 }
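The distinction between the two recovery paths above matters: `resetSequence` zeroes both the publisher counter and the reorder buffer and is only safe during full stream cleanup, while `syncReorderBuffer` fast-forwards the local reorder buffer to the publisher's current sequence so a subscriber attaching mid-stream doesn't stall waiting for sequence numbers it already received via the early-event replay. A minimal sketch of the fast-forward, with the state shape simplified from the class above:

```ts
interface ReorderBuffer {
  nextSeq: number;
  pending: Map<number, unknown>;
}

// Sketch: fast-forward without touching the publisher-side counter.
function syncToPublisher(buffer: ReorderBuffer, publisherSeq: number): void {
  buffer.nextSeq = publisherSeq; // skip seqs already delivered via replay
  buffer.pending.clear();
}
```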
diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts
index d990283925..5486b941eb 100644
--- a/packages/api/src/stream/interfaces/IJobStore.ts
+++ b/packages/api/src/stream/interfaces/IJobStore.ts
@@ -286,7 +286,7 @@ export interface IJobStore {
  * Implementations can use EventEmitter, Redis Pub/Sub, etc.
  */
 export interface IEventTransport {
-  /** Subscribe to events for a stream */
+  /** Subscribe to events for a stream. `ready` resolves once the transport can receive messages. */
   subscribe(
     streamId: string,
     handlers: {
@@ -294,7 +294,7 @@ export interface IEventTransport {
       onDone?: (event: unknown) => void;
       onError?: (error: string) => void;
     },
-  ): { unsubscribe: () => void };
+  ): { unsubscribe: () => void; ready?: Promise<void> };
 
   /** Publish a chunk event - returns Promise in Redis mode for ordered delivery */
   emitChunk(streamId: string, event: unknown): void | Promise<void>;
 
@@ -329,6 +329,12 @@ export interface IEventTransport {
   /** Listen for all subscribers leaving */
   onAllSubscribersLeft(streamId: string, callback: () => void): void;
 
+  /** Reset publish sequence counter for a stream (used during full stream cleanup) */
+  resetSequence?(streamId: string): void;
+
+  /** Advance subscriber reorder buffer to match publisher sequence (cross-replica safe: doesn't reset publisher counter) */
+  syncReorderBuffer?(streamId: string): void;
+
   /** Cleanup transport resources for a specific stream */
   cleanup(streamId: string): void;
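Because both hooks are optional on `IEventTransport`, callers invoke them with optional chaining, as the GenerationJobManager hunk at the top of this patch does, and a custom transport that never reorders events can omit both methods without breaking. A brief sketch of the call sites (function names illustrative):

```ts
import type { IEventTransport } from '~/stream/interfaces/IJobStore';

// Sketch: optional-chained invocation of the new hooks.
function onFirstLocalSubscriber(transport: IEventTransport, streamId: string): void {
  transport.syncReorderBuffer?.(streamId);
}

function onStreamCleanup(transport: IEventTransport, streamId: string): void {
  transport.resetSequence?.(streamId);
}
```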