diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index f8d39cb4d8..a99fdca592 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -76,52 +76,62 @@ router.get('/chat/stream/:streamId', async (req, res) => { logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`); - // Send sync event with resume state for ALL reconnecting clients - // This supports multi-tab scenarios where each tab needs run step data - if (isResume) { - const resumeState = await GenerationJobManager.getResumeState(streamId); - if (resumeState && !res.writableEnded) { - // Send sync event with run steps AND aggregatedContent - // Client will use aggregatedContent to initialize message state - res.write(`event: message\ndata: ${JSON.stringify({ sync: true, resumeState })}\n\n`); + const writeEvent = (event) => { + if (!res.writableEnded) { + res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); if (typeof res.flush === 'function') { res.flush(); } - logger.debug( - `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps`, - ); } - } + }; - const result = await GenerationJobManager.subscribe( - streamId, - (event) => { - if (!res.writableEnded) { - res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); + const onDone = (event) => { + writeEvent(event); + res.end(); + }; + + const onError = (error) => { + if (!res.writableEnded) { + res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`); + if (typeof res.flush === 'function') { + res.flush(); + } + res.end(); + } + }; + + let result; + + if (isResume) { + const { subscription, resumeState, pendingEvents } = + await GenerationJobManager.subscribeWithResume(streamId, writeEvent, onDone, onError); + + if (!res.writableEnded) { + if (resumeState) { + res.write( + `event: message\ndata: ${JSON.stringify({ sync: true, resumeState, pendingEvents })}\n\n`, + ); if (typeof res.flush === 'function') { res.flush(); } - } - }, - (event) => { - if (!res.writableEnded) { - res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); - if (typeof res.flush === 'function') { - res.flush(); + GenerationJobManager.markSyncSent(streamId); + logger.debug( + `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps, ${pendingEvents.length} pending events`, + ); + } else if (pendingEvents.length > 0) { + for (const event of pendingEvents) { + writeEvent(event); } - res.end(); + logger.warn( + `[AgentStream] Resume state null for ${streamId}, replayed ${pendingEvents.length} gap events directly`, + ); } - }, - (error) => { - if (!res.writableEnded) { - res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`); - if (typeof res.flush === 'function') { - res.flush(); - } - res.end(); - } - }, - ); + } + + result = subscription; + } else { + result = await GenerationJobManager.subscribe(streamId, writeEvent, onDone, onError); + } if (!result) { return res.status(404).json({ error: 'Failed to subscribe to stream' }); diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts index 831bf042ad..4d4cb4841a 100644 --- a/client/src/hooks/SSE/useResumableSSE.ts +++ b/client/src/hooks/SSE/useResumableSSE.ts @@ -226,12 +226,12 @@ export default function useResumableSSE( if (data.sync != null) { console.log('[ResumableSSE] SYNC received', { runSteps: data.resumeState?.runSteps?.length ?? 0, + pendingEvents: data.pendingEvents?.length ?? 0, }); const runId = v4(); setActiveRunId(runId); - // Replay run steps if (data.resumeState?.runSteps) { for (const runStep of data.resumeState.runSteps) { stepHandler({ event: 'on_run_step', data: runStep }, { @@ -241,19 +241,15 @@ export default function useResumableSSE( } } - // Set message content from aggregatedContent if (data.resumeState?.aggregatedContent && userMessage?.messageId) { const messages = getMessages() ?? []; const userMsgId = userMessage.messageId; const serverResponseId = data.resumeState.responseMessageId; - // Find the EXACT response message - prioritize responseMessageId from server - // This is critical when there are multiple responses to the same user message let responseIdx = -1; if (serverResponseId) { responseIdx = messages.findIndex((m) => m.messageId === serverResponseId); } - // Fallback: find by parentMessageId pattern (for new messages) if (responseIdx < 0) { responseIdx = messages.findIndex( (m) => @@ -272,7 +268,6 @@ export default function useResumableSSE( }); if (responseIdx >= 0) { - // Update existing response message with aggregatedContent const updated = [...messages]; const oldContent = updated[responseIdx]?.content; updated[responseIdx] = { @@ -285,25 +280,34 @@ export default function useResumableSSE( newContentLength: data.resumeState.aggregatedContent?.length, }); setMessages(updated); - // Sync both content handler and step handler with the updated message - // so subsequent deltas build on synced content, not stale content resetContentHandler(); syncStepMessage(updated[responseIdx]); console.log('[ResumableSSE] SYNC complete, handlers synced'); } else { - // Add new response message const responseId = serverResponseId ?? `${userMsgId}_`; - setMessages([ - ...messages, - { - messageId: responseId, - parentMessageId: userMsgId, - conversationId: currentSubmission.conversation?.conversationId ?? '', - text: '', - content: data.resumeState.aggregatedContent, - isCreatedByUser: false, - } as TMessage, - ]); + const newMessage = { + messageId: responseId, + parentMessageId: userMsgId, + conversationId: currentSubmission.conversation?.conversationId ?? '', + text: '', + content: data.resumeState.aggregatedContent, + isCreatedByUser: false, + } as TMessage; + setMessages([...messages, newMessage]); + resetContentHandler(); + syncStepMessage(newMessage); + } + } + + if (data.pendingEvents?.length > 0) { + console.log(`[ResumableSSE] Replaying ${data.pendingEvents.length} pending events`); + const submission = { ...currentSubmission, userMessage } as EventSubmission; + for (const pendingEvent of data.pendingEvents) { + if (pendingEvent.event != null) { + stepHandler(pendingEvent, submission); + } else if (pendingEvent.type != null) { + contentHandler({ data: pendingEvent, submission }); + } } } diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index cd5ff04eb0..1b612dcb8f 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -707,6 +707,10 @@ class GenerationJobManagerClass { * @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.) * @param onDone - Handler for completion event (includes final message) * @param onError - Handler for error events + * @param options - Subscription configuration + * @param options.skipBufferReplay - When true, skips replaying the earlyEventBuffer. + * Use this when a sync event was already sent (resume), since the sync's + * aggregatedContent already includes all buffered events. * @returns Subscription object with unsubscribe function, or null if job not found */ async subscribe( @@ -714,6 +718,7 @@ class GenerationJobManagerClass { onChunk: t.ChunkHandler, onDone?: t.DoneHandler, onError?: t.ErrorHandler, + options?: t.SubscribeOptions, ): Promise<{ unsubscribe: t.UnsubscribeFn } | null> { // Use lazy initialization to support cross-replica subscriptions const runtime = await this.getOrCreateRuntimeState(streamId); @@ -763,11 +768,17 @@ class GenerationJobManagerClass { runtime.hasSubscriber = true; if (runtime.earlyEventBuffer.length > 0) { - logger.debug( - `[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`, - ); - for (const bufferedEvent of runtime.earlyEventBuffer) { - onChunk(bufferedEvent); + if (options?.skipBufferReplay) { + logger.debug( + `[GenerationJobManager] Skipping ${runtime.earlyEventBuffer.length} buffered events for ${streamId} (skipBufferReplay)`, + ); + } else { + logger.debug( + `[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`, + ); + for (const bufferedEvent of runtime.earlyEventBuffer) { + onChunk(bufferedEvent); + } } runtime.earlyEventBuffer = []; } @@ -785,6 +796,52 @@ class GenerationJobManagerClass { return subscription; } + /** + * Atomic resume + subscribe: snapshots resume state and drains the early event buffer + * in one synchronous step, then subscribes with skipBufferReplay. + * + * Closes the timing gap between separate `getResumeState()` and `subscribe()` calls + * where events could arrive in earlyEventBuffer after the snapshot but before subscribe + * clears the buffer. + * + * In-memory mode: drained buffer events are returned as `pendingEvents` since + * they exist nowhere else. The caller must deliver them after the sync payload. + * Redis mode: `pendingEvents` is empty — chunks are persisted via appendChunk + * and will appear in aggregatedContent on the next resume. + */ + async subscribeWithResume( + streamId: string, + onChunk: t.ChunkHandler, + onDone?: t.DoneHandler, + onError?: t.ErrorHandler, + ): Promise { + const bufferLengthAtSnapshot = !this._isRedis + ? (this.runtimeState.get(streamId)?.earlyEventBuffer.length ?? 0) + : 0; + + const resumeState = await this.getResumeState(streamId); + + let pendingEvents: t.ServerSentEvent[] = []; + if (!this._isRedis) { + const runtime = this.runtimeState.get(streamId); + if (runtime) { + pendingEvents = runtime.earlyEventBuffer.slice(bufferLengthAtSnapshot); + runtime.earlyEventBuffer = []; + if (pendingEvents.length > 0) { + logger.debug( + `[GenerationJobManager] Captured ${pendingEvents.length} gap events for ${streamId}`, + ); + } + } + } + + const subscription = await this.subscribe(streamId, onChunk, onDone, onError, { + skipBufferReplay: true, + }); + + return { subscription, resumeState, pendingEvents }; + } + /** * Emit a chunk event to all subscribers. * Uses runtime state check for performance (avoids async job store lookup per token). diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index 59fe32e4e5..2f23510018 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -1,3 +1,4 @@ +/* eslint jest/no-standalone-expect: ["error", { "additionalTestBlockFunctions": ["testRedis"] }] */ import type { Redis, Cluster } from 'ioredis'; import type { ServerSentEvent } from '~/types/events'; import { InMemoryEventTransport } from '~/stream/implementations/InMemoryEventTransport'; @@ -27,6 +28,9 @@ describe('GenerationJobManager Integration Tests', () => { let dynamicKeyvClient: unknown = null; let dynamicKeyvReady: Promise | null = null; const testPrefix = 'JobManager-Integration-Test'; + const redisConfigured = process.env.USE_REDIS === 'true'; + const describeRedis = redisConfigured ? describe : describe.skip; + const testRedis = redisConfigured ? test : test.skip; beforeAll(async () => { originalEnv = { ...process.env }; @@ -82,6 +86,68 @@ describe('GenerationJobManager Integration Tests', () => { process.env = originalEnv; }); + function createInMemoryManager(): GenerationJobManagerClass { + const manager = new GenerationJobManagerClass(); + manager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + }); + manager.initialize(); + return manager; + } + + function createRedisManager(): GenerationJobManagerClass { + const manager = new GenerationJobManagerClass(); + manager.configure( + createStreamServices({ + useRedis: true, + redisClient: ioredisClient!, + }), + ); + manager.initialize(); + return manager; + } + + async function setupDisconnectedStream( + manager: GenerationJobManagerClass, + streamId: string, + delay: number, + ): Promise { + const firstEvents: ServerSentEvent[] = []; + const sub = await manager.subscribe(streamId, (event) => firstEvents.push(event)); + + await new Promise((resolve) => setTimeout(resolve, delay)); + + await manager.emitChunk(streamId, { + event: 'on_run_step', + data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' } }, + }); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { id: 'step-1', delta: { content: { type: 'text', text: 'Hello' } } }, + }); + + await new Promise((resolve) => setTimeout(resolve, delay)); + expect(firstEvents.length).toBe(2); + + sub?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, delay)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { id: 'step-1', delta: { content: { type: 'text', text: ' world' } } }, + }); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { id: 'step-1', delta: { content: { type: 'text', text: '!' } } }, + }); + + await new Promise((resolve) => setTimeout(resolve, delay)); + + return firstEvents; + } + describe('In-Memory Mode', () => { test('should create and manage jobs', async () => { // Configure with in-memory @@ -171,13 +237,8 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('Redis Mode', () => { + describeRedis('Redis Mode', () => { test('should create and manage jobs via Redis', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - // Create Redis services const services = createStreamServices({ useRedis: true, @@ -209,11 +270,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should persist chunks for cross-instance resume', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, @@ -264,11 +320,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should handle abort and return content', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, @@ -374,7 +425,7 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('Cross-Replica Support (Redis)', () => { + describeRedis('Cross-Replica Support (Redis)', () => { /** * Problem: In k8s with Redis and multiple replicas, when a user sends a message: * 1. POST /api/agents/chat hits Replica A, creates job @@ -387,15 +438,10 @@ describe('GenerationJobManager Integration Tests', () => { * when the job exists in Redis but not in local memory. */ test('should NOT return 404 when stream endpoint hits different replica than job creator', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - // === REPLICA A: Creates the job === // Simulate Replica A creating the job directly in Redis // (In real scenario, this happens via GenerationJobManager.createJob on Replica A) - const replicaAJobStore = new RedisJobStore(ioredisClient); + const replicaAJobStore = new RedisJobStore(ioredisClient!); await replicaAJobStore.initialize(); const streamId = `cross-replica-404-test-${Date.now()}`; @@ -452,13 +498,8 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should lazily create runtime state for jobs created on other replicas', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - // Instance 1: Create the job directly in Redis (simulating another replica) - const jobStore = new RedisJobStore(ioredisClient); + const jobStore = new RedisJobStore(ioredisClient!); await jobStore.initialize(); const streamId = `cross-replica-${Date.now()}`; @@ -500,11 +541,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should persist syncSent to Redis for cross-replica consistency', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, @@ -539,11 +575,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should persist finalEvent to Redis for cross-replica access', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, @@ -581,11 +612,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should emit cross-replica abort signal via Redis pub/sub', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, @@ -620,16 +646,11 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should handle abort for lazily-initialized cross-replica jobs', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - // This test validates that jobs created on Replica A and lazily-initialized // on Replica B can still receive and handle abort signals. // === Replica A: Create job directly in Redis === - const replicaAJobStore = new RedisJobStore(ioredisClient); + const replicaAJobStore = new RedisJobStore(ioredisClient!); await replicaAJobStore.initialize(); const streamId = `lazy-abort-${Date.now()}`; @@ -675,11 +696,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should abort generation when abort signal received from another replica', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - // This test simulates: // 1. Replica A creates a job and starts generation // 2. Replica B receives abort request and emits abort signal @@ -729,13 +745,8 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should handle wasSyncSent for cross-replica scenarios', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - // Create job directly in Redis with syncSent: true - const jobStore = new RedisJobStore(ioredisClient); + const jobStore = new RedisJobStore(ioredisClient!); await jobStore.initialize(); const streamId = `cross-sync-${Date.now()}`; @@ -762,7 +773,7 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('Sequential Event Ordering (Redis)', () => { + describeRedis('Sequential Event Ordering (Redis)', () => { /** * These tests verify that events are delivered in strict sequential order * when using Redis mode. This is critical because: @@ -773,11 +784,6 @@ describe('GenerationJobManager Integration Tests', () => { * The fix: emitChunk now awaits Redis publish to ensure ordered delivery. */ test('should maintain strict order for rapid sequential emits', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - jest.resetModules(); const services = createStreamServices({ @@ -823,11 +829,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should maintain order for tool call argument deltas', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - jest.resetModules(); const services = createStreamServices({ @@ -882,11 +883,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should maintain order: on_run_step before on_run_step_delta', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - jest.resetModules(); const services = createStreamServices({ @@ -945,11 +941,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should not block other streams when awaiting emitChunk', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - jest.resetModules(); const services = createStreamServices({ @@ -1069,12 +1060,7 @@ describe('GenerationJobManager Integration Tests', () => { await manager.destroy(); }); - test('should buffer and replay events emitted before subscribe (Redis)', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - + testRedis('should buffer and replay events emitted before subscribe (Redis)', async () => { const manager = new GenerationJobManagerClass(); const services = createStreamServices({ useRedis: true, @@ -1118,67 +1104,60 @@ describe('GenerationJobManager Integration Tests', () => { await manager.destroy(); }); - test('should not lose events when emitting before and after subscribe (Redis)', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - - const manager = new GenerationJobManagerClass(); - const services = createStreamServices({ - useRedis: true, - redisClient: ioredisClient, - }); - - manager.configure(services); - manager.initialize(); - - const streamId = `no-loss-${Date.now()}`; - await manager.createJob(streamId, 'user-1'); - - await manager.emitChunk(streamId, { - created: true, - message: { text: 'hello' }, - streamId, - } as unknown as ServerSentEvent); - await manager.emitChunk(streamId, { - event: 'on_run_step', - data: { id: 'step-1', type: 'message_creation', index: 0 }, - }); - - const receivedEvents: unknown[] = []; - const subscription = await manager.subscribe(streamId, (event: unknown) => - receivedEvents.push(event), - ); - - await new Promise((resolve) => setTimeout(resolve, 100)); - - for (let i = 0; i < 10; i++) { - await manager.emitChunk(streamId, { - event: 'on_message_delta', - data: { delta: { content: { type: 'text', text: `word${i} ` } }, index: i }, + testRedis( + 'should not lose events when emitting before and after subscribe (Redis)', + async () => { + const manager = new GenerationJobManagerClass(); + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, }); - } - await new Promise((resolve) => setTimeout(resolve, 300)); + manager.configure(services); + manager.initialize(); - expect(receivedEvents.length).toBe(12); - expect((receivedEvents[0] as Record).created).toBe(true); - expect((receivedEvents[1] as Record).event).toBe('on_run_step'); - for (let i = 0; i < 10; i++) { - expect((receivedEvents[i + 2] as Record).event).toBe('on_message_delta'); - } + const streamId = `no-loss-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); - subscription?.unsubscribe(); - await manager.destroy(); - }); + await manager.emitChunk(streamId, { + created: true, + message: { text: 'hello' }, + streamId, + } as unknown as ServerSentEvent); + await manager.emitChunk(streamId, { + event: 'on_run_step', + data: { id: 'step-1', type: 'message_creation', index: 0 }, + }); - test('RedisEventTransport.subscribe() should return a ready promise', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } + const receivedEvents: unknown[] = []; + const subscription = await manager.subscribe(streamId, (event: unknown) => + receivedEvents.push(event), + ); + await new Promise((resolve) => setTimeout(resolve, 100)); + + for (let i = 0; i < 10; i++) { + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: `word${i} ` } }, index: i }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(receivedEvents.length).toBe(12); + expect((receivedEvents[0] as Record).created).toBe(true); + expect((receivedEvents[1] as Record).event).toBe('on_run_step'); + for (let i = 0; i < 10; i++) { + expect((receivedEvents[i + 2] as Record).event).toBe('on_message_delta'); + } + + subscription?.unsubscribe(); + await manager.destroy(); + }, + ); + + testRedis('RedisEventTransport.subscribe() should return a ready promise', async () => { const subscriber = (ioredisClient as unknown as { duplicate: () => unknown }).duplicate(); const transport = new RedisEventTransport(ioredisClient as never, subscriber as never); @@ -1211,6 +1190,421 @@ describe('GenerationJobManager Integration Tests', () => { }); }); + describe('Resume: skipBufferReplay prevents duplication', () => { + /** + * Verifies the fix for duplicated content when navigating away from an + * in-progress conversation and back. Events accumulate in earlyEventBuffer + * while the subscriber is absent. On resume, the sync event delivers all + * accumulated content via aggregatedContent, so buffer replay must be + * skipped to prevent duplication. + */ + + test('should NOT replay buffer when skipBufferReplay is true (resume scenario)', async () => { + const manager = createInMemoryManager(); + const streamId = `skip-buf-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + await setupDisconnectedStream(manager, streamId, 10); + + const resumeState = await manager.getResumeState(streamId); + expect(resumeState).not.toBeNull(); + + const resumeEvents: ServerSentEvent[] = []; + const sub2 = await manager.subscribe( + streamId, + (event) => resumeEvents.push(event), + undefined, + undefined, + { skipBufferReplay: true }, + ); + + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(resumeEvents.length).toBe(0); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { id: 'step-1', delta: { content: { type: 'text', text: ' Live!' } } }, + }); + + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(resumeEvents.length).toBe(1); + expect(resumeEvents[0].event).toBe('on_message_delta'); + + sub2?.unsubscribe(); + await manager.destroy(); + }); + + test('should replay buffer by default when no options are passed', async () => { + const manager = createInMemoryManager(); + const streamId = `replay-buf-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1Events: ServerSentEvent[] = []; + const sub1 = await manager.subscribe(streamId, (event) => sub1Events.push(event)); + await new Promise((resolve) => setTimeout(resolve, 10)); + + await manager.emitChunk(streamId, { + event: 'on_run_step', + data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' } }, + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { id: 'step-1', delta: { content: { type: 'text', text: 'buffered' } } }, + }); + + const sub2Events: ServerSentEvent[] = []; + const sub2 = await manager.subscribe(streamId, (event) => sub2Events.push(event)); + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(sub2Events.length).toBe(1); + expect(sub2Events[0].event).toBe('on_message_delta'); + + sub2?.unsubscribe(); + await manager.destroy(); + }); + + test('should clear earlyEventBuffer even when skipping replay (no memory leak)', async () => { + const manager = createInMemoryManager(); + const streamId = `buf-clear-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1 = await manager.subscribe(streamId, () => {}); + await new Promise((resolve) => setTimeout(resolve, 10)); + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'buf1' } } }, + }); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'buf2' } } }, + }); + + const sub2Events: ServerSentEvent[] = []; + const sub2 = await manager.subscribe( + streamId, + (event) => sub2Events.push(event), + undefined, + undefined, + { skipBufferReplay: true }, + ); + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(sub2Events.length).toBe(0); + + sub2?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'new-event' } } }, + }); + + const sub3Events: ServerSentEvent[] = []; + const sub3 = await manager.subscribe(streamId, (event) => sub3Events.push(event)); + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(sub3Events.length).toBe(1); + const event = sub3Events[0] as { + event: string; + data: { delta: { content: { text: string } } }; + }; + expect(event.data.delta.content.text).toBe('new-event'); + + sub3?.unsubscribe(); + await manager.destroy(); + }); + + test('should handle multiple disconnect/reconnect cycles with skipBufferReplay', async () => { + const manager = createInMemoryManager(); + const streamId = `multi-reconnect-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1 = await manager.subscribe(streamId, () => {}); + await new Promise((resolve) => setTimeout(resolve, 10)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'initial' } } }, + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'buffered-1' } } }, + }); + + const resumeState1 = await manager.getResumeState(streamId); + expect(resumeState1).not.toBeNull(); + + const sub2Events: ServerSentEvent[] = []; + const sub2 = await manager.subscribe( + streamId, + (event) => sub2Events.push(event), + undefined, + undefined, + { skipBufferReplay: true }, + ); + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(sub2Events.length).toBe(0); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'live-1' } } }, + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(sub2Events.length).toBe(1); + + sub2?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'buffered-2' } } }, + }); + + const resumeState2 = await manager.getResumeState(streamId); + expect(resumeState2).not.toBeNull(); + + const sub3Events: ServerSentEvent[] = []; + const sub3 = await manager.subscribe( + streamId, + (event) => sub3Events.push(event), + undefined, + undefined, + { skipBufferReplay: true }, + ); + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(sub3Events.length).toBe(0); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'live-2' } } }, + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(sub3Events.length).toBe(1); + + sub3?.unsubscribe(); + await manager.destroy(); + }); + + testRedis('should NOT replay buffer when skipBufferReplay is true (Redis)', async () => { + const manager = createRedisManager(); + const streamId = `skip-buf-redis-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + await setupDisconnectedStream(manager, streamId, 100); + + const resumeState = await manager.getResumeState(streamId); + expect(resumeState).not.toBeNull(); + expect(resumeState!.aggregatedContent?.length).toBeGreaterThan(0); + + const resumeEvents: ServerSentEvent[] = []; + const sub2 = await manager.subscribe( + streamId, + (event) => resumeEvents.push(event), + undefined, + undefined, + { skipBufferReplay: true }, + ); + + await new Promise((resolve) => setTimeout(resolve, 200)); + expect(resumeEvents.length).toBe(0); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { id: 'step-1', delta: { content: { type: 'text', text: ' Live!' } } }, + }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + expect(resumeEvents.length).toBe(1); + expect(resumeEvents[0].event).toBe('on_message_delta'); + + sub2?.unsubscribe(); + await manager.destroy(); + }); + + testRedis( + 'should replay buffer without skipBufferReplay after disconnect (Redis)', + async () => { + const manager = createRedisManager(); + const streamId = `replay-buf-redis-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1 = await manager.subscribe(streamId, () => {}); + await new Promise((resolve) => setTimeout(resolve, 100)); + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'buffered-redis' } } }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const sub2Events: ServerSentEvent[] = []; + const sub2 = await manager.subscribe(streamId, (event) => sub2Events.push(event)); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(sub2Events.length).toBe(1); + expect(sub2Events[0].event).toBe('on_message_delta'); + + sub2?.unsubscribe(); + await manager.destroy(); + }, + ); + }); + + describe('Atomic subscribeWithResume', () => { + test('should return empty pendingEvents for pre-snapshot buffer events (in-memory)', async () => { + const manager = createInMemoryManager(); + const streamId = `atomic-drain-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1 = await manager.subscribe(streamId, () => {}); + await new Promise((resolve) => setTimeout(resolve, 10)); + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + await manager.emitChunk(streamId, { + event: 'on_run_step', + data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' } }, + }); + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { id: 'step-1', delta: { content: { type: 'text', text: 'buffered' } } }, + }); + + const liveEvents: ServerSentEvent[] = []; + const { subscription, resumeState, pendingEvents } = await manager.subscribeWithResume( + streamId, + (event) => liveEvents.push(event), + ); + + expect(resumeState).not.toBeNull(); + expect(pendingEvents.length).toBe(0); + expect(liveEvents.length).toBe(0); + + subscription?.unsubscribe(); + await manager.destroy(); + }); + + test('should return empty pendingEvents when buffer is empty', async () => { + const manager = createInMemoryManager(); + const streamId = `atomic-empty-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1 = await manager.subscribe(streamId, () => {}); + await new Promise((resolve) => setTimeout(resolve, 10)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'delivered' } } }, + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + const { pendingEvents } = await manager.subscribeWithResume(streamId, () => {}); + + expect(pendingEvents.length).toBe(0); + + await manager.destroy(); + }); + + test('should deliver live events after subscribeWithResume', async () => { + const manager = createInMemoryManager(); + const streamId = `atomic-live-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1 = await manager.subscribe(streamId, () => {}); + await new Promise((resolve) => setTimeout(resolve, 10)); + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'buffered-pre-snapshot' } } }, + }); + + const liveEvents: ServerSentEvent[] = []; + const { subscription, pendingEvents } = await manager.subscribeWithResume(streamId, (event) => + liveEvents.push(event), + ); + + expect(pendingEvents.length).toBe(0); + expect(liveEvents.length).toBe(0); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'live-after' } } }, + }); + + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(liveEvents.length).toBe(1); + const liveEvent = liveEvents[0] as { + event: string; + data: { delta: { content: { text: string } } }; + }; + expect(liveEvent.data.delta.content.text).toBe('live-after'); + + subscription?.unsubscribe(); + await manager.destroy(); + }); + + testRedis( + 'should return empty pendingEvents in Redis mode (chunks already persisted)', + async () => { + const manager = createRedisManager(); + const streamId = `atomic-redis-${Date.now()}`; + await manager.createJob(streamId, 'user-1'); + + const sub1 = await manager.subscribe(streamId, () => {}); + await new Promise((resolve) => setTimeout(resolve, 100)); + sub1?.unsubscribe(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'buffered-redis' } } }, + }); + await new Promise((resolve) => setTimeout(resolve, 100)); + + const liveEvents: ServerSentEvent[] = []; + const { subscription, resumeState, pendingEvents } = await manager.subscribeWithResume( + streamId, + (event) => liveEvents.push(event), + ); + + expect(resumeState).not.toBeNull(); + expect(pendingEvents.length).toBe(0); + + await manager.emitChunk(streamId, { + event: 'on_message_delta', + data: { delta: { content: { type: 'text', text: 'live-redis' } } }, + }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + expect(liveEvents.length).toBe(1); + + subscription?.unsubscribe(); + await manager.destroy(); + }, + ); + }); + describe('Error Preservation for Late Subscribers', () => { /** * These tests verify the fix for the race condition where errors @@ -1369,14 +1763,9 @@ describe('GenerationJobManager Integration Tests', () => { await GenerationJobManager.destroy(); }); - test('should handle error preservation in Redis mode (cross-replica)', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - + testRedis('should handle error preservation in Redis mode (cross-replica)', async () => { // === Replica A: Creates job and emits error === - const replicaAJobStore = new RedisJobStore(ioredisClient); + const replicaAJobStore = new RedisJobStore(ioredisClient!); await replicaAJobStore.initialize(); const streamId = `redis-error-${Date.now()}`; @@ -1463,13 +1852,8 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('Cross-Replica Live Streaming (Redis)', () => { + describeRedis('Cross-Replica Live Streaming (Redis)', () => { test('should publish events to Redis even when no local subscriber exists', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const replicaA = new GenerationJobManagerClass(); const servicesA = createStreamServices({ useRedis: true, @@ -1489,7 +1873,7 @@ describe('GenerationJobManager Integration Tests', () => { const streamId = `cross-live-${Date.now()}`; await replicaA.createJob(streamId, 'user-1'); - const replicaBJobStore = new RedisJobStore(ioredisClient); + const replicaBJobStore = new RedisJobStore(ioredisClient!); await replicaBJobStore.initialize(); await replicaBJobStore.createJob(streamId, 'user-1'); @@ -1519,11 +1903,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should not cause data loss on cross-replica subscribers when local subscriber joins', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const replicaA = new GenerationJobManagerClass(); const servicesA = createStreamServices({ useRedis: true, @@ -1543,7 +1922,7 @@ describe('GenerationJobManager Integration Tests', () => { const streamId = `cross-seq-safe-${Date.now()}`; await replicaA.createJob(streamId, 'user-1'); - const replicaBJobStore = new RedisJobStore(ioredisClient); + const replicaBJobStore = new RedisJobStore(ioredisClient!); await replicaBJobStore.initialize(); await replicaBJobStore.createJob(streamId, 'user-1'); @@ -1603,11 +1982,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should deliver buffered events locally AND publish live events cross-replica', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const replicaA = new GenerationJobManagerClass(); const servicesA = createStreamServices({ useRedis: true, @@ -1641,7 +2015,7 @@ describe('GenerationJobManager Integration Tests', () => { replicaB.configure(servicesB); replicaB.initialize(); - const replicaBJobStore = new RedisJobStore(ioredisClient); + const replicaBJobStore = new RedisJobStore(ioredisClient!); await replicaBJobStore.initialize(); await replicaBJobStore.createJob(streamId, 'user-1'); @@ -1671,13 +2045,8 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('Concurrent Subscriber Readiness (Redis)', () => { + describeRedis('Concurrent Subscriber Readiness (Redis)', () => { test('should return ready promise to all concurrent subscribers for same stream', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const subscriber = ( ioredisClient as unknown as { duplicate: () => typeof ioredisClient } ).duplicate()!; @@ -1706,13 +2075,8 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('Sequence Reset Safety (Redis)', () => { + describeRedis('Sequence Reset Safety (Redis)', () => { test('should not receive stale pre-subscribe events via Redis after sequence reset', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const manager = new GenerationJobManagerClass(); const services = createStreamServices({ useRedis: true, @@ -1774,11 +2138,6 @@ describe('GenerationJobManager Integration Tests', () => { }); test('should not reset sequence when second subscriber joins mid-stream', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const manager = new GenerationJobManagerClass(); const services = createStreamServices({ useRedis: true, @@ -1837,13 +2196,8 @@ describe('GenerationJobManager Integration Tests', () => { }); }); - describe('Subscribe Error Recovery (Redis)', () => { + describeRedis('Subscribe Error Recovery (Redis)', () => { test('should allow resubscription after Redis subscribe failure', async () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - const subscriber = ( ioredisClient as unknown as { duplicate: () => typeof ioredisClient } ).duplicate()!; @@ -1892,12 +2246,7 @@ describe('GenerationJobManager Integration Tests', () => { }); describe('createStreamServices Auto-Detection', () => { - test('should use Redis when useRedis is true and client is available', () => { - if (!ioredisClient) { - console.warn('Redis not available, skipping test'); - return; - } - + testRedis('should use Redis when useRedis is true and client is available', () => { const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, diff --git a/packages/api/src/types/stream.ts b/packages/api/src/types/stream.ts index 79b29d774f..068d9c8db8 100644 --- a/packages/api/src/types/stream.ts +++ b/packages/api/src/types/stream.ts @@ -47,3 +47,24 @@ export type ChunkHandler = (event: ServerSentEvent) => void; export type DoneHandler = (event: ServerSentEvent) => void; export type ErrorHandler = (error: string) => void; export type UnsubscribeFn = () => void; + +/** Options for subscribing to a job event stream */ +export interface SubscribeOptions { + /** + * When true, skips replaying the earlyEventBuffer. + * Use for resume connections after a sync event has been sent. + */ + skipBufferReplay?: boolean; +} + +/** Result of an atomic subscribe-with-resume operation */ +export interface SubscribeWithResumeResult { + subscription: { unsubscribe: UnsubscribeFn } | null; + resumeState: ResumeState | null; + /** + * Events that arrived between the resume snapshot and the subscribe call. + * In-memory mode: drained from earlyEventBuffer (only place they exist). + * Redis mode: empty — chunks are persisted to the store and appear in aggregatedContent on next resume. + */ + pendingEvents: ServerSentEvent[]; +}