From a01959b3d2eddb0961c611d429a703187d2e347b Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Sun, 15 Mar 2026 11:11:10 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=9B=B0=EF=B8=8F=20fix:=20Cross-Replica=20?= =?UTF-8?q?Created=20Event=20Delivery=20(#12231)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: emit created event from metadata on cross-replica subscribe In multi-instance Redis deployments, the created event (which triggers sidebar conversation creation) was lost when the SSE subscriber connected to a different instance than the one generating. The event was only in the generating instance's local earlyEventBuffer and the Redis pub/sub message was already gone by the time the subscriber's channel was active. When subscribing cross-replica (empty buffer, Redis mode, userMessage already in job metadata), reconstruct and emit the created event directly from stored metadata. * test: add skipBufferReplay regression guard for cross-replica created event Add test asserting the resume path (skipBufferReplay: true) does NOT emit a created event on cross-replica subscribe — prevents the duplication fix from PR #12225 from regressing. Add explanatory JSDoc on the cross-replica fallback branch documenting which fields are preserved from trackUserMessage() and why sender/isCreatedByUser are hardcoded. * refactor: replace as-unknown-as casts with discriminated ServerSentEvent union Split ServerSentEvent into StreamEvent | CreatedEvent | FinalEvent so event shapes are statically typed. Removes all as-unknown-as casts in GenerationJobManager and test file; narrows with proper union members where properties are accessed. * fix: await trackUserMessage before PUBLISH for structural ordering trackUserMessage was fire-and-forget — the HSET for userMessage could theoretically race with the PUBLISH. Await it so the write commits before the pub/sub fires, guaranteeing any cross-replica getJob() after the pub/sub window always finds userMessage in Redis. No-op for non-created events (early return before any async work). * refactor: type CreatedEvent.message explicitly, fix JSDoc and import Give CreatedEvent.message its full known shape instead of Record. Update sendEvent JSDoc to reflect the discriminated union. Use barrel import in test file. * refactor: type FinalEvent fields with explicit message and conversation shapes Replace Record on requestMessage, responseMessage, conversation, and runMessages with FinalMessageFields and a typed conversation shape. Captures the known field set used by all final event constructors (abort handler in GenerationJobManager and normal completion in request.js) while allowing extension via index signature for fields contributed by the full TMessage/TConversation schemas. * refactor: narrow trackUserMessage with discriminated union, disambiguate error fields Use 'created' in event to narrow ServerSentEvent to CreatedEvent, eliminating all Record casts and manual field assertions. Add JSDoc to the two distinct error fields on FinalMessageFields and FinalEvent to prevent confusion. * fix: update cross-replica test to expect created event from metadata The cross-replica subscribe fallback now correctly emits a created event reconstructed from persisted metadata when userMessage exists in the Redis job hash. Replica B receives 4 events (created + 3 deltas) instead of 3. --- .../api/src/stream/GenerationJobManager.ts | 50 ++++-- ...ationJobManager.stream_integration.spec.ts | 142 ++++++++++++++++-- packages/api/src/types/events.ts | 49 +++++- packages/api/src/utils/events.ts | 9 +- 4 files changed, 218 insertions(+), 32 deletions(-) diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 1b612dcb8f..3e04ab734b 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -656,7 +656,7 @@ class GenerationJobManagerClass { aborted: true, // Flag for early abort - no messages saved, frontend should go to new chat earlyAbort: isEarlyAbort, - } as unknown as t.ServerSentEvent; + } satisfies t.FinalEvent as t.ServerSentEvent; if (runtime) { runtime.finalEvent = abortFinalEvent; @@ -781,6 +781,27 @@ class GenerationJobManagerClass { } } runtime.earlyEventBuffer = []; + } else if (this._isRedis && !options?.skipBufferReplay && jobData?.userMessage) { + /** + * Cross-replica fallback: the created event was buffered on the generating + * instance and published via Redis pub/sub before this subscriber was active. + * Reconstruct from persisted metadata. Only fields stored by trackUserMessage() + * are available (messageId, parentMessageId, conversationId, text); + * sender/isCreatedByUser are invariant for user messages and added back here. + */ + logger.debug( + `[GenerationJobManager] Cross-replica subscribe: emitting created event from metadata for ${streamId}`, + ); + const createdEvent: t.CreatedEvent = { + created: true, + message: { + ...jobData.userMessage, + sender: 'User', + isCreatedByUser: true, + }, + streamId, + }; + onChunk(createdEvent); } this.eventTransport.syncReorderBuffer?.(streamId); @@ -858,8 +879,7 @@ class GenerationJobManagerClass { return; } - // Track user message from created event - this.trackUserMessage(streamId, event); + await this.trackUserMessage(streamId, event); // For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability) if (this._isRedis) { @@ -943,29 +963,31 @@ class GenerationJobManagerClass { } /** - * Track user message from created event. + * Persist user message metadata from the created event. + * Awaited in emitChunk so the HSET commits before the PUBLISH, + * guaranteeing any cross-replica getJob() after the pub/sub window + * finds userMessage in Redis. */ - private trackUserMessage(streamId: string, event: t.ServerSentEvent): void { - const data = event as Record; - if (!data.created || !data.message) { + private async trackUserMessage(streamId: string, event: t.ServerSentEvent): Promise { + if (!('created' in event)) { return; } - const message = data.message as Record; + const { message } = event; const updates: Partial = { userMessage: { - messageId: message.messageId as string, - parentMessageId: message.parentMessageId as string | undefined, - conversationId: message.conversationId as string | undefined, - text: message.text as string | undefined, + messageId: message.messageId, + parentMessageId: message.parentMessageId, + conversationId: message.conversationId, + text: message.text, }, }; if (message.conversationId) { - updates.conversationId = message.conversationId as string; + updates.conversationId = message.conversationId; } - this.jobStore.updateJob(streamId, updates); + await this.jobStore.updateJob(streamId, updates); } /** diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index 2f23510018..3e85ace56d 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -1,6 +1,6 @@ /* eslint jest/no-standalone-expect: ["error", { "additionalTestBlockFunctions": ["testRedis"] }] */ import type { Redis, Cluster } from 'ioredis'; -import type { ServerSentEvent } from '~/types/events'; +import type { ServerSentEvent, StreamEvent, CreatedEvent } from '~/types'; import { InMemoryEventTransport } from '~/stream/implementations/InMemoryEventTransport'; import { RedisEventTransport } from '~/stream/implementations/RedisEventTransport'; import { InMemoryJobStore } from '~/stream/implementations/InMemoryJobStore'; @@ -771,6 +771,127 @@ describe('GenerationJobManager Integration Tests', () => { await GenerationJobManager.destroy(); await jobStore.destroy(); }); + + test('should emit created event from metadata on cross-replica subscribe', async () => { + const replicaAJobStore = new RedisJobStore(ioredisClient!); + await replicaAJobStore.initialize(); + + const streamId = `cross-created-${Date.now()}`; + const userId = 'test-user'; + + await replicaAJobStore.createJob(streamId, userId); + await replicaAJobStore.updateJob(streamId, { + userMessage: { + messageId: 'msg-123', + parentMessageId: '00000000-0000-0000-0000-000000000000', + conversationId: streamId, + text: 'hello world', + }, + }); + + jest.resetModules(); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + GenerationJobManager.initialize(); + + const received: unknown[] = []; + const subscription = await GenerationJobManager.subscribe( + streamId, + (event) => received.push(event), + ); + + expect(subscription).not.toBeNull(); + expect(received.length).toBe(1); + + const created = received[0] as CreatedEvent; + expect(created.created).toBe(true); + expect(created.streamId).toBe(streamId); + expect(created.message.messageId).toBe('msg-123'); + expect(created.message.conversationId).toBe(streamId); + expect(created.message.sender).toBe('User'); + expect(created.message.isCreatedByUser).toBe(true); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); + + test('should NOT emit created event from metadata when userMessage is not set', async () => { + const replicaAJobStore = new RedisJobStore(ioredisClient!); + await replicaAJobStore.initialize(); + + const streamId = `cross-no-created-${Date.now()}`; + await replicaAJobStore.createJob(streamId, 'test-user'); + + jest.resetModules(); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + GenerationJobManager.initialize(); + + const received: unknown[] = []; + const subscription = await GenerationJobManager.subscribe( + streamId, + (event) => received.push(event), + ); + + expect(subscription).not.toBeNull(); + expect(received.length).toBe(0); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); + + test('should NOT emit created event when skipBufferReplay is true (resume path)', async () => { + const replicaAJobStore = new RedisJobStore(ioredisClient!); + await replicaAJobStore.initialize(); + + const streamId = `cross-no-replay-${Date.now()}`; + await replicaAJobStore.createJob(streamId, 'test-user'); + await replicaAJobStore.updateJob(streamId, { + userMessage: { + messageId: 'msg-456', + conversationId: streamId, + text: 'hi', + }, + }); + + jest.resetModules(); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + GenerationJobManager.initialize(); + + const received: unknown[] = []; + const subscription = await GenerationJobManager.subscribe( + streamId, + (event) => received.push(event), + undefined, + undefined, + { skipBufferReplay: true }, + ); + + expect(subscription).not.toBeNull(); + expect(received.length).toBe(0); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); }); describeRedis('Sequential Event Ordering (Redis)', () => { @@ -1040,7 +1161,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); await manager.emitChunk(streamId, { event: 'on_message_delta', data: { delta: { content: { type: 'text', text: 'First chunk' } } }, @@ -1077,7 +1198,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); await manager.emitChunk(streamId, { event: 'on_message_delta', data: { delta: { content: { type: 'text', text: 'First' } } }, @@ -1123,7 +1244,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); await manager.emitChunk(streamId, { event: 'on_run_step', data: { id: 'step-1', type: 'message_creation', index: 0 }, @@ -1228,7 +1349,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 20)); expect(resumeEvents.length).toBe(1); - expect(resumeEvents[0].event).toBe('on_message_delta'); + expect((resumeEvents[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1262,7 +1383,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 20)); expect(sub2Events.length).toBe(1); - expect(sub2Events[0].event).toBe('on_message_delta'); + expect((sub2Events[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1427,7 +1548,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 200)); expect(resumeEvents.length).toBe(1); - expect(resumeEvents[0].event).toBe('on_message_delta'); + expect((resumeEvents[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1458,7 +1579,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 200)); expect(sub2Events.length).toBe(1); - expect(sub2Events[0].event).toBe('on_message_delta'); + expect((sub2Events[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1997,7 +2118,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); const receivedOnA: unknown[] = []; const subA = await replicaA.subscribe(streamId, (event: unknown) => receivedOnA.push(event)); @@ -2035,7 +2156,8 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 700)); expect(receivedOnA.length).toBe(4); - expect(receivedOnB.length).toBe(3); + expect(receivedOnB.length).toBe(4); + expect((receivedOnB[0] as CreatedEvent).created).toBe(true); subA?.unsubscribe(); subB?.unsubscribe(); diff --git a/packages/api/src/types/events.ts b/packages/api/src/types/events.ts index 1e866fa840..d068888b17 100644 --- a/packages/api/src/types/events.ts +++ b/packages/api/src/types/events.ts @@ -1,4 +1,49 @@ -export type ServerSentEvent = { +/** SSE streaming event (on_run_step, on_message_delta, etc.) */ +export type StreamEvent = { + event: string; data: string | Record; - event?: string; }; + +/** Control event emitted when user message is created and generation starts */ +export type CreatedEvent = { + created: true; + message: { + messageId: string; + parentMessageId?: string; + conversationId?: string; + text?: string; + sender: string; + isCreatedByUser: boolean; + }; + streamId: string; +}; + +export type FinalMessageFields = { + messageId?: string; + parentMessageId?: string; + conversationId?: string; + text?: string; + content?: unknown[]; + sender?: string; + isCreatedByUser?: boolean; + unfinished?: boolean; + /** Per-message error flag — matches TMessage.error (boolean or error text) */ + error?: boolean | string; + [key: string]: unknown; +}; + +/** Terminal event emitted when generation completes or is aborted */ +export type FinalEvent = { + final: true; + requestMessage?: FinalMessageFields | null; + responseMessage?: FinalMessageFields | null; + conversation?: { conversationId?: string; [key: string]: unknown } | null; + title?: string; + aborted?: boolean; + earlyAbort?: boolean; + runMessages?: FinalMessageFields[]; + /** Top-level event error (abort-during-completion edge case) */ + error?: { message: string }; +}; + +export type ServerSentEvent = StreamEvent | CreatedEvent | FinalEvent; diff --git a/packages/api/src/utils/events.ts b/packages/api/src/utils/events.ts index 20c9583993..e084e631f5 100644 --- a/packages/api/src/utils/events.ts +++ b/packages/api/src/utils/events.ts @@ -2,14 +2,11 @@ import type { Response as ServerResponse } from 'express'; import type { ServerSentEvent } from '~/types'; /** - * Sends message data in Server Sent Events format. - * @param res - The server response. - * @param event - The message event. - * @param event.event - The type of event. - * @param event.data - The message to be sent. + * Sends a Server-Sent Event to the client. + * Empty-string StreamEvent data is silently dropped. */ export function sendEvent(res: ServerResponse, event: ServerSentEvent): void { - if (typeof event.data === 'string' && event.data.length === 0) { + if ('data' in event && typeof event.data === 'string' && event.data.length === 0) { return; } res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);