diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 1b612dcb8f..3e04ab734b 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -656,7 +656,7 @@ class GenerationJobManagerClass { aborted: true, // Flag for early abort - no messages saved, frontend should go to new chat earlyAbort: isEarlyAbort, - } as unknown as t.ServerSentEvent; + } satisfies t.FinalEvent as t.ServerSentEvent; if (runtime) { runtime.finalEvent = abortFinalEvent; @@ -781,6 +781,27 @@ class GenerationJobManagerClass { } } runtime.earlyEventBuffer = []; + } else if (this._isRedis && !options?.skipBufferReplay && jobData?.userMessage) { + /** + * Cross-replica fallback: the created event was buffered on the generating + * instance and published via Redis pub/sub before this subscriber was active. + * Reconstruct from persisted metadata. Only fields stored by trackUserMessage() + * are available (messageId, parentMessageId, conversationId, text); + * sender/isCreatedByUser are invariant for user messages and added back here. + */ + logger.debug( + `[GenerationJobManager] Cross-replica subscribe: emitting created event from metadata for ${streamId}`, + ); + const createdEvent: t.CreatedEvent = { + created: true, + message: { + ...jobData.userMessage, + sender: 'User', + isCreatedByUser: true, + }, + streamId, + }; + onChunk(createdEvent); } this.eventTransport.syncReorderBuffer?.(streamId); @@ -858,8 +879,7 @@ class GenerationJobManagerClass { return; } - // Track user message from created event - this.trackUserMessage(streamId, event); + await this.trackUserMessage(streamId, event); // For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability) if (this._isRedis) { @@ -943,29 +963,31 @@ class GenerationJobManagerClass { } /** - * Track user message from created event. + * Persist user message metadata from the created event. + * Awaited in emitChunk so the HSET commits before the PUBLISH, + * guaranteeing any cross-replica getJob() after the pub/sub window + * finds userMessage in Redis. */ - private trackUserMessage(streamId: string, event: t.ServerSentEvent): void { - const data = event as Record; - if (!data.created || !data.message) { + private async trackUserMessage(streamId: string, event: t.ServerSentEvent): Promise { + if (!('created' in event)) { return; } - const message = data.message as Record; + const { message } = event; const updates: Partial = { userMessage: { - messageId: message.messageId as string, - parentMessageId: message.parentMessageId as string | undefined, - conversationId: message.conversationId as string | undefined, - text: message.text as string | undefined, + messageId: message.messageId, + parentMessageId: message.parentMessageId, + conversationId: message.conversationId, + text: message.text, }, }; if (message.conversationId) { - updates.conversationId = message.conversationId as string; + updates.conversationId = message.conversationId; } - this.jobStore.updateJob(streamId, updates); + await this.jobStore.updateJob(streamId, updates); } /** diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index 2f23510018..3e85ace56d 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -1,6 +1,6 @@ /* eslint jest/no-standalone-expect: ["error", { "additionalTestBlockFunctions": ["testRedis"] }] */ import type { Redis, Cluster } from 'ioredis'; -import type { ServerSentEvent } from '~/types/events'; +import type { ServerSentEvent, StreamEvent, CreatedEvent } from '~/types'; import { InMemoryEventTransport } from '~/stream/implementations/InMemoryEventTransport'; import { RedisEventTransport } from '~/stream/implementations/RedisEventTransport'; import { InMemoryJobStore } from '~/stream/implementations/InMemoryJobStore'; @@ -771,6 +771,127 @@ describe('GenerationJobManager Integration Tests', () => { await GenerationJobManager.destroy(); await jobStore.destroy(); }); + + test('should emit created event from metadata on cross-replica subscribe', async () => { + const replicaAJobStore = new RedisJobStore(ioredisClient!); + await replicaAJobStore.initialize(); + + const streamId = `cross-created-${Date.now()}`; + const userId = 'test-user'; + + await replicaAJobStore.createJob(streamId, userId); + await replicaAJobStore.updateJob(streamId, { + userMessage: { + messageId: 'msg-123', + parentMessageId: '00000000-0000-0000-0000-000000000000', + conversationId: streamId, + text: 'hello world', + }, + }); + + jest.resetModules(); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + GenerationJobManager.initialize(); + + const received: unknown[] = []; + const subscription = await GenerationJobManager.subscribe( + streamId, + (event) => received.push(event), + ); + + expect(subscription).not.toBeNull(); + expect(received.length).toBe(1); + + const created = received[0] as CreatedEvent; + expect(created.created).toBe(true); + expect(created.streamId).toBe(streamId); + expect(created.message.messageId).toBe('msg-123'); + expect(created.message.conversationId).toBe(streamId); + expect(created.message.sender).toBe('User'); + expect(created.message.isCreatedByUser).toBe(true); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); + + test('should NOT emit created event from metadata when userMessage is not set', async () => { + const replicaAJobStore = new RedisJobStore(ioredisClient!); + await replicaAJobStore.initialize(); + + const streamId = `cross-no-created-${Date.now()}`; + await replicaAJobStore.createJob(streamId, 'test-user'); + + jest.resetModules(); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + GenerationJobManager.initialize(); + + const received: unknown[] = []; + const subscription = await GenerationJobManager.subscribe( + streamId, + (event) => received.push(event), + ); + + expect(subscription).not.toBeNull(); + expect(received.length).toBe(0); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); + + test('should NOT emit created event when skipBufferReplay is true (resume path)', async () => { + const replicaAJobStore = new RedisJobStore(ioredisClient!); + await replicaAJobStore.initialize(); + + const streamId = `cross-no-replay-${Date.now()}`; + await replicaAJobStore.createJob(streamId, 'test-user'); + await replicaAJobStore.updateJob(streamId, { + userMessage: { + messageId: 'msg-456', + conversationId: streamId, + text: 'hi', + }, + }); + + jest.resetModules(); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + GenerationJobManager.initialize(); + + const received: unknown[] = []; + const subscription = await GenerationJobManager.subscribe( + streamId, + (event) => received.push(event), + undefined, + undefined, + { skipBufferReplay: true }, + ); + + expect(subscription).not.toBeNull(); + expect(received.length).toBe(0); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); }); describeRedis('Sequential Event Ordering (Redis)', () => { @@ -1040,7 +1161,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); await manager.emitChunk(streamId, { event: 'on_message_delta', data: { delta: { content: { type: 'text', text: 'First chunk' } } }, @@ -1077,7 +1198,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); await manager.emitChunk(streamId, { event: 'on_message_delta', data: { delta: { content: { type: 'text', text: 'First' } } }, @@ -1123,7 +1244,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); await manager.emitChunk(streamId, { event: 'on_run_step', data: { id: 'step-1', type: 'message_creation', index: 0 }, @@ -1228,7 +1349,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 20)); expect(resumeEvents.length).toBe(1); - expect(resumeEvents[0].event).toBe('on_message_delta'); + expect((resumeEvents[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1262,7 +1383,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 20)); expect(sub2Events.length).toBe(1); - expect(sub2Events[0].event).toBe('on_message_delta'); + expect((sub2Events[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1427,7 +1548,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 200)); expect(resumeEvents.length).toBe(1); - expect(resumeEvents[0].event).toBe('on_message_delta'); + expect((resumeEvents[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1458,7 +1579,7 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 200)); expect(sub2Events.length).toBe(1); - expect(sub2Events[0].event).toBe('on_message_delta'); + expect((sub2Events[0] as StreamEvent).event).toBe('on_message_delta'); sub2?.unsubscribe(); await manager.destroy(); @@ -1997,7 +2118,7 @@ describe('GenerationJobManager Integration Tests', () => { created: true, message: { text: 'hello' }, streamId, - } as unknown as ServerSentEvent); + } as CreatedEvent); const receivedOnA: unknown[] = []; const subA = await replicaA.subscribe(streamId, (event: unknown) => receivedOnA.push(event)); @@ -2035,7 +2156,8 @@ describe('GenerationJobManager Integration Tests', () => { await new Promise((resolve) => setTimeout(resolve, 700)); expect(receivedOnA.length).toBe(4); - expect(receivedOnB.length).toBe(3); + expect(receivedOnB.length).toBe(4); + expect((receivedOnB[0] as CreatedEvent).created).toBe(true); subA?.unsubscribe(); subB?.unsubscribe(); diff --git a/packages/api/src/types/events.ts b/packages/api/src/types/events.ts index 1e866fa840..d068888b17 100644 --- a/packages/api/src/types/events.ts +++ b/packages/api/src/types/events.ts @@ -1,4 +1,49 @@ -export type ServerSentEvent = { +/** SSE streaming event (on_run_step, on_message_delta, etc.) */ +export type StreamEvent = { + event: string; data: string | Record; - event?: string; }; + +/** Control event emitted when user message is created and generation starts */ +export type CreatedEvent = { + created: true; + message: { + messageId: string; + parentMessageId?: string; + conversationId?: string; + text?: string; + sender: string; + isCreatedByUser: boolean; + }; + streamId: string; +}; + +export type FinalMessageFields = { + messageId?: string; + parentMessageId?: string; + conversationId?: string; + text?: string; + content?: unknown[]; + sender?: string; + isCreatedByUser?: boolean; + unfinished?: boolean; + /** Per-message error flag — matches TMessage.error (boolean or error text) */ + error?: boolean | string; + [key: string]: unknown; +}; + +/** Terminal event emitted when generation completes or is aborted */ +export type FinalEvent = { + final: true; + requestMessage?: FinalMessageFields | null; + responseMessage?: FinalMessageFields | null; + conversation?: { conversationId?: string; [key: string]: unknown } | null; + title?: string; + aborted?: boolean; + earlyAbort?: boolean; + runMessages?: FinalMessageFields[]; + /** Top-level event error (abort-during-completion edge case) */ + error?: { message: string }; +}; + +export type ServerSentEvent = StreamEvent | CreatedEvent | FinalEvent; diff --git a/packages/api/src/utils/events.ts b/packages/api/src/utils/events.ts index 20c9583993..e084e631f5 100644 --- a/packages/api/src/utils/events.ts +++ b/packages/api/src/utils/events.ts @@ -2,14 +2,11 @@ import type { Response as ServerResponse } from 'express'; import type { ServerSentEvent } from '~/types'; /** - * Sends message data in Server Sent Events format. - * @param res - The server response. - * @param event - The message event. - * @param event.event - The type of event. - * @param event.data - The message to be sent. + * Sends a Server-Sent Event to the client. + * Empty-string StreamEvent data is silently dropped. */ export function sendEvent(res: ServerResponse, event: ServerSentEvent): void { - if (typeof event.data === 'string' && event.data.length === 0) { + if ('data' in event && typeof event.data === 'string' && event.data.length === 0) { return; } res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);