From feb72ad2dce36ada20b0a27d23fedf1454fec7da Mon Sep 17 00:00:00 2001
From: Danny Avila
Date: Thu, 5 Feb 2026 17:57:33 +0100
Subject: [PATCH] 🔄 refactor: Sequential Event Ordering in Redis Streaming Mode (#11650)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* chore: linting image context file

* refactor: Event Emission with Async Handling for Redis Ordering

- Updated emitEvent and related functions to be async, ensuring proper event ordering in Redis mode.
- Refactored multiple handlers to await emitEvent calls, improving reliability for streaming deltas.
- Enhanced GenerationJobManager to await chunk emissions, critical for maintaining sequential event delivery.
- Added tests to verify that events are delivered in strict order when using Redis, addressing previous issues with out-of-order messages.

* refactor: Clear Pending Buffers and Timeouts in RedisEventTransport

- Enhanced the cleanup process in RedisEventTransport by ensuring that pending messages and flush timeouts are cleared when the last subscriber unsubscribes.
- Updated the destroy method to also clear pending messages and flush timeouts for all streams, improving resource management and preventing memory leaks.

* refactor: Update Event Emission to Async for Improved Ordering

- Refactored GenerationJobManager and RedisEventTransport to make emitDone and emitError methods async, ensuring proper event ordering in Redis mode.
- Updated all relevant calls to await these methods, enhancing reliability in event delivery.
- Adjusted tests to verify that events are processed in the correct sequence, addressing previous issues with out-of-order messages.

* refactor: Adjust RedisEventTransport for 0-Indexed Sequence Handling

- Updated sequence handling in RedisEventTransport to be 0-indexed, ensuring consistency across event emissions and buffer management.
- Modified integration tests to reflect the new sequence logic, improving the accuracy of event processing and delivery order.
- Enhanced comments for clarity on sequence management and terminal event handling.

* chore: Add Redis dump file to .gitignore

- Included dump.rdb in .gitignore to prevent accidental commits of Redis database dumps, enhancing repository cleanliness and security.

* test: Increase wait times in RedisEventTransport integration tests for CI stability

- Adjusted wait times for subscription establishment and event propagation from 100ms and 200ms to 500ms to improve reliability in CI environments.
- Enhanced code readability by formatting promise resolution lines for better clarity.
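Reviewer note: the mechanism is easiest to see stripped of transport details. Producers stamp each pub/sub message with a per-stream, 0-indexed sequence number and await the Redis publish; subscribers pass messages through a reorder buffer before delivery. Below is a condensed, dependency-free TypeScript sketch of that idea, not the patch code itself: the `publish` parameter stands in for ioredis' `publisher.publish`, and the flush-timeout and overflow handling that RedisEventTransport adds on top are omitted.

    type Message = { seq: number; data: unknown };

    // Producer side: per-stream 0-indexed counters. Awaiting each publish means
    // the next chunk is only stamped and sent once the previous one is accepted.
    const counters = new Map<string, number>();
    async function emitChunk(
      streamId: string,
      data: unknown,
      publish: (msg: Message) => Promise<void>,
    ): Promise<void> {
      const seq = counters.get(streamId) ?? 0;
      counters.set(streamId, seq + 1);
      await publish({ seq, data });
    }

    // Subscriber side: deliver seq === nextSeq immediately, hold anything that
    // arrives early, and drain the consecutive backlog once a gap is filled.
    class ReorderBuffer {
      private nextSeq = 0;
      private pending = new Map<number, Message>();

      constructor(private deliver: (data: unknown) => void) {}

      receive(msg: Message): void {
        if (msg.seq < this.nextSeq) {
          return; // duplicate or stale message: drop
        }
        if (msg.seq > this.nextSeq) {
          this.pending.set(msg.seq, msg); // gap: buffer until earlier seqs arrive
          return;
        }
        this.deliver(msg.data);
        this.nextSeq++;
        while (this.pending.has(this.nextSeq)) {
          this.deliver(this.pending.get(this.nextSeq)!.data);
          this.pending.delete(this.nextSeq);
          this.nextSeq++;
        }
      }
    }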
--- .gitignore | 1 + api/server/controllers/agents/callbacks.js | 45 +- api/server/controllers/agents/request.js | 8 +- api/server/services/ActionService.js | 4 +- api/server/services/MCP.js | 13 +- api/server/services/ToolService.js | 4 +- .../api/src/stream/GenerationJobManager.ts | 20 +- ...ationJobManager.stream_integration.spec.ts | 289 +++++++++++- ...sEventTransport.stream_integration.spec.ts | 441 +++++++++++++++++- .../implementations/RedisEventTransport.ts | 310 ++++++++++-- .../api/src/stream/interfaces/IJobStore.ts | 12 +- .../api/src/tools/toolkits/imageContext.ts | 1 - 12 files changed, 1032 insertions(+), 116 deletions(-) diff --git a/.gitignore b/.gitignore index d173d26b60..d0c87ff03d 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ pids # CI/CD data test-image* +dump.rdb # Directory for instrumented libs generated by jscoverage/JSCover lib-cov diff --git a/api/server/controllers/agents/callbacks.js b/api/server/controllers/agents/callbacks.js index 5d706875ff..867e7f53af 100644 --- a/api/server/controllers/agents/callbacks.js +++ b/api/server/controllers/agents/callbacks.js @@ -152,13 +152,15 @@ function checkIfLastAgent(last_agent_id, langgraph_node) { /** * Helper to emit events either to res (standard mode) or to job emitter (resumable mode). + * In Redis mode, awaits the emit to guarantee event ordering (critical for streaming deltas). * @param {ServerResponse} res - The server response object * @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode * @param {Object} eventData - The event data to send + * @returns {Promise} */ -function emitEvent(res, streamId, eventData) { +async function emitEvent(res, streamId, eventData) { if (streamId) { - GenerationJobManager.emitChunk(streamId, eventData); + await GenerationJobManager.emitChunk(streamId, eventData); } else { sendEvent(res, eventData); } @@ -206,18 +208,18 @@ function getDefaultHandlers({ * @param {StreamEventData} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ - handle: (event, data, metadata) => { + handle: async (event, data, metadata) => { if (data?.stepDetails.type === StepTypes.TOOL_CALLS) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (!metadata?.hide_sequential_outputs) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else { const agentName = metadata?.name ?? 'Agent'; const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS; const action = isToolCall ? 'performing a task...' : 'thinking...'; - emitEvent(res, streamId, { + await emitEvent(res, streamId, { event: 'on_agent_update', data: { runId: metadata?.run_id, @@ -235,13 +237,13 @@ function getDefaultHandlers({ * @param {StreamEventData} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. 
*/ - handle: (event, data, metadata) => { + handle: async (event, data, metadata) => { if (data?.delta.type === StepTypes.TOOL_CALLS) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (!metadata?.hide_sequential_outputs) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } aggregateContent({ event, data }); }, @@ -253,13 +255,13 @@ function getDefaultHandlers({ * @param {StreamEventData & { result: ToolEndData }} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ - handle: (event, data, metadata) => { + handle: async (event, data, metadata) => { if (data?.result != null) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (!metadata?.hide_sequential_outputs) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } aggregateContent({ event, data }); }, @@ -271,11 +273,11 @@ function getDefaultHandlers({ * @param {StreamEventData} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ - handle: (event, data, metadata) => { + handle: async (event, data, metadata) => { if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (!metadata?.hide_sequential_outputs) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } aggregateContent({ event, data }); }, @@ -287,11 +289,11 @@ function getDefaultHandlers({ * @param {StreamEventData} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ - handle: (event, data, metadata) => { + handle: async (event, data, metadata) => { if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } else if (!metadata?.hide_sequential_outputs) { - emitEvent(res, streamId, { event, data }); + await emitEvent(res, streamId, { event, data }); } aggregateContent({ event, data }); }, @@ -307,6 +309,7 @@ function getDefaultHandlers({ /** * Helper to write attachment events either to res or to job emitter. + * Note: Attachments are not order-sensitive like deltas, so fire-and-forget is acceptable. 
* @param {ServerResponse} res - The server response object * @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode * @param {Object} attachment - The attachment data diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index eb8fd5aec6..79387b6e89 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -324,7 +324,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit conversationId: conversation?.conversationId, }); - GenerationJobManager.emitDone(streamId, finalEvent); + await GenerationJobManager.emitDone(streamId, finalEvent); GenerationJobManager.completeJob(streamId); await decrementPendingRequest(userId); } else { @@ -344,7 +344,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit conversationId: conversation?.conversationId, }); - GenerationJobManager.emitDone(streamId, finalEvent); + await GenerationJobManager.emitDone(streamId, finalEvent); GenerationJobManager.completeJob(streamId, 'Request aborted'); await decrementPendingRequest(userId); } @@ -377,7 +377,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit // abortJob already handled emitDone and completeJob } else { logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error); - GenerationJobManager.emitError(streamId, error.message || 'Generation failed'); + await GenerationJobManager.emitError(streamId, error.message || 'Generation failed'); GenerationJobManager.completeJob(streamId, error.message); } @@ -406,7 +406,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit res.status(500).json({ error: error.message || 'Failed to start generation' }); } else { // JSON already sent, emit error to stream so client can receive it - GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation'); + await GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation'); } GenerationJobManager.completeJob(streamId, error.message); await decrementPendingRequest(userId); diff --git a/api/server/services/ActionService.js b/api/server/services/ActionService.js index a2a515d14a..132f6f4686 100644 --- a/api/server/services/ActionService.js +++ b/api/server/services/ActionService.js @@ -201,7 +201,7 @@ async function createActionTool({ async () => { const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; if (streamId) { - GenerationJobManager.emitChunk(streamId, eventData); + await GenerationJobManager.emitChunk(streamId, eventData); } else { sendEvent(res, eventData); } @@ -231,7 +231,7 @@ async function createActionTool({ data.delta.expires_at = undefined; const successEventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; if (streamId) { - GenerationJobManager.emitChunk(streamId, successEventData); + await GenerationJobManager.emitChunk(streamId, successEventData); } else { sendEvent(res, successEventData); } diff --git a/api/server/services/MCP.js b/api/server/services/MCP.js index e5133eaca5..8cb9932097 100644 --- a/api/server/services/MCP.js +++ b/api/server/services/MCP.js @@ -53,9 +53,9 @@ function isEmptyObjectSchema(jsonSchema) { function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) { /** * @param {string} authURL - The URL to redirect the user for OAuth authentication. 
- * @returns {void} + * @returns {Promise} */ - return function (authURL) { + return async function (authURL) { /** @type {{ id: string; delta: AgentToolCallDelta }} */ const data = { id: stepId, @@ -68,7 +68,7 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) { }; const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; if (streamId) { - GenerationJobManager.emitChunk(streamId, eventData); + await GenerationJobManager.emitChunk(streamId, eventData); } else { sendEvent(res, eventData); } @@ -83,9 +83,10 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) { * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information. * @param {number} [params.index] * @param {string | null} [params.streamId] - The stream ID for resumable mode. + * @returns {() => Promise} */ function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) { - return function () { + return async function () { /** @type {import('@librechat/agents').RunStep} */ const data = { runId: runId ?? Constants.USE_PRELIM_RESPONSE_MESSAGE_ID, @@ -99,7 +100,7 @@ function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = }; const eventData = { event: GraphEvents.ON_RUN_STEP, data }; if (streamId) { - GenerationJobManager.emitChunk(streamId, eventData); + await GenerationJobManager.emitChunk(streamId, eventData); } else { sendEvent(res, eventData); } @@ -147,7 +148,7 @@ function createOAuthEnd({ res, stepId, toolCall, streamId = null }) { }; const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; if (streamId) { - GenerationJobManager.emitChunk(streamId, eventData); + await GenerationJobManager.emitChunk(streamId, eventData); } else { sendEvent(res, eventData); } diff --git a/api/server/services/ToolService.js b/api/server/services/ToolService.js index 21fba1dc39..fe7a0f40c2 100644 --- a/api/server/services/ToolService.js +++ b/api/server/services/ToolService.js @@ -526,8 +526,8 @@ async function loadToolDefinitionsWrapper({ req, res, agent, streamId = null, to const runStepDeltaEvent = { event: GraphEvents.ON_RUN_STEP_DELTA, data: runStepDeltaData }; if (streamId) { - GenerationJobManager.emitChunk(streamId, runStepEvent); - GenerationJobManager.emitChunk(streamId, runStepDeltaEvent); + await GenerationJobManager.emitChunk(streamId, runStepEvent); + await GenerationJobManager.emitChunk(streamId, runStepDeltaEvent); } else if (res && !res.writableEnded) { sendEvent(res, runStepEvent); sendEvent(res, runStepDeltaEvent); diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index d4b9b97eda..fefb0dd207 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -662,7 +662,7 @@ class GenerationJobManagerClass { runtime.finalEvent = abortFinalEvent; } - this.eventTransport.emitDone(streamId, abortFinalEvent); + await this.eventTransport.emitDone(streamId, abortFinalEvent); this.jobStore.clearContentState(streamId); this.runStepBuffers?.delete(streamId); @@ -789,8 +789,11 @@ class GenerationJobManagerClass { * * If no subscriber has connected yet, buffers the event for replay when they do. * This ensures early events (like 'created') aren't lost due to race conditions. + * + * In Redis mode, awaits the publish to guarantee event ordering. + * This is critical for streaming deltas (tool args, message content) to arrive in order. 
*/ - emitChunk(streamId: string, event: t.ServerSentEvent): void { + async emitChunk(streamId: string, event: t.ServerSentEvent): Promise<void> { const runtime = this.runtimeState.get(streamId); if (!runtime || runtime.abortController.signal.aborted) { return; } @@ -799,7 +802,7 @@ class GenerationJobManagerClass { // Track user message from created event this.trackUserMessage(streamId, event); - // For Redis mode, persist chunk for later reconstruction + // For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability) if (this._isRedis) { // The SSE event structure is { event: string, data: unknown, ... } // The aggregator expects { event: string, data: unknown } where data is the payload @@ -825,7 +828,8 @@ class GenerationJobManagerClass { runtime.earlyEventBuffer.push(event); } - this.eventTransport.emitChunk(streamId, event); + // Await the transport emit - critical for Redis mode to maintain event order + await this.eventTransport.emitChunk(streamId, event); } /** @@ -1035,7 +1039,7 @@ class GenerationJobManagerClass { * Emit a done event. * Persists finalEvent to Redis for cross-replica access. */ - emitDone(streamId: string, event: t.ServerSentEvent): void { + async emitDone(streamId: string, event: t.ServerSentEvent): Promise<void> { const runtime = this.runtimeState.get(streamId); if (runtime) { runtime.finalEvent = event; } @@ -1044,7 +1048,7 @@ class GenerationJobManagerClass { this.jobStore.updateJob(streamId, { finalEvent: JSON.stringify(event) }).catch((err) => { logger.error(`[GenerationJobManager] Failed to persist finalEvent:`, err); }); - this.eventTransport.emitDone(streamId, event); + await this.eventTransport.emitDone(streamId, event); } /** @@ -1052,7 +1056,7 @@ class GenerationJobManagerClass { * Emit an error event. * Stores the error for late-connecting subscribers (race condition where error * occurs before client connects to SSE stream).
*/ - emitError(streamId: string, error: string): void { + async emitError(streamId: string, error: string): Promise<void> { const runtime = this.runtimeState.get(streamId); if (runtime) { runtime.errorEvent = error; } @@ -1061,7 +1065,7 @@ class GenerationJobManagerClass { this.jobStore.updateJob(streamId, { error }).catch((err) => { logger.error(`[GenerationJobManager] Failed to persist error:`, err); }); - this.eventTransport.emitError(streamId, error); + await this.eventTransport.emitError(streamId, error); } /** diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index e3ea16c8f0..8723f3f000 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -134,12 +134,12 @@ describe('GenerationJobManager Integration Tests', () => { // Wait for first subscriber to be registered await new Promise((resolve) => setTimeout(resolve, 10)); - // Emit chunks (emitChunk takes { event, data } format) - GenerationJobManager.emitChunk(streamId, { + // Emit chunks (emitChunk takes { event, data } format, now async for Redis ordering) + await GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { type: 'text', text: 'Hello' }, }); - GenerationJobManager.emitChunk(streamId, { + await GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { type: 'text', text: ' world' }, }); @@ -219,8 +219,8 @@ describe('GenerationJobManager Integration Tests', () => { await GenerationJobManager.createJob(streamId, 'user-1'); // Emit chunks (these should be persisted to Redis) - // emitChunk takes { event, data } format - GenerationJobManager.emitChunk(streamId, { + // emitChunk takes { event, data } format, now async for Redis ordering + await GenerationJobManager.emitChunk(streamId, { event: 'on_run_step', data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' }, }, }); - GenerationJobManager.emitChunk(streamId, { + await GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { id: 'step-1', delta: { content: { type: 'text', text: 'Persisted ' } }, }, }); - GenerationJobManager.emitChunk(streamId, { + await GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { id: 'step-1', @@ -276,8 +276,8 @@ describe('GenerationJobManager Integration Tests', () => { const streamId = `redis-abort-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); - // Emit some content (emitChunk takes { event, data } format) - GenerationJobManager.emitChunk(streamId, { + // Emit some content (emitChunk takes { event, data } format, now async) + await GenerationJobManager.emitChunk(streamId, { event: 'on_run_step', data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' }, }, }); - GenerationJobManager.emitChunk(streamId, { + await GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { id: 'step-1', @@ -582,7 +582,7 @@ describe('GenerationJobManager Integration Tests', () => { conversation: { conversationId: streamId }, responseMessage: { text: 'Hello world' }, }; - GenerationJobManager.emitDone(streamId, finalEventData as never); + await GenerationJobManager.emitDone(streamId, finalEventData as never); await
new Promise((resolve) => setTimeout(resolve, 200)); @@ -796,6 +796,267 @@ describe('GenerationJobManager Integration Tests', () => { }); }); + describe('Sequential Event Ordering (Redis)', () => { + /** + * These tests verify that events are delivered in strict sequential order + * when using Redis mode. This is critical because: + * 1. LLM streaming tokens must arrive in order for coherent output + * 2. Tool call argument deltas must be concatenated in order + * 3. Run step events must precede their deltas + * + * The fix: emitChunk now awaits Redis publish to ensure ordered delivery. + */ + test('should maintain strict order for rapid sequential emits', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + const streamId = `order-rapid-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + const receivedIndices: number[] = []; + + const subscription = await GenerationJobManager.subscribe(streamId, (event) => { + const data = event as { event: string; data: { index: number } }; + if (data.event === 'test') { + receivedIndices.push(data.data.index); + } + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Emit 30 events rapidly - with await, they must arrive in order + for (let i = 0; i < 30; i++) { + await GenerationJobManager.emitChunk(streamId, { + event: 'test', + data: { index: i }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify all events arrived in correct order + expect(receivedIndices.length).toBe(30); + for (let i = 0; i < 30; i++) { + expect(receivedIndices[i]).toBe(i); + } + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + }); + + test('should maintain order for tool call argument deltas', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + const streamId = `tool-args-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + const receivedArgs: string[] = []; + + const subscription = await GenerationJobManager.subscribe(streamId, (event) => { + const data = event as { + event: string; + data: { delta: { tool_calls: { args: string }[] } }; + }; + if (data.event === 'on_run_step_delta') { + receivedArgs.push(data.data.delta.tool_calls[0].args); + } + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Simulate streaming JSON args: {"code": "print('hello')"} + const argChunks = ['{"', 'code', '": "', 'print', "('", 'hello', "')", '"}']; + + for (const chunk of argChunks) { + await GenerationJobManager.emitChunk(streamId, { + event: 'on_run_step_delta', + data: { + id: 'step-1', + delta: { + type: 'tool_calls', + tool_calls: [{ index: 0, args: chunk }], + }, + }, + }); + } + + await new Promise((resolve) 
=> setTimeout(resolve, 300)); + + // This was the original bug - args would arrive scrambled without await + expect(receivedArgs).toEqual(argChunks); + expect(receivedArgs.join('')).toBe(`{"code": "print('hello')"}`); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + }); + + test('should maintain order: on_run_step before on_run_step_delta', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + const streamId = `step-order-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + const receivedEvents: string[] = []; + + const subscription = await GenerationJobManager.subscribe(streamId, (event) => { + const data = event as { event: string }; + receivedEvents.push(data.event); + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Emit in correct order: step first, then deltas + await GenerationJobManager.emitChunk(streamId, { + event: 'on_run_step', + data: { id: 'step-1', type: 'tool_calls', index: 0 }, + }); + + await GenerationJobManager.emitChunk(streamId, { + event: 'on_run_step_delta', + data: { id: 'step-1', delta: { type: 'tool_calls', tool_calls: [{ args: '{' }] } }, + }); + + await GenerationJobManager.emitChunk(streamId, { + event: 'on_run_step_delta', + data: { id: 'step-1', delta: { type: 'tool_calls', tool_calls: [{ args: '}' }] } }, + }); + + await GenerationJobManager.emitChunk(streamId, { + event: 'on_run_step_completed', + data: { id: 'step-1', result: { content: '{}' } }, + }); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify ordering: step -> deltas -> completed + expect(receivedEvents).toEqual([ + 'on_run_step', + 'on_run_step_delta', + 'on_run_step_delta', + 'on_run_step_completed', + ]); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + }); + + test('should not block other streams when awaiting emitChunk', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + const streamId1 = `concurrent-1-${Date.now()}`; + const streamId2 = `concurrent-2-${Date.now()}`; + + await GenerationJobManager.createJob(streamId1, 'user-1'); + await GenerationJobManager.createJob(streamId2, 'user-2'); + + const stream1Events: number[] = []; + const stream2Events: number[] = []; + + const sub1 = await GenerationJobManager.subscribe(streamId1, (event) => { + const data = event as { event: string; data: { index: number } }; + if (data.event === 'test') { + stream1Events.push(data.data.index); + } + }); + + const sub2 = await GenerationJobManager.subscribe(streamId2, (event) => { + const data = event as { event: string; data: { index: number } }; + if (data.event === 'test') { + stream2Events.push(data.data.index); + } + }); + + await new 
Promise((resolve) => setTimeout(resolve, 100)); + + // Emit to both streams concurrently (simulating two LLM responses) + const emitPromises: Promise<void>[] = []; + for (let i = 0; i < 10; i++) { + emitPromises.push( + GenerationJobManager.emitChunk(streamId1, { event: 'test', data: { index: i } }), + ); + emitPromises.push( + GenerationJobManager.emitChunk(streamId2, { event: 'test', data: { index: i * 100 } }), + ); + } + await Promise.all(emitPromises); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Each stream should have all events, in order within their stream + expect(stream1Events.length).toBe(10); + expect(stream2Events.length).toBe(10); + + // Verify each stream's internal order + for (let i = 0; i < 10; i++) { + expect(stream1Events[i]).toBe(i); + expect(stream2Events[i]).toBe(i * 100); + } + + sub1?.unsubscribe(); + sub2?.unsubscribe(); + await GenerationJobManager.destroy(); + }); + }); + describe('Error Preservation for Late Subscribers', () => { /** * These tests verify the fix for the race condition where errors @@ -825,7 +1086,7 @@ describe('GenerationJobManager Integration Tests', () => { const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }'; // Emit error (no subscribers yet - simulates race condition) - GenerationJobManager.emitError(streamId, errorMessage); + await GenerationJobManager.emitError(streamId, errorMessage); // Wait for async job store update await new Promise((resolve) => setTimeout(resolve, 50)); @@ -891,7 +1152,7 @@ describe('GenerationJobManager Integration Tests', () => { const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }'; // Simulate race condition: error occurs before client connects - GenerationJobManager.emitError(streamId, errorMessage); + await GenerationJobManager.emitError(streamId, errorMessage); await GenerationJobManager.completeJob(streamId, errorMessage); // Wait for async operations @@ -940,7 +1201,7 @@ describe('GenerationJobManager Integration Tests', () => { const errorMessage = 'Error should take priority'; // Emit error and complete with error - GenerationJobManager.emitError(streamId, errorMessage); + await GenerationJobManager.emitError(streamId, errorMessage); await GenerationJobManager.completeJob(streamId, errorMessage); await new Promise((resolve) => setTimeout(resolve, 50)); diff --git a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts index 31266b3e11..d1f9467cd0 100644 --- a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts @@ -70,16 +70,16 @@ describe('RedisEventTransport Integration Tests', () => { }, }); - // Wait for subscription to be established - await new Promise((resolve) => setTimeout(resolve, 100)); + // Wait for subscription to be established (increased for CI) + await new Promise((resolve) => setTimeout(resolve, 500)); - // Emit events - transport.emitChunk(streamId, { type: 'text', text: 'Hello' }); - transport.emitChunk(streamId, { type: 'text', text: ' World' }); - transport.emitDone(streamId, { finished: true }); + // Emit events (emitChunk/emitDone are async for ordered delivery) + await transport.emitChunk(streamId, { type: 'text', text: 'Hello' }); + await transport.emitChunk(streamId, { type: 'text', text: ' World' }); + await transport.emitDone(streamId, { finished: true }); - // Wait for events to propagate -
await new Promise((resolve) => setTimeout(resolve, 200)); + // Wait for events to propagate (increased for CI) + await new Promise((resolve) => setTimeout(resolve, 500)); expect(receivedChunks.length).toBe(2); expect(doneEvent).toEqual({ finished: true }); @@ -117,7 +117,7 @@ describe('RedisEventTransport Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 100)); // Emit from transport 1 (producer on different instance) - transport1.emitChunk(streamId, { data: 'from-instance-1' }); + await transport1.emitChunk(streamId, { data: 'from-instance-1' }); // Wait for cross-instance delivery await new Promise((resolve) => setTimeout(resolve, 200)); @@ -160,7 +160,7 @@ describe('RedisEventTransport Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 100)); - transport.emitChunk(streamId, { data: 'broadcast' }); + await transport.emitChunk(streamId, { data: 'broadcast' }); await new Promise((resolve) => setTimeout(resolve, 200)); @@ -175,6 +175,425 @@ describe('RedisEventTransport Integration Tests', () => { }); }); + describe('Sequential Event Ordering', () => { + test('should maintain strict order when emitChunk is awaited', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const subscriber = (ioredisClient as Redis).duplicate(); + const transport = new RedisEventTransport(ioredisClient, subscriber); + + const streamId = `order-test-${Date.now()}`; + const receivedEvents: number[] = []; + + transport.subscribe(streamId, { + onChunk: (event) => receivedEvents.push((event as { index: number }).index), + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Emit 20 events rapidly with await - they should arrive in order + for (let i = 0; i < 20; i++) { + await transport.emitChunk(streamId, { index: i }); + } + + // Wait for all events to propagate + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify all events arrived in correct order + expect(receivedEvents.length).toBe(20); + for (let i = 0; i < 20; i++) { + expect(receivedEvents[i]).toBe(i); + } + + transport.destroy(); + subscriber.disconnect(); + }); + + test('should maintain order for tool call delta chunks (simulates streaming args)', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const subscriber = (ioredisClient as Redis).duplicate(); + const transport = new RedisEventTransport(ioredisClient, subscriber); + + const streamId = `tool-delta-order-${Date.now()}`; + const receivedArgs: string[] = []; + + transport.subscribe(streamId, { + onChunk: (event) => { + const data = event as { + event: string; + data: { delta: { tool_calls: { args: string }[] } }; + }; + if (data.event === 'on_run_step_delta') { + receivedArgs.push(data.data.delta.tool_calls[0].args); + } + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Simulate streaming tool call arguments like: {"code": "# First line\n..." 
+ const argChunks = ['{"code"', ': "', '# First', ' line', '\\n', '..."', '}']; + + for (const chunk of argChunks) { + await transport.emitChunk(streamId, { + event: 'on_run_step_delta', + data: { + id: 'step-1', + delta: { + type: 'tool_calls', + tool_calls: [{ index: 0, args: chunk }], + }, + }, + }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify chunks arrived in correct order - this was the bug we fixed + expect(receivedArgs).toEqual(argChunks); + expect(receivedArgs.join('')).toBe('{"code": "# First line\\n..."}'); + + transport.destroy(); + subscriber.disconnect(); + }); + + test('should maintain order across multiple concurrent streams (no cross-contamination)', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const subscriber = (ioredisClient as Redis).duplicate(); + const transport = new RedisEventTransport(ioredisClient, subscriber); + + const streamId1 = `concurrent-stream-1-${Date.now()}`; + const streamId2 = `concurrent-stream-2-${Date.now()}`; + + const stream1Events: number[] = []; + const stream2Events: number[] = []; + + transport.subscribe(streamId1, { + onChunk: (event) => stream1Events.push((event as { index: number }).index), + }); + transport.subscribe(streamId2, { + onChunk: (event) => stream2Events.push((event as { index: number }).index), + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Interleave events from both streams + for (let i = 0; i < 10; i++) { + await transport.emitChunk(streamId1, { index: i }); + await transport.emitChunk(streamId2, { index: i * 10 }); + } + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Each stream should have its own ordered events + expect(stream1Events).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + expect(stream2Events).toEqual([0, 10, 20, 30, 40, 50, 60, 70, 80, 90]); + + transport.destroy(); + subscriber.disconnect(); + }); + }); + + describe('Reorder Buffer (Redis Cluster Fix)', () => { + test('should reorder out-of-sequence messages', async () => { + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = { + publish: jest.fn().mockResolvedValue(1), + }; + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = 'reorder-test'; + const receivedEvents: number[] = []; + + transport.subscribe(streamId, { + onChunk: (event) => receivedEvents.push((event as { index: number }).index), + }); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1] as (channel: string, message: string) => void; + + const channel = `stream:{${streamId}}:events`; + + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { index: 0 } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { index: 1 } })); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(receivedEvents).toEqual([0, 1, 2]); + + transport.destroy(); + }); + + test('should buffer early messages and deliver when gaps are filled', async () => { + const { RedisEventTransport 
} = await import('../implementations/RedisEventTransport'); + + const mockPublisher = { + publish: jest.fn().mockResolvedValue(1), + }; + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = 'buffer-test'; + const receivedEvents: number[] = []; + + transport.subscribe(streamId, { + onChunk: (event) => receivedEvents.push((event as { index: number }).index), + }); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1] as (channel: string, message: string) => void; + + const channel = `stream:{${streamId}}:events`; + + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 4, data: { index: 4 } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 3, data: { index: 3 } })); + + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(receivedEvents).toEqual([]); + + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { index: 0 } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { index: 1 } })); + + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(receivedEvents).toEqual([0, 1, 2, 3, 4]); + + transport.destroy(); + }); + + test('should force-flush on timeout when gaps are not filled', async () => { + jest.useFakeTimers(); + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = { + publish: jest.fn().mockResolvedValue(1), + }; + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = 'timeout-test'; + const receivedEvents: number[] = []; + + transport.subscribe(streamId, { + onChunk: (event) => receivedEvents.push((event as { index: number }).index), + }); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1] as (channel: string, message: string) => void; + + const channel = `stream:{${streamId}}:events`; + + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 3, data: { index: 3 } })); + + expect(receivedEvents).toEqual([]); + + jest.advanceTimersByTime(600); + + expect(receivedEvents).toEqual([2, 3]); + + transport.destroy(); + jest.useRealTimers(); + }); + + test('should handle messages without sequence numbers (backward compatibility)', async () => { + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = { + publish: jest.fn().mockResolvedValue(1), + }; + const mockSubscriber = { + on: jest.fn(), + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + }; + + const transport = new RedisEventTransport( + mockPublisher as unknown as Redis, + mockSubscriber as unknown as Redis, + ); + + const streamId = 'compat-test'; + const receivedEvents: string[] = []; + + transport.subscribe(streamId, { + onChunk: (event) => receivedEvents.push((event as { msg: 
string }).msg), + onDone: (event) => receivedEvents.push(`done:${(event as { msg: string }).msg}`), + }); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1] as (channel: string, message: string) => void; + + const channel = `stream:{${streamId}}:events`; + + messageHandler(channel, JSON.stringify({ type: 'chunk', data: { msg: 'no-seq-1' } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', data: { msg: 'no-seq-2' } })); + messageHandler(channel, JSON.stringify({ type: 'done', data: { msg: 'finished' } })); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(receivedEvents).toEqual(['no-seq-1', 'no-seq-2', 'done:finished']); + + transport.destroy(); + }); + + test('should deliver done event after all pending chunks (terminal event ordering)', async () => { + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = { + publish: jest.fn().mockResolvedValue(1), + }; + const mockSubscriber = { + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + on: jest.fn(), + }; + + const transport = new RedisEventTransport(mockPublisher as never, mockSubscriber as never); + const streamId = `terminal-order-${Date.now()}`; + + const receivedEvents: string[] = []; + let doneReceived = false; + + transport.subscribe(streamId, { + onChunk: (event: unknown) => { + const e = event as { msg?: string }; + receivedEvents.push(e.msg ?? 'unknown'); + }, + onDone: (event: unknown) => { + const e = event as { msg?: string }; + receivedEvents.push(`done:${e.msg ?? 'finished'}`); + doneReceived = true; + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1]; + expect(messageHandler).toBeDefined(); + + const channel = `stream:{${streamId}}:events`; + + // Simulate out-of-order delivery in Redis Cluster: + // Done event (seq=3) arrives before chunk seq=2 + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { msg: 'chunk-0' } })); + messageHandler(channel, JSON.stringify({ type: 'done', seq: 3, data: { msg: 'complete' } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { msg: 'chunk-2' } })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { msg: 'chunk-1' } })); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Done event should be delivered AFTER all chunks despite arriving early + expect(doneReceived).toBe(true); + expect(receivedEvents).toEqual(['chunk-0', 'chunk-1', 'chunk-2', 'done:complete']); + + transport.destroy(); + }); + + test('should deliver error event after all pending chunks (terminal event ordering)', async () => { + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const mockPublisher = { + publish: jest.fn().mockResolvedValue(1), + }; + const mockSubscriber = { + subscribe: jest.fn().mockResolvedValue(undefined), + unsubscribe: jest.fn().mockResolvedValue(undefined), + on: jest.fn(), + }; + + const transport = new RedisEventTransport(mockPublisher as never, mockSubscriber as never); + const streamId = `terminal-error-${Date.now()}`; + + const receivedEvents: string[] = []; + let errorReceived: string | undefined; + + transport.subscribe(streamId, { + onChunk: (event: unknown) => { + const e = event as { msg?: string }; + receivedEvents.push(e.msg ?? 
'unknown'); + }, + onError: (error: string) => { + receivedEvents.push(`error:${error}`); + errorReceived = error; + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + const messageHandler = mockSubscriber.on.mock.calls.find( + (call) => call[0] === 'message', + )?.[1]; + expect(messageHandler).toBeDefined(); + + const channel = `stream:{${streamId}}:events`; + + // Simulate out-of-order delivery: error arrives before final chunks + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { msg: 'chunk-0' } })); + messageHandler(channel, JSON.stringify({ type: 'error', seq: 2, error: 'Something failed' })); + messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { msg: 'chunk-1' } })); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Error event should be delivered AFTER all preceding chunks + expect(errorReceived).toBe('Something failed'); + expect(receivedEvents).toEqual(['chunk-0', 'chunk-1', 'error:Something failed']); + + transport.destroy(); + }); + }); + describe('Subscriber Management', () => { test('should track first subscriber correctly', async () => { if (!ioredisClient) { @@ -283,7 +702,7 @@ describe('RedisEventTransport Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 100)); - transport.emitError(streamId, 'Test error message'); + await transport.emitError(streamId, 'Test error message'); await new Promise((resolve) => setTimeout(resolve, 200)); diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts index 02d4cb69ed..78f545c18e 100644 --- a/packages/api/src/stream/implementations/RedisEventTransport.ts +++ b/packages/api/src/stream/implementations/RedisEventTransport.ts @@ -22,10 +22,30 @@ const EventTypes = { interface PubSubMessage { type: (typeof EventTypes)[keyof typeof EventTypes]; + /** Sequence number for ordering (critical for Redis Cluster) */ + seq?: number; data?: unknown; error?: string; } +/** + * Reorder buffer state for a stream subscription. + * Handles out-of-order message delivery in Redis Cluster mode. + */ +interface ReorderBuffer { + /** Next expected sequence number */ + nextSeq: number; + /** Buffered messages waiting for earlier sequences */ + pending: Map; + /** Timeout handle for flushing stale messages */ + flushTimeout: ReturnType | null; +} + +/** Max time (ms) to wait for out-of-order messages before force-flushing */ +const REORDER_TIMEOUT_MS = 500; +/** Max messages to buffer before force-flushing (prevents memory issues) */ +const MAX_BUFFER_SIZE = 100; + /** * Subscriber state for a stream */ @@ -42,6 +62,8 @@ interface StreamSubscribers { allSubscribersLeftCallbacks: Array<() => void>; /** Abort callbacks - called when abort signal is received from any replica */ abortCallbacks: Array<() => void>; + /** Reorder buffer for handling out-of-order delivery in Redis Cluster */ + reorderBuffer: ReorderBuffer; } /** @@ -74,6 +96,8 @@ export class RedisEventTransport implements IEventTransport { private subscribedChannels = new Set(); /** Counter for generating unique subscriber IDs */ private subscriberIdCounter = 0; + /** Sequence counters per stream for publishing (ensures ordered delivery in cluster mode) */ + private sequenceCounters = new Map(); /** * Create a new Redis event transport. 
@@ -91,12 +115,22 @@ export class RedisEventTransport implements IEventTransport { }); } + /** Get next sequence number for a stream (0-indexed) */ + private getNextSequence(streamId: string): number { + const current = this.sequenceCounters.get(streamId) ?? 0; + this.sequenceCounters.set(streamId, current + 1); + return current; + } + + /** Reset sequence counter for a stream */ + private resetSequence(streamId: string): void { + this.sequenceCounters.delete(streamId); + } + /** - * Handle incoming pub/sub message + * Handle incoming pub/sub message with reordering support for Redis Cluster */ private handleMessage(channel: string, message: string): void { - // Extract streamId from channel name: stream:{streamId}:events - // Use regex to extract the hash tag content const match = channel.match(/^stream:\{([^}]+)\}:events$/); if (!match) { return; @@ -111,38 +145,179 @@ export class RedisEventTransport implements IEventTransport { try { const parsed = JSON.parse(message) as PubSubMessage; - for (const [, handlers] of streamState.handlers) { - switch (parsed.type) { - case EventTypes.CHUNK: - handlers.onChunk(parsed.data); - break; - case EventTypes.DONE: - handlers.onDone?.(parsed.data); - break; - case EventTypes.ERROR: - handlers.onError?.(parsed.error ?? 'Unknown error'); - break; - case EventTypes.ABORT: - // Abort is handled at stream level, not per-handler - break; - } - } - - // Handle abort signals at stream level (not per-handler) - if (parsed.type === EventTypes.ABORT) { - for (const callback of streamState.abortCallbacks) { - try { - callback(); - } catch (err) { - logger.error(`[RedisEventTransport] Error in abort callback:`, err); - } - } + if (parsed.type === EventTypes.CHUNK && parsed.seq != null) { + this.handleOrderedChunk(streamId, streamState, parsed); + } else if ( + (parsed.type === EventTypes.DONE || parsed.type === EventTypes.ERROR) && + parsed.seq != null + ) { + this.handleTerminalEvent(streamId, streamState, parsed); + } else { + this.deliverMessage(streamState, parsed); } } catch (err) { logger.error(`[RedisEventTransport] Failed to parse message:`, err); } } + /** + * Handle terminal events (done/error) with sequence-based ordering. + * Buffers the terminal event and delivers after all preceding chunks arrive. + */ + private handleTerminalEvent( + streamId: string, + streamState: StreamSubscribers, + message: PubSubMessage, + ): void { + const buffer = streamState.reorderBuffer; + const seq = message.seq!; + + if (seq < buffer.nextSeq) { + logger.debug( + `[RedisEventTransport] Dropping duplicate terminal event for stream ${streamId}: seq=${seq}, expected=${buffer.nextSeq}`, + ); + return; + } + + if (seq === buffer.nextSeq) { + this.deliverMessage(streamState, message); + buffer.nextSeq++; + this.flushPendingMessages(streamId, streamState); + } else { + buffer.pending.set(seq, message); + this.scheduleFlushTimeout(streamId, streamState); + } + } + + /** + * Handle chunk messages with sequence-based reordering. + * Buffers out-of-order messages and delivers them in sequence. 
+ */ + private handleOrderedChunk( + streamId: string, + streamState: StreamSubscribers, + message: PubSubMessage, + ): void { + const buffer = streamState.reorderBuffer; + const seq = message.seq!; + + if (seq === buffer.nextSeq) { + this.deliverMessage(streamState, message); + buffer.nextSeq++; + + this.flushPendingMessages(streamId, streamState); + } else if (seq > buffer.nextSeq) { + buffer.pending.set(seq, message); + + if (buffer.pending.size >= MAX_BUFFER_SIZE) { + logger.warn(`[RedisEventTransport] Buffer overflow for stream ${streamId}, force-flushing`); + this.forceFlushBuffer(streamId, streamState); + } else { + this.scheduleFlushTimeout(streamId, streamState); + } + } else { + logger.debug( + `[RedisEventTransport] Dropping duplicate/old message for stream ${streamId}: seq=${seq}, expected=${buffer.nextSeq}`, + ); + } + } + + /** Deliver consecutive pending messages */ + private flushPendingMessages(streamId: string, streamState: StreamSubscribers): void { + const buffer = streamState.reorderBuffer; + + while (buffer.pending.has(buffer.nextSeq)) { + const message = buffer.pending.get(buffer.nextSeq)!; + buffer.pending.delete(buffer.nextSeq); + this.deliverMessage(streamState, message); + buffer.nextSeq++; + } + + if (buffer.pending.size === 0 && buffer.flushTimeout) { + clearTimeout(buffer.flushTimeout); + buffer.flushTimeout = null; + } + } + + /** Force-flush all pending messages in order (used on timeout or overflow) */ + private forceFlushBuffer(streamId: string, streamState: StreamSubscribers): void { + const buffer = streamState.reorderBuffer; + + if (buffer.flushTimeout) { + clearTimeout(buffer.flushTimeout); + buffer.flushTimeout = null; + } + + if (buffer.pending.size === 0) { + return; + } + + const sortedSeqs = [...buffer.pending.keys()].sort((a, b) => a - b); + const skipped = sortedSeqs[0] - buffer.nextSeq; + + if (skipped > 0) { + logger.warn( + `[RedisEventTransport] Stream ${streamId}: skipping ${skipped} missing messages (seq ${buffer.nextSeq}-${sortedSeqs[0] - 1})`, + ); + } + + for (const seq of sortedSeqs) { + const message = buffer.pending.get(seq)!; + buffer.pending.delete(seq); + this.deliverMessage(streamState, message); + } + + buffer.nextSeq = sortedSeqs[sortedSeqs.length - 1] + 1; + } + + /** Schedule a timeout to force-flush if gaps aren't filled */ + private scheduleFlushTimeout(streamId: string, streamState: StreamSubscribers): void { + const buffer = streamState.reorderBuffer; + + if (buffer.flushTimeout) { + return; + } + + buffer.flushTimeout = setTimeout(() => { + buffer.flushTimeout = null; + if (buffer.pending.size > 0) { + logger.warn( + `[RedisEventTransport] Stream ${streamId}: timeout waiting for seq ${buffer.nextSeq}, force-flushing ${buffer.pending.size} messages`, + ); + this.forceFlushBuffer(streamId, streamState); + } + }, REORDER_TIMEOUT_MS); + } + + /** Deliver a message to all handlers */ + private deliverMessage(streamState: StreamSubscribers, message: PubSubMessage): void { + for (const [, handlers] of streamState.handlers) { + switch (message.type) { + case EventTypes.CHUNK: + handlers.onChunk(message.data); + break; + case EventTypes.DONE: + handlers.onDone?.(message.data); + break; + case EventTypes.ERROR: + handlers.onError?.(message.error ?? 
'Unknown error'); + break; + case EventTypes.ABORT: + break; + } + } + + if (message.type === EventTypes.ABORT) { + for (const callback of streamState.abortCallbacks) { + try { + callback(); + } catch (err) { + logger.error(`[RedisEventTransport] Error in abort callback:`, err); + } + } + } + } + /** * Subscribe to events for a stream. * @@ -167,6 +342,11 @@ export class RedisEventTransport implements IEventTransport { handlers: new Map(), allSubscribersLeftCallbacks: [], abortCallbacks: [], + reorderBuffer: { + nextSeq: 0, + pending: new Map(), + flushTimeout: null, + }, }); } @@ -195,6 +375,13 @@ export class RedisEventTransport implements IEventTransport { // If last subscriber left, unsubscribe from Redis and notify if (state.count === 0) { + // Clear any pending flush timeout and buffered messages + if (state.reorderBuffer.flushTimeout) { + clearTimeout(state.reorderBuffer.flushTimeout); + state.reorderBuffer.flushTimeout = null; + } + state.reorderBuffer.pending.clear(); + this.subscriber.unsubscribe(channel).catch((err) => { logger.error(`[RedisEventTransport] Failed to unsubscribe from ${channel}:`, err); }); @@ -217,38 +404,50 @@ /** * Publish a chunk event to all subscribers across all instances. + * Includes sequence number for ordered delivery in Redis Cluster mode. */ - emitChunk(streamId: string, event: unknown): void { + async emitChunk(streamId: string, event: unknown): Promise<void> { const channel = CHANNELS.events(streamId); - const message: PubSubMessage = { type: EventTypes.CHUNK, data: event }; + const seq = this.getNextSequence(streamId); + const message: PubSubMessage = { type: EventTypes.CHUNK, seq, data: event }; - this.publisher.publish(channel, JSON.stringify(message)).catch((err) => { + try { + await this.publisher.publish(channel, JSON.stringify(message)); + } catch (err) { logger.error(`[RedisEventTransport] Failed to publish chunk:`, err); - }); + } } /** * Publish a done event to all subscribers. + * Includes sequence number to ensure delivery after all chunks. */ - emitDone(streamId: string, event: unknown): void { + async emitDone(streamId: string, event: unknown): Promise<void> { const channel = CHANNELS.events(streamId); - const message: PubSubMessage = { type: EventTypes.DONE, data: event }; + const seq = this.getNextSequence(streamId); + const message: PubSubMessage = { type: EventTypes.DONE, seq, data: event }; - this.publisher.publish(channel, JSON.stringify(message)).catch((err) => { + try { + await this.publisher.publish(channel, JSON.stringify(message)); + } catch (err) { logger.error(`[RedisEventTransport] Failed to publish done:`, err); - }); + } } /** * Publish an error event to all subscribers. + * Includes sequence number to ensure delivery after all chunks.
*/ - emitError(streamId: string, error: string): void { + async emitError(streamId: string, error: string): Promise<void> { const channel = CHANNELS.events(streamId); - const message: PubSubMessage = { type: EventTypes.ERROR, error }; + const seq = this.getNextSequence(streamId); + const message: PubSubMessage = { type: EventTypes.ERROR, seq, error }; - this.publisher.publish(channel, JSON.stringify(message)).catch((err) => { + try { + await this.publisher.publish(channel, JSON.stringify(message)); + } catch (err) { logger.error(`[RedisEventTransport] Failed to publish error:`, err); - }); + } } /** @@ -282,6 +481,11 @@ export class RedisEventTransport implements IEventTransport { handlers: new Map(), allSubscribersLeftCallbacks: [callback], abortCallbacks: [], + reorderBuffer: { + nextSeq: 0, + pending: new Map(), + flushTimeout: null, + }, }); } } @@ -317,6 +521,11 @@ export class RedisEventTransport implements IEventTransport { handlers: new Map(), allSubscribersLeftCallbacks: [], abortCallbacks: [], + reorderBuffer: { + nextSeq: 0, + pending: new Map(), + flushTimeout: null, + }, }; this.streams.set(streamId, state); } @@ -347,12 +556,21 @@ const state = this.streams.get(streamId); if (state) { + // Clear flush timeout + if (state.reorderBuffer.flushTimeout) { + clearTimeout(state.reorderBuffer.flushTimeout); + state.reorderBuffer.flushTimeout = null; + } // Clear all handlers and callbacks state.handlers.clear(); state.allSubscribersLeftCallbacks = []; state.abortCallbacks = []; + state.reorderBuffer.pending.clear(); } + // Reset sequence counter for this stream + this.resetSequence(streamId); + // Unsubscribe from Redis channel if (this.subscribedChannels.has(channel)) { this.subscriber.unsubscribe(channel).catch((err) => { @@ -368,6 +586,15 @@ * Destroy all resources.
*/ destroy(): void { + // Clear all flush timeouts and buffered messages + for (const [, state] of this.streams) { + if (state.reorderBuffer.flushTimeout) { + clearTimeout(state.reorderBuffer.flushTimeout); + state.reorderBuffer.flushTimeout = null; + } + state.reorderBuffer.pending.clear(); + } + // Unsubscribe from all channels for (const channel of this.subscribedChannels) { this.subscriber.unsubscribe(channel).catch(() => { @@ -377,6 +604,7 @@ this.subscribedChannels.clear(); this.streams.clear(); + this.sequenceCounters.clear(); // Note: Don't close Redis connections - they may be shared logger.info('[RedisEventTransport] Destroyed'); diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index af681fb2e9..d990283925 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -296,14 +296,14 @@ export interface IEventTransport { }, ): { unsubscribe: () => void }; - /** Publish a chunk event */ - emitChunk(streamId: string, event: unknown): void; + /** Publish a chunk event - returns Promise in Redis mode for ordered delivery */ + emitChunk(streamId: string, event: unknown): void | Promise<void>; - /** Publish a done event */ - emitDone(streamId: string, event: unknown): void; + /** Publish a done event - returns Promise in Redis mode for ordered delivery */ + emitDone(streamId: string, event: unknown): void | Promise<void>; - /** Publish an error event */ - emitError(streamId: string, error: string): void; + /** Publish an error event - returns Promise in Redis mode for ordered delivery */ + emitError(streamId: string, error: string): void | Promise<void>; /** * Publish an abort signal to all replicas (Redis mode). diff --git a/packages/api/src/tools/toolkits/imageContext.ts b/packages/api/src/tools/toolkits/imageContext.ts index 0485ed815a..723f173104 100644 --- a/packages/api/src/tools/toolkits/imageContext.ts +++ b/packages/api/src/tools/toolkits/imageContext.ts @@ -35,4 +35,3 @@ export function buildImageToolContext({ } return toolContext; } -
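A closing note on the failure mode this patch targets: `publisher.publish` is async, and the previous fire-and-forget calls allowed several publishes to be in flight at once, so completion order (and, per the comments above, delivery order in Redis Cluster) could diverge from call order — exactly the scrambled tool-call args the new tests reproduce. A hypothetical, runnable TypeScript illustration of why awaiting restores ordering; none of these names come from the patch, and the random setTimeout merely stands in for variable Redis round-trip latency:

    // Simulates a publish whose round-trip time varies from call to call.
    async function publishWithJitter(out: number[], value: number): Promise<void> {
      await new Promise<void>((resolve) => setTimeout(resolve, Math.random() * 5));
      out.push(value);
    }

    async function main(): Promise<void> {
      const fireAndForget: number[] = [];
      for (let i = 0; i < 5; i++) {
        void publishWithJitter(fireAndForget, i); // old behavior: not awaited
      }

      const awaited: number[] = [];
      for (let i = 0; i < 5; i++) {
        await publishWithJitter(awaited, i); // new behavior: strictly sequential
      }

      await new Promise<void>((resolve) => setTimeout(resolve, 50));
      console.log(fireAndForget); // order varies run to run, e.g. [1, 0, 3, 2, 4]
      console.log(awaited); // always [0, 1, 2, 3, 4]
    }

    main();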