🔄 refactor: Sequential Event Ordering in Redis Streaming Mode (#11650)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions

* chore: linting image context file

* refactor: Event Emission with Async Handling for Redis Ordering

- Updated emitEvent and related functions to be async, ensuring proper event ordering in Redis mode.
- Refactored multiple handlers to await emitEvent calls, improving reliability for streaming deltas.
- Enhanced GenerationJobManager to await chunk emissions, critical for maintaining sequential event delivery.
- Added tests to verify that events are delivered in strict order when using Redis, addressing previous issues with out-of-order messages.

* refactor: Clear Pending Buffers and Timeouts in RedisEventTransport

- Enhanced the cleanup process in RedisEventTransport by ensuring that pending messages and flush timeouts are cleared when the last subscriber unsubscribes.
- Updated the destroy method to also clear pending messages and flush timeouts for all streams, improving resource management and preventing memory leaks.

* refactor: Update Event Emission to Async for Improved Ordering

- Refactored GenerationJobManager and RedisEventTransport to make emitDone and emitError methods async, ensuring proper event ordering in Redis mode.
- Updated all relevant calls to await these methods, enhancing reliability in event delivery.
- Adjusted tests to verify that events are processed in the correct sequence, addressing previous issues with out-of-order messages.

* refactor: Adjust RedisEventTransport for 0-Indexed Sequence Handling

- Updated sequence handling in RedisEventTransport to be 0-indexed, ensuring consistency across event emissions and buffer management.
- Modified integration tests to reflect the new sequence logic, improving the accuracy of event processing and delivery order.
- Enhanced comments for clarity on sequence management and terminal event handling.

* chore: Add Redis dump file to .gitignore

- Included dump.rdb in .gitignore to prevent accidental commits of Redis database dumps, enhancing repository cleanliness and security.

* test: Increase wait times in RedisEventTransport integration tests for CI stability

- Adjusted wait times for subscription establishment and event propagation from 100ms and 200ms to 500ms to improve reliability in CI environments.
- Improved readability by reformatting the promise-resolution lines.
This commit is contained in:
Danny Avila 2026-02-05 17:57:33 +01:00 committed by GitHub
parent 46624798b6
commit feb72ad2dc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1032 additions and 116 deletions

1
.gitignore vendored
View file

@ -15,6 +15,7 @@ pids
# CI/CD data
test-image*
dump.rdb
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

View file

@ -152,13 +152,15 @@ function checkIfLastAgent(last_agent_id, langgraph_node) {
/**
* Helper to emit events either to res (standard mode) or to job emitter (resumable mode).
* In Redis mode, awaits the emit to guarantee event ordering (critical for streaming deltas).
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} eventData - The event data to send
* @returns {Promise<void>}
*/
function emitEvent(res, streamId, eventData) {
async function emitEvent(res, streamId, eventData) {
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
await GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
@ -206,18 +208,18 @@ function getDefaultHandlers({
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
handle: async (event, data, metadata) => {
if (data?.stepDetails.type === StepTypes.TOOL_CALLS) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else {
const agentName = metadata?.name ?? 'Agent';
const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS;
const action = isToolCall ? 'performing a task...' : 'thinking...';
emitEvent(res, streamId, {
await emitEvent(res, streamId, {
event: 'on_agent_update',
data: {
runId: metadata?.run_id,
@ -235,13 +237,13 @@ function getDefaultHandlers({
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
handle: async (event, data, metadata) => {
if (data?.delta.type === StepTypes.TOOL_CALLS) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -253,13 +255,13 @@ function getDefaultHandlers({
* @param {StreamEventData & { result: ToolEndData }} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
handle: async (event, data, metadata) => {
if (data?.result != null) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -271,11 +273,11 @@ function getDefaultHandlers({
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
handle: async (event, data, metadata) => {
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -287,11 +289,11 @@ function getDefaultHandlers({
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
handle: async (event, data, metadata) => {
if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
emitEvent(res, streamId, { event, data });
await emitEvent(res, streamId, { event, data });
}
aggregateContent({ event, data });
},
@ -307,6 +309,7 @@ function getDefaultHandlers({
/**
* Helper to write attachment events either to res or to job emitter.
* Note: Attachments are not order-sensitive like deltas, so fire-and-forget is acceptable.
* @param {ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} attachment - The attachment data

View file

@ -324,7 +324,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
conversationId: conversation?.conversationId,
});
GenerationJobManager.emitDone(streamId, finalEvent);
await GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId);
await decrementPendingRequest(userId);
} else {
@ -344,7 +344,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
conversationId: conversation?.conversationId,
});
GenerationJobManager.emitDone(streamId, finalEvent);
await GenerationJobManager.emitDone(streamId, finalEvent);
GenerationJobManager.completeJob(streamId, 'Request aborted');
await decrementPendingRequest(userId);
}
@ -377,7 +377,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
// abortJob already handled emitDone and completeJob
} else {
logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error);
GenerationJobManager.emitError(streamId, error.message || 'Generation failed');
await GenerationJobManager.emitError(streamId, error.message || 'Generation failed');
GenerationJobManager.completeJob(streamId, error.message);
}
@ -406,7 +406,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
res.status(500).json({ error: error.message || 'Failed to start generation' });
} else {
// JSON already sent, emit error to stream so client can receive it
GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation');
await GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation');
}
GenerationJobManager.completeJob(streamId, error.message);
await decrementPendingRequest(userId);

View file

@ -201,7 +201,7 @@ async function createActionTool({
async () => {
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
await GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
@ -231,7 +231,7 @@ async function createActionTool({
data.delta.expires_at = undefined;
const successEventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, successEventData);
await GenerationJobManager.emitChunk(streamId, successEventData);
} else {
sendEvent(res, successEventData);
}

View file

@ -53,9 +53,9 @@ function isEmptyObjectSchema(jsonSchema) {
function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
/**
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
* @returns {void}
* @returns {Promise<void>}
*/
return function (authURL) {
return async function (authURL) {
/** @type {{ id: string; delta: AgentToolCallDelta }} */
const data = {
id: stepId,
@ -68,7 +68,7 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
};
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
await GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
@ -83,9 +83,10 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
* @param {number} [params.index]
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
* @returns {() => Promise<void>}
*/
function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) {
return function () {
return async function () {
/** @type {import('@librechat/agents').RunStep} */
const data = {
runId: runId ?? Constants.USE_PRELIM_RESPONSE_MESSAGE_ID,
@ -99,7 +100,7 @@ function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId =
};
const eventData = { event: GraphEvents.ON_RUN_STEP, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
await GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
@ -147,7 +148,7 @@ function createOAuthEnd({ res, stepId, toolCall, streamId = null }) {
};
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
await GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}

View file

@ -526,8 +526,8 @@ async function loadToolDefinitionsWrapper({ req, res, agent, streamId = null, to
const runStepDeltaEvent = { event: GraphEvents.ON_RUN_STEP_DELTA, data: runStepDeltaData };
if (streamId) {
GenerationJobManager.emitChunk(streamId, runStepEvent);
GenerationJobManager.emitChunk(streamId, runStepDeltaEvent);
await GenerationJobManager.emitChunk(streamId, runStepEvent);
await GenerationJobManager.emitChunk(streamId, runStepDeltaEvent);
} else if (res && !res.writableEnded) {
sendEvent(res, runStepEvent);
sendEvent(res, runStepDeltaEvent);

View file

@ -662,7 +662,7 @@ class GenerationJobManagerClass {
runtime.finalEvent = abortFinalEvent;
}
this.eventTransport.emitDone(streamId, abortFinalEvent);
await this.eventTransport.emitDone(streamId, abortFinalEvent);
this.jobStore.clearContentState(streamId);
this.runStepBuffers?.delete(streamId);
@ -789,8 +789,11 @@ class GenerationJobManagerClass {
*
* If no subscriber has connected yet, buffers the event for replay when they do.
* This ensures early events (like 'created') aren't lost due to race conditions.
*
* In Redis mode, awaits the publish to guarantee event ordering.
* This is critical for streaming deltas (tool args, message content) to arrive in order.
*/
emitChunk(streamId: string, event: t.ServerSentEvent): void {
async emitChunk(streamId: string, event: t.ServerSentEvent): Promise<void> {
const runtime = this.runtimeState.get(streamId);
if (!runtime || runtime.abortController.signal.aborted) {
return;
@ -799,7 +802,7 @@ class GenerationJobManagerClass {
// Track user message from created event
this.trackUserMessage(streamId, event);
// For Redis mode, persist chunk for later reconstruction
// For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability)
if (this._isRedis) {
// The SSE event structure is { event: string, data: unknown, ... }
// The aggregator expects { event: string, data: unknown } where data is the payload
@ -825,7 +828,8 @@ class GenerationJobManagerClass {
runtime.earlyEventBuffer.push(event);
}
this.eventTransport.emitChunk(streamId, event);
// Await the transport emit - critical for Redis mode to maintain event order
await this.eventTransport.emitChunk(streamId, event);
}
/**
@ -1035,7 +1039,7 @@ class GenerationJobManagerClass {
* Emit a done event.
* Persists finalEvent to Redis for cross-replica access.
*/
emitDone(streamId: string, event: t.ServerSentEvent): void {
async emitDone(streamId: string, event: t.ServerSentEvent): Promise<void> {
const runtime = this.runtimeState.get(streamId);
if (runtime) {
runtime.finalEvent = event;
@ -1044,7 +1048,7 @@ class GenerationJobManagerClass {
this.jobStore.updateJob(streamId, { finalEvent: JSON.stringify(event) }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to persist finalEvent:`, err);
});
this.eventTransport.emitDone(streamId, event);
await this.eventTransport.emitDone(streamId, event);
}
/**
@ -1052,7 +1056,7 @@ class GenerationJobManagerClass {
* Stores the error for late-connecting subscribers (race condition where error
* occurs before client connects to SSE stream).
*/
emitError(streamId: string, error: string): void {
async emitError(streamId: string, error: string): Promise<void> {
const runtime = this.runtimeState.get(streamId);
if (runtime) {
runtime.errorEvent = error;
@ -1061,7 +1065,7 @@ class GenerationJobManagerClass {
this.jobStore.updateJob(streamId, { error }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to persist error:`, err);
});
this.eventTransport.emitError(streamId, error);
await this.eventTransport.emitError(streamId, error);
}
/**

View file

@ -134,12 +134,12 @@ describe('GenerationJobManager Integration Tests', () => {
// Wait for first subscriber to be registered
await new Promise((resolve) => setTimeout(resolve, 10));
// Emit chunks (emitChunk takes { event, data } format)
GenerationJobManager.emitChunk(streamId, {
// Emit chunks (emitChunk takes { event, data } format, now async for Redis ordering)
await GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: { type: 'text', text: 'Hello' },
});
GenerationJobManager.emitChunk(streamId, {
await GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: { type: 'text', text: ' world' },
});
@ -219,8 +219,8 @@ describe('GenerationJobManager Integration Tests', () => {
await GenerationJobManager.createJob(streamId, 'user-1');
// Emit chunks (these should be persisted to Redis)
// emitChunk takes { event, data } format
GenerationJobManager.emitChunk(streamId, {
// emitChunk takes { event, data } format, now async for Redis ordering
await GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step',
data: {
id: 'step-1',
@ -229,14 +229,14 @@ describe('GenerationJobManager Integration Tests', () => {
stepDetails: { type: 'message_creation' },
},
});
GenerationJobManager.emitChunk(streamId, {
await GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: {
id: 'step-1',
delta: { content: { type: 'text', text: 'Persisted ' } },
},
});
GenerationJobManager.emitChunk(streamId, {
await GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: {
id: 'step-1',
@ -276,8 +276,8 @@ describe('GenerationJobManager Integration Tests', () => {
const streamId = `redis-abort-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
// Emit some content (emitChunk takes { event, data } format)
GenerationJobManager.emitChunk(streamId, {
// Emit some content (emitChunk takes { event, data } format, now async)
await GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step',
data: {
id: 'step-1',
@ -286,7 +286,7 @@ describe('GenerationJobManager Integration Tests', () => {
stepDetails: { type: 'message_creation' },
},
});
GenerationJobManager.emitChunk(streamId, {
await GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: {
id: 'step-1',
@ -582,7 +582,7 @@ describe('GenerationJobManager Integration Tests', () => {
conversation: { conversationId: streamId },
responseMessage: { text: 'Hello world' },
};
GenerationJobManager.emitDone(streamId, finalEventData as never);
await GenerationJobManager.emitDone(streamId, finalEventData as never);
await new Promise((resolve) => setTimeout(resolve, 200));
@ -796,6 +796,267 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describe('Sequential Event Ordering (Redis)', () => {
/**
* These tests verify that events are delivered in strict sequential order
* when using Redis mode. This is critical because:
* 1. LLM streaming tokens must arrive in order for coherent output
* 2. Tool call argument deltas must be concatenated in order
* 3. Run step events must precede their deltas
*
* The fix: emitChunk now awaits Redis publish to ensure ordered delivery.
*/
test('should maintain strict order for rapid sequential emits', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `order-rapid-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const receivedIndices: number[] = [];
const subscription = await GenerationJobManager.subscribe(streamId, (event) => {
const data = event as { event: string; data: { index: number } };
if (data.event === 'test') {
receivedIndices.push(data.data.index);
}
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Emit 30 events rapidly - with await, they must arrive in order
for (let i = 0; i < 30; i++) {
await GenerationJobManager.emitChunk(streamId, {
event: 'test',
data: { index: i },
});
}
await new Promise((resolve) => setTimeout(resolve, 300));
// Verify all events arrived in correct order
expect(receivedIndices.length).toBe(30);
for (let i = 0; i < 30; i++) {
expect(receivedIndices[i]).toBe(i);
}
subscription?.unsubscribe();
await GenerationJobManager.destroy();
});
test('should maintain order for tool call argument deltas', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `tool-args-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const receivedArgs: string[] = [];
const subscription = await GenerationJobManager.subscribe(streamId, (event) => {
const data = event as {
event: string;
data: { delta: { tool_calls: { args: string }[] } };
};
if (data.event === 'on_run_step_delta') {
receivedArgs.push(data.data.delta.tool_calls[0].args);
}
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Simulate streaming JSON args: {"code": "print('hello')"}
const argChunks = ['{"', 'code', '": "', 'print', "('", 'hello', "')", '"}'];
for (const chunk of argChunks) {
await GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step_delta',
data: {
id: 'step-1',
delta: {
type: 'tool_calls',
tool_calls: [{ index: 0, args: chunk }],
},
},
});
}
await new Promise((resolve) => setTimeout(resolve, 300));
// This was the original bug - args would arrive scrambled without await
expect(receivedArgs).toEqual(argChunks);
expect(receivedArgs.join('')).toBe(`{"code": "print('hello')"}`);
subscription?.unsubscribe();
await GenerationJobManager.destroy();
});
test('should maintain order: on_run_step before on_run_step_delta', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `step-order-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const receivedEvents: string[] = [];
const subscription = await GenerationJobManager.subscribe(streamId, (event) => {
const data = event as { event: string };
receivedEvents.push(data.event);
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Emit in correct order: step first, then deltas
await GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step',
data: { id: 'step-1', type: 'tool_calls', index: 0 },
});
await GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step_delta',
data: { id: 'step-1', delta: { type: 'tool_calls', tool_calls: [{ args: '{' }] } },
});
await GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step_delta',
data: { id: 'step-1', delta: { type: 'tool_calls', tool_calls: [{ args: '}' }] } },
});
await GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step_completed',
data: { id: 'step-1', result: { content: '{}' } },
});
await new Promise((resolve) => setTimeout(resolve, 300));
// Verify ordering: step -> deltas -> completed
expect(receivedEvents).toEqual([
'on_run_step',
'on_run_step_delta',
'on_run_step_delta',
'on_run_step_completed',
]);
subscription?.unsubscribe();
await GenerationJobManager.destroy();
});
test('should not block other streams when awaiting emitChunk', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId1 = `concurrent-1-${Date.now()}`;
const streamId2 = `concurrent-2-${Date.now()}`;
await GenerationJobManager.createJob(streamId1, 'user-1');
await GenerationJobManager.createJob(streamId2, 'user-2');
const stream1Events: number[] = [];
const stream2Events: number[] = [];
const sub1 = await GenerationJobManager.subscribe(streamId1, (event) => {
const data = event as { event: string; data: { index: number } };
if (data.event === 'test') {
stream1Events.push(data.data.index);
}
});
const sub2 = await GenerationJobManager.subscribe(streamId2, (event) => {
const data = event as { event: string; data: { index: number } };
if (data.event === 'test') {
stream2Events.push(data.data.index);
}
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Emit to both streams concurrently (simulating two LLM responses)
const emitPromises: Promise<void>[] = [];
for (let i = 0; i < 10; i++) {
emitPromises.push(
GenerationJobManager.emitChunk(streamId1, { event: 'test', data: { index: i } }),
);
emitPromises.push(
GenerationJobManager.emitChunk(streamId2, { event: 'test', data: { index: i * 100 } }),
);
}
await Promise.all(emitPromises);
await new Promise((resolve) => setTimeout(resolve, 300));
// Each stream should have all events, in order within their stream
expect(stream1Events.length).toBe(10);
expect(stream2Events.length).toBe(10);
// Verify each stream's internal order
for (let i = 0; i < 10; i++) {
expect(stream1Events[i]).toBe(i);
expect(stream2Events[i]).toBe(i * 100);
}
sub1?.unsubscribe();
sub2?.unsubscribe();
await GenerationJobManager.destroy();
});
});
describe('Error Preservation for Late Subscribers', () => {
/**
* These tests verify the fix for the race condition where errors
@ -825,7 +1086,7 @@ describe('GenerationJobManager Integration Tests', () => {
const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }';
// Emit error (no subscribers yet - simulates race condition)
GenerationJobManager.emitError(streamId, errorMessage);
await GenerationJobManager.emitError(streamId, errorMessage);
// Wait for async job store update
await new Promise((resolve) => setTimeout(resolve, 50));
@ -891,7 +1152,7 @@ describe('GenerationJobManager Integration Tests', () => {
const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }';
// Simulate race condition: error occurs before client connects
GenerationJobManager.emitError(streamId, errorMessage);
await GenerationJobManager.emitError(streamId, errorMessage);
await GenerationJobManager.completeJob(streamId, errorMessage);
// Wait for async operations
@ -940,7 +1201,7 @@ describe('GenerationJobManager Integration Tests', () => {
const errorMessage = 'Error should take priority';
// Emit error and complete with error
GenerationJobManager.emitError(streamId, errorMessage);
await GenerationJobManager.emitError(streamId, errorMessage);
await GenerationJobManager.completeJob(streamId, errorMessage);
await new Promise((resolve) => setTimeout(resolve, 50));

View file

@ -70,16 +70,16 @@ describe('RedisEventTransport Integration Tests', () => {
},
});
// Wait for subscription to be established
await new Promise((resolve) => setTimeout(resolve, 100));
// Wait for subscription to be established (increased for CI)
await new Promise((resolve) => setTimeout(resolve, 500));
// Emit events
transport.emitChunk(streamId, { type: 'text', text: 'Hello' });
transport.emitChunk(streamId, { type: 'text', text: ' World' });
transport.emitDone(streamId, { finished: true });
// Emit events (emitChunk/emitDone are async for ordered delivery)
await transport.emitChunk(streamId, { type: 'text', text: 'Hello' });
await transport.emitChunk(streamId, { type: 'text', text: ' World' });
await transport.emitDone(streamId, { finished: true });
// Wait for events to propagate
await new Promise((resolve) => setTimeout(resolve, 200));
// Wait for events to propagate (increased for CI)
await new Promise((resolve) => setTimeout(resolve, 500));
expect(receivedChunks.length).toBe(2);
expect(doneEvent).toEqual({ finished: true });
@ -117,7 +117,7 @@ describe('RedisEventTransport Integration Tests', () => {
await new Promise((resolve) => setTimeout(resolve, 100));
// Emit from transport 1 (producer on different instance)
transport1.emitChunk(streamId, { data: 'from-instance-1' });
await transport1.emitChunk(streamId, { data: 'from-instance-1' });
// Wait for cross-instance delivery
await new Promise((resolve) => setTimeout(resolve, 200));
@ -160,7 +160,7 @@ describe('RedisEventTransport Integration Tests', () => {
await new Promise((resolve) => setTimeout(resolve, 100));
transport.emitChunk(streamId, { data: 'broadcast' });
await transport.emitChunk(streamId, { data: 'broadcast' });
await new Promise((resolve) => setTimeout(resolve, 200));
@ -175,6 +175,425 @@ describe('RedisEventTransport Integration Tests', () => {
});
});
describe('Sequential Event Ordering', () => {
test('should maintain strict order when emitChunk is awaited', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId = `order-test-${Date.now()}`;
const receivedEvents: number[] = [];
transport.subscribe(streamId, {
onChunk: (event) => receivedEvents.push((event as { index: number }).index),
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Emit 20 events rapidly with await - they should arrive in order
for (let i = 0; i < 20; i++) {
await transport.emitChunk(streamId, { index: i });
}
// Wait for all events to propagate
await new Promise((resolve) => setTimeout(resolve, 300));
// Verify all events arrived in correct order
expect(receivedEvents.length).toBe(20);
for (let i = 0; i < 20; i++) {
expect(receivedEvents[i]).toBe(i);
}
transport.destroy();
subscriber.disconnect();
});
test('should maintain order for tool call delta chunks (simulates streaming args)', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId = `tool-delta-order-${Date.now()}`;
const receivedArgs: string[] = [];
transport.subscribe(streamId, {
onChunk: (event) => {
const data = event as {
event: string;
data: { delta: { tool_calls: { args: string }[] } };
};
if (data.event === 'on_run_step_delta') {
receivedArgs.push(data.data.delta.tool_calls[0].args);
}
},
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Simulate streaming tool call arguments like: {"code": "# First line\n..."
const argChunks = ['{"code"', ': "', '# First', ' line', '\\n', '..."', '}'];
for (const chunk of argChunks) {
await transport.emitChunk(streamId, {
event: 'on_run_step_delta',
data: {
id: 'step-1',
delta: {
type: 'tool_calls',
tool_calls: [{ index: 0, args: chunk }],
},
},
});
}
await new Promise((resolve) => setTimeout(resolve, 300));
// Verify chunks arrived in correct order - this was the bug we fixed
expect(receivedArgs).toEqual(argChunks);
expect(receivedArgs.join('')).toBe('{"code": "# First line\\n..."}');
transport.destroy();
subscriber.disconnect();
});
test('should maintain order across multiple concurrent streams (no cross-contamination)', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId1 = `concurrent-stream-1-${Date.now()}`;
const streamId2 = `concurrent-stream-2-${Date.now()}`;
const stream1Events: number[] = [];
const stream2Events: number[] = [];
transport.subscribe(streamId1, {
onChunk: (event) => stream1Events.push((event as { index: number }).index),
});
transport.subscribe(streamId2, {
onChunk: (event) => stream2Events.push((event as { index: number }).index),
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Interleave events from both streams
for (let i = 0; i < 10; i++) {
await transport.emitChunk(streamId1, { index: i });
await transport.emitChunk(streamId2, { index: i * 10 });
}
await new Promise((resolve) => setTimeout(resolve, 300));
// Each stream should have its own ordered events
expect(stream1Events).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
expect(stream2Events).toEqual([0, 10, 20, 30, 40, 50, 60, 70, 80, 90]);
transport.destroy();
subscriber.disconnect();
});
});
describe('Reorder Buffer (Redis Cluster Fix)', () => {
test('should reorder out-of-sequence messages', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = 'reorder-test';
const receivedEvents: number[] = [];
transport.subscribe(streamId, {
onChunk: (event) => receivedEvents.push((event as { index: number }).index),
});
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1] as (channel: string, message: string) => void;
const channel = `stream:{${streamId}}:events`;
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { index: 0 } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { index: 1 } }));
await new Promise((resolve) => setTimeout(resolve, 50));
expect(receivedEvents).toEqual([0, 1, 2]);
transport.destroy();
});
test('should buffer early messages and deliver when gaps are filled', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = 'buffer-test';
const receivedEvents: number[] = [];
transport.subscribe(streamId, {
onChunk: (event) => receivedEvents.push((event as { index: number }).index),
});
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1] as (channel: string, message: string) => void;
const channel = `stream:{${streamId}}:events`;
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 4, data: { index: 4 } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 3, data: { index: 3 } }));
await new Promise((resolve) => setTimeout(resolve, 50));
expect(receivedEvents).toEqual([]);
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { index: 0 } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { index: 1 } }));
await new Promise((resolve) => setTimeout(resolve, 50));
expect(receivedEvents).toEqual([0, 1, 2, 3, 4]);
transport.destroy();
});
test('should force-flush on timeout when gaps are not filled', async () => {
jest.useFakeTimers();
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = 'timeout-test';
const receivedEvents: number[] = [];
transport.subscribe(streamId, {
onChunk: (event) => receivedEvents.push((event as { index: number }).index),
});
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1] as (channel: string, message: string) => void;
const channel = `stream:{${streamId}}:events`;
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { index: 2 } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 3, data: { index: 3 } }));
expect(receivedEvents).toEqual([]);
jest.advanceTimersByTime(600);
expect(receivedEvents).toEqual([2, 3]);
transport.destroy();
jest.useRealTimers();
});
test('should handle messages without sequence numbers (backward compatibility)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = 'compat-test';
const receivedEvents: string[] = [];
transport.subscribe(streamId, {
onChunk: (event) => receivedEvents.push((event as { msg: string }).msg),
onDone: (event) => receivedEvents.push(`done:${(event as { msg: string }).msg}`),
});
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1] as (channel: string, message: string) => void;
const channel = `stream:{${streamId}}:events`;
messageHandler(channel, JSON.stringify({ type: 'chunk', data: { msg: 'no-seq-1' } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', data: { msg: 'no-seq-2' } }));
messageHandler(channel, JSON.stringify({ type: 'done', data: { msg: 'finished' } }));
await new Promise((resolve) => setTimeout(resolve, 50));
expect(receivedEvents).toEqual(['no-seq-1', 'no-seq-2', 'done:finished']);
transport.destroy();
});
test('should deliver done event after all pending chunks (terminal event ordering)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockSubscriber = {
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
on: jest.fn(),
};
const transport = new RedisEventTransport(mockPublisher as never, mockSubscriber as never);
const streamId = `terminal-order-${Date.now()}`;
const receivedEvents: string[] = [];
let doneReceived = false;
transport.subscribe(streamId, {
onChunk: (event: unknown) => {
const e = event as { msg?: string };
receivedEvents.push(e.msg ?? 'unknown');
},
onDone: (event: unknown) => {
const e = event as { msg?: string };
receivedEvents.push(`done:${e.msg ?? 'finished'}`);
doneReceived = true;
},
});
await new Promise((resolve) => setTimeout(resolve, 50));
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1];
expect(messageHandler).toBeDefined();
const channel = `stream:{${streamId}}:events`;
// Simulate out-of-order delivery in Redis Cluster:
// Done event (seq=3) arrives before chunk seq=2
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { msg: 'chunk-0' } }));
messageHandler(channel, JSON.stringify({ type: 'done', seq: 3, data: { msg: 'complete' } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 2, data: { msg: 'chunk-2' } }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { msg: 'chunk-1' } }));
await new Promise((resolve) => setTimeout(resolve, 100));
// Done event should be delivered AFTER all chunks despite arriving early
expect(doneReceived).toBe(true);
expect(receivedEvents).toEqual(['chunk-0', 'chunk-1', 'chunk-2', 'done:complete']);
transport.destroy();
});
test('should deliver error event after all pending chunks (terminal event ordering)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockResolvedValue(1),
};
const mockSubscriber = {
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
on: jest.fn(),
};
const transport = new RedisEventTransport(mockPublisher as never, mockSubscriber as never);
const streamId = `terminal-error-${Date.now()}`;
const receivedEvents: string[] = [];
let errorReceived: string | undefined;
transport.subscribe(streamId, {
onChunk: (event: unknown) => {
const e = event as { msg?: string };
receivedEvents.push(e.msg ?? 'unknown');
},
onError: (error: string) => {
receivedEvents.push(`error:${error}`);
errorReceived = error;
},
});
await new Promise((resolve) => setTimeout(resolve, 50));
const messageHandler = mockSubscriber.on.mock.calls.find(
(call) => call[0] === 'message',
)?.[1];
expect(messageHandler).toBeDefined();
const channel = `stream:{${streamId}}:events`;
// Simulate out-of-order delivery: error arrives before final chunks
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 0, data: { msg: 'chunk-0' } }));
messageHandler(channel, JSON.stringify({ type: 'error', seq: 2, error: 'Something failed' }));
messageHandler(channel, JSON.stringify({ type: 'chunk', seq: 1, data: { msg: 'chunk-1' } }));
await new Promise((resolve) => setTimeout(resolve, 100));
// Error event should be delivered AFTER all preceding chunks
expect(errorReceived).toBe('Something failed');
expect(receivedEvents).toEqual(['chunk-0', 'chunk-1', 'error:Something failed']);
transport.destroy();
});
});
describe('Subscriber Management', () => {
test('should track first subscriber correctly', async () => {
if (!ioredisClient) {
@ -283,7 +702,7 @@ describe('RedisEventTransport Integration Tests', () => {
await new Promise((resolve) => setTimeout(resolve, 100));
transport.emitError(streamId, 'Test error message');
await transport.emitError(streamId, 'Test error message');
await new Promise((resolve) => setTimeout(resolve, 200));

View file

@ -22,10 +22,30 @@ const EventTypes = {
interface PubSubMessage {
type: (typeof EventTypes)[keyof typeof EventTypes];
/** Sequence number for ordering (critical for Redis Cluster) */
seq?: number;
data?: unknown;
error?: string;
}
/**
* Reorder buffer state for a stream subscription.
* Handles out-of-order message delivery in Redis Cluster mode.
*/
interface ReorderBuffer {
/** Next expected sequence number */
nextSeq: number;
/** Buffered messages waiting for earlier sequences */
pending: Map<number, PubSubMessage>;
/** Timeout handle for flushing stale messages */
flushTimeout: ReturnType<typeof setTimeout> | null;
}
/** Max time (ms) to wait for out-of-order messages before force-flushing */
const REORDER_TIMEOUT_MS = 500;
/** Max messages to buffer before force-flushing (prevents memory issues) */
const MAX_BUFFER_SIZE = 100;
/**
* Subscriber state for a stream
*/
@ -42,6 +62,8 @@ interface StreamSubscribers {
allSubscribersLeftCallbacks: Array<() => void>;
/** Abort callbacks - called when abort signal is received from any replica */
abortCallbacks: Array<() => void>;
/** Reorder buffer for handling out-of-order delivery in Redis Cluster */
reorderBuffer: ReorderBuffer;
}
/**
@ -74,6 +96,8 @@ export class RedisEventTransport implements IEventTransport {
private subscribedChannels = new Set<string>();
/** Counter for generating unique subscriber IDs */
private subscriberIdCounter = 0;
/** Sequence counters per stream for publishing (ensures ordered delivery in cluster mode) */
private sequenceCounters = new Map<string, number>();
/**
* Create a new Redis event transport.
@ -91,12 +115,22 @@ export class RedisEventTransport implements IEventTransport {
});
}
/** Get next sequence number for a stream (0-indexed) */
private getNextSequence(streamId: string): number {
const current = this.sequenceCounters.get(streamId) ?? 0;
this.sequenceCounters.set(streamId, current + 1);
return current;
}
  /** Reset sequence counter for a stream */
  // Called during stream cleanup so a reused streamId starts publishing at seq 0 again.
  private resetSequence(streamId: string): void {
    this.sequenceCounters.delete(streamId);
  }
/**
* Handle incoming pub/sub message
* Handle incoming pub/sub message with reordering support for Redis Cluster
*/
private handleMessage(channel: string, message: string): void {
// Extract streamId from channel name: stream:{streamId}:events
// Use regex to extract the hash tag content
const match = channel.match(/^stream:\{([^}]+)\}:events$/);
if (!match) {
return;
@ -111,38 +145,179 @@ export class RedisEventTransport implements IEventTransport {
try {
const parsed = JSON.parse(message) as PubSubMessage;
for (const [, handlers] of streamState.handlers) {
switch (parsed.type) {
case EventTypes.CHUNK:
handlers.onChunk(parsed.data);
break;
case EventTypes.DONE:
handlers.onDone?.(parsed.data);
break;
case EventTypes.ERROR:
handlers.onError?.(parsed.error ?? 'Unknown error');
break;
case EventTypes.ABORT:
// Abort is handled at stream level, not per-handler
break;
}
}
// Handle abort signals at stream level (not per-handler)
if (parsed.type === EventTypes.ABORT) {
for (const callback of streamState.abortCallbacks) {
try {
callback();
} catch (err) {
logger.error(`[RedisEventTransport] Error in abort callback:`, err);
}
}
if (parsed.type === EventTypes.CHUNK && parsed.seq != null) {
this.handleOrderedChunk(streamId, streamState, parsed);
} else if (
(parsed.type === EventTypes.DONE || parsed.type === EventTypes.ERROR) &&
parsed.seq != null
) {
this.handleTerminalEvent(streamId, streamState, parsed);
} else {
this.deliverMessage(streamState, parsed);
}
} catch (err) {
logger.error(`[RedisEventTransport] Failed to parse message:`, err);
}
}
/**
* Handle terminal events (done/error) with sequence-based ordering.
* Buffers the terminal event and delivers after all preceding chunks arrive.
*/
private handleTerminalEvent(
streamId: string,
streamState: StreamSubscribers,
message: PubSubMessage,
): void {
const buffer = streamState.reorderBuffer;
const seq = message.seq!;
if (seq < buffer.nextSeq) {
logger.debug(
`[RedisEventTransport] Dropping duplicate terminal event for stream ${streamId}: seq=${seq}, expected=${buffer.nextSeq}`,
);
return;
}
if (seq === buffer.nextSeq) {
this.deliverMessage(streamState, message);
buffer.nextSeq++;
this.flushPendingMessages(streamId, streamState);
} else {
buffer.pending.set(seq, message);
this.scheduleFlushTimeout(streamId, streamState);
}
}
/**
* Handle chunk messages with sequence-based reordering.
* Buffers out-of-order messages and delivers them in sequence.
*/
private handleOrderedChunk(
streamId: string,
streamState: StreamSubscribers,
message: PubSubMessage,
): void {
const buffer = streamState.reorderBuffer;
const seq = message.seq!;
if (seq === buffer.nextSeq) {
this.deliverMessage(streamState, message);
buffer.nextSeq++;
this.flushPendingMessages(streamId, streamState);
} else if (seq > buffer.nextSeq) {
buffer.pending.set(seq, message);
if (buffer.pending.size >= MAX_BUFFER_SIZE) {
logger.warn(`[RedisEventTransport] Buffer overflow for stream ${streamId}, force-flushing`);
this.forceFlushBuffer(streamId, streamState);
} else {
this.scheduleFlushTimeout(streamId, streamState);
}
} else {
logger.debug(
`[RedisEventTransport] Dropping duplicate/old message for stream ${streamId}: seq=${seq}, expected=${buffer.nextSeq}`,
);
}
}
/** Deliver consecutive pending messages */
private flushPendingMessages(streamId: string, streamState: StreamSubscribers): void {
const buffer = streamState.reorderBuffer;
while (buffer.pending.has(buffer.nextSeq)) {
const message = buffer.pending.get(buffer.nextSeq)!;
buffer.pending.delete(buffer.nextSeq);
this.deliverMessage(streamState, message);
buffer.nextSeq++;
}
if (buffer.pending.size === 0 && buffer.flushTimeout) {
clearTimeout(buffer.flushTimeout);
buffer.flushTimeout = null;
}
}
/** Force-flush all pending messages in order (used on timeout or overflow) */
private forceFlushBuffer(streamId: string, streamState: StreamSubscribers): void {
const buffer = streamState.reorderBuffer;
if (buffer.flushTimeout) {
clearTimeout(buffer.flushTimeout);
buffer.flushTimeout = null;
}
if (buffer.pending.size === 0) {
return;
}
const sortedSeqs = [...buffer.pending.keys()].sort((a, b) => a - b);
const skipped = sortedSeqs[0] - buffer.nextSeq;
if (skipped > 0) {
logger.warn(
`[RedisEventTransport] Stream ${streamId}: skipping ${skipped} missing messages (seq ${buffer.nextSeq}-${sortedSeqs[0] - 1})`,
);
}
for (const seq of sortedSeqs) {
const message = buffer.pending.get(seq)!;
buffer.pending.delete(seq);
this.deliverMessage(streamState, message);
}
buffer.nextSeq = sortedSeqs[sortedSeqs.length - 1] + 1;
}
/** Schedule a timeout to force-flush if gaps aren't filled */
private scheduleFlushTimeout(streamId: string, streamState: StreamSubscribers): void {
const buffer = streamState.reorderBuffer;
if (buffer.flushTimeout) {
return;
}
buffer.flushTimeout = setTimeout(() => {
buffer.flushTimeout = null;
if (buffer.pending.size > 0) {
logger.warn(
`[RedisEventTransport] Stream ${streamId}: timeout waiting for seq ${buffer.nextSeq}, force-flushing ${buffer.pending.size} messages`,
);
this.forceFlushBuffer(streamId, streamState);
}
}, REORDER_TIMEOUT_MS);
}
/** Deliver a message to all handlers */
private deliverMessage(streamState: StreamSubscribers, message: PubSubMessage): void {
for (const [, handlers] of streamState.handlers) {
switch (message.type) {
case EventTypes.CHUNK:
handlers.onChunk(message.data);
break;
case EventTypes.DONE:
handlers.onDone?.(message.data);
break;
case EventTypes.ERROR:
handlers.onError?.(message.error ?? 'Unknown error');
break;
case EventTypes.ABORT:
break;
}
}
if (message.type === EventTypes.ABORT) {
for (const callback of streamState.abortCallbacks) {
try {
callback();
} catch (err) {
logger.error(`[RedisEventTransport] Error in abort callback:`, err);
}
}
}
}
/**
* Subscribe to events for a stream.
*
@ -167,6 +342,11 @@ export class RedisEventTransport implements IEventTransport {
handlers: new Map(),
allSubscribersLeftCallbacks: [],
abortCallbacks: [],
reorderBuffer: {
nextSeq: 0,
pending: new Map(),
flushTimeout: null,
},
});
}
@ -195,6 +375,13 @@ export class RedisEventTransport implements IEventTransport {
// If last subscriber left, unsubscribe from Redis and notify
if (state.count === 0) {
// Clear any pending flush timeout and buffered messages
if (state.reorderBuffer.flushTimeout) {
clearTimeout(state.reorderBuffer.flushTimeout);
state.reorderBuffer.flushTimeout = null;
}
state.reorderBuffer.pending.clear();
this.subscriber.unsubscribe(channel).catch((err) => {
logger.error(`[RedisEventTransport] Failed to unsubscribe from ${channel}:`, err);
});
@ -217,38 +404,50 @@ export class RedisEventTransport implements IEventTransport {
/**
* Publish a chunk event to all subscribers across all instances.
* Includes sequence number for ordered delivery in Redis Cluster mode.
*/
emitChunk(streamId: string, event: unknown): void {
async emitChunk(streamId: string, event: unknown): Promise<void> {
const channel = CHANNELS.events(streamId);
const message: PubSubMessage = { type: EventTypes.CHUNK, data: event };
const seq = this.getNextSequence(streamId);
const message: PubSubMessage = { type: EventTypes.CHUNK, seq, data: event };
this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
try {
await this.publisher.publish(channel, JSON.stringify(message));
} catch (err) {
logger.error(`[RedisEventTransport] Failed to publish chunk:`, err);
});
}
}
/**
* Publish a done event to all subscribers.
* Includes sequence number to ensure delivery after all chunks.
*/
emitDone(streamId: string, event: unknown): void {
async emitDone(streamId: string, event: unknown): Promise<void> {
const channel = CHANNELS.events(streamId);
const message: PubSubMessage = { type: EventTypes.DONE, data: event };
const seq = this.getNextSequence(streamId);
const message: PubSubMessage = { type: EventTypes.DONE, seq, data: event };
this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
try {
await this.publisher.publish(channel, JSON.stringify(message));
} catch (err) {
logger.error(`[RedisEventTransport] Failed to publish done:`, err);
});
}
}
/**
* Publish an error event to all subscribers.
* Includes sequence number to ensure delivery after all chunks.
*/
emitError(streamId: string, error: string): void {
async emitError(streamId: string, error: string): Promise<void> {
const channel = CHANNELS.events(streamId);
const message: PubSubMessage = { type: EventTypes.ERROR, error };
const seq = this.getNextSequence(streamId);
const message: PubSubMessage = { type: EventTypes.ERROR, seq, error };
this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
try {
await this.publisher.publish(channel, JSON.stringify(message));
} catch (err) {
logger.error(`[RedisEventTransport] Failed to publish error:`, err);
});
}
}
/**
@ -282,6 +481,11 @@ export class RedisEventTransport implements IEventTransport {
handlers: new Map(),
allSubscribersLeftCallbacks: [callback],
abortCallbacks: [],
reorderBuffer: {
nextSeq: 0,
pending: new Map(),
flushTimeout: null,
},
});
}
}
@ -317,6 +521,11 @@ export class RedisEventTransport implements IEventTransport {
handlers: new Map(),
allSubscribersLeftCallbacks: [],
abortCallbacks: [],
reorderBuffer: {
nextSeq: 0,
pending: new Map(),
flushTimeout: null,
},
};
this.streams.set(streamId, state);
}
@ -347,12 +556,21 @@ export class RedisEventTransport implements IEventTransport {
const state = this.streams.get(streamId);
if (state) {
// Clear flush timeout
if (state.reorderBuffer.flushTimeout) {
clearTimeout(state.reorderBuffer.flushTimeout);
state.reorderBuffer.flushTimeout = null;
}
// Clear all handlers and callbacks
state.handlers.clear();
state.allSubscribersLeftCallbacks = [];
state.abortCallbacks = [];
state.reorderBuffer.pending.clear();
}
// Reset sequence counter for this stream
this.resetSequence(streamId);
// Unsubscribe from Redis channel
if (this.subscribedChannels.has(channel)) {
this.subscriber.unsubscribe(channel).catch((err) => {
@ -368,6 +586,15 @@ export class RedisEventTransport implements IEventTransport {
* Destroy all resources.
*/
destroy(): void {
// Clear all flush timeouts and buffered messages
for (const [, state] of this.streams) {
if (state.reorderBuffer.flushTimeout) {
clearTimeout(state.reorderBuffer.flushTimeout);
state.reorderBuffer.flushTimeout = null;
}
state.reorderBuffer.pending.clear();
}
// Unsubscribe from all channels
for (const channel of this.subscribedChannels) {
this.subscriber.unsubscribe(channel).catch(() => {
@ -377,6 +604,7 @@ export class RedisEventTransport implements IEventTransport {
this.subscribedChannels.clear();
this.streams.clear();
this.sequenceCounters.clear();
// Note: Don't close Redis connections - they may be shared
logger.info('[RedisEventTransport] Destroyed');

View file

@ -296,14 +296,14 @@ export interface IEventTransport {
},
): { unsubscribe: () => void };
/** Publish a chunk event */
emitChunk(streamId: string, event: unknown): void;
/** Publish a chunk event - returns Promise in Redis mode for ordered delivery */
emitChunk(streamId: string, event: unknown): void | Promise<void>;
/** Publish a done event */
emitDone(streamId: string, event: unknown): void;
/** Publish a done event - returns Promise in Redis mode for ordered delivery */
emitDone(streamId: string, event: unknown): void | Promise<void>;
/** Publish an error event */
emitError(streamId: string, error: string): void;
/** Publish an error event - returns Promise in Redis mode for ordered delivery */
emitError(streamId: string, error: string): void | Promise<void>;
/**
* Publish an abort signal to all replicas (Redis mode).

View file

@ -35,4 +35,3 @@ export function buildImageToolContext({
}
return toolContext;
}