🛰️ fix: Cross-Replica Created Event Delivery (#12231)

* fix: emit created event from metadata on cross-replica subscribe

In multi-instance Redis deployments, the created event (which triggers
sidebar conversation creation) was lost when the SSE subscriber connected
to a different instance than the one generating. The event was only in
the generating instance's local earlyEventBuffer and the Redis pub/sub
message was already gone by the time the subscriber's channel was active.

When subscribing cross-replica (empty buffer, Redis mode, userMessage
already in job metadata), reconstruct and emit the created event
directly from stored metadata.
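
The fallback condition described above can be sketched roughly as follows. This is a minimal illustration, not the code from the commit: `SubscribeContext`, `bufferedEventCount`, and `rebuildCreatedEvent` are hypothetical names, and the field set is assumed to match what is persisted in job metadata.

```typescript
// Hypothetical shape of the user message persisted in job metadata.
type StoredUserMessage = {
  messageId: string;
  parentMessageId?: string;
  conversationId?: string;
  text?: string;
};

type CreatedEvent = {
  created: true;
  message: StoredUserMessage & { sender: string; isCreatedByUser: boolean };
  streamId: string;
};

// Hypothetical summary of what a subscriber knows when it attaches.
type SubscribeContext = {
  isRedis: boolean;
  bufferedEventCount: number; // size of the local early-event buffer
  skipBufferReplay?: boolean; // true on the resume path
  userMessage?: StoredUserMessage;
};

/** Returns a rebuilt created event, or null when the fallback does not apply. */
function rebuildCreatedEvent(streamId: string, ctx: SubscribeContext): CreatedEvent | null {
  const isCrossReplica = ctx.isRedis && ctx.bufferedEventCount === 0;
  if (!isCrossReplica || ctx.skipBufferReplay || !ctx.userMessage) {
    return null;
  }
  return {
    created: true,
    // sender and isCreatedByUser are not stored; they are constant for user messages.
    message: { ...ctx.userMessage, sender: 'User', isCreatedByUser: true },
    streamId,
  };
}
```

A subscriber would call `rebuildCreatedEvent` once after finding its local buffer empty and forward a non-null result to its chunk handler.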

* test: add skipBufferReplay regression guard for cross-replica created event

Add test asserting the resume path (skipBufferReplay: true) does NOT
emit a created event on cross-replica subscribe — prevents the
duplication fix from PR #12225 from regressing. Add explanatory JSDoc
on the cross-replica fallback branch documenting which fields are
preserved from trackUserMessage() and why sender/isCreatedByUser
are hardcoded.

* refactor: replace as-unknown-as casts with discriminated ServerSentEvent union

Split ServerSentEvent into StreamEvent | CreatedEvent | FinalEvent so
event shapes are statically typed. Removes all as-unknown-as casts in
GenerationJobManager and test file; narrows with proper union members
where properties are accessed.
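
As an illustration of why the union removes the need for casts, here is a small sketch with trimmed-down versions of the three event shapes. `describeEvent` is a hypothetical helper that exists only for this example.

```typescript
// Simplified stand-ins for the three union members.
type StreamEvent = { event: string; data: string | Record<string, unknown> };
type CreatedEvent = { created: true; message: { messageId: string; text?: string }; streamId: string };
type FinalEvent = { final: true; aborted?: boolean };
type ServerSentEvent = StreamEvent | CreatedEvent | FinalEvent;

function describeEvent(event: ServerSentEvent): string {
  if ('created' in event) {
    // Narrowed to CreatedEvent: message.messageId is typed as string.
    return `created:${event.message.messageId}`;
  }
  if ('final' in event) {
    // Narrowed to FinalEvent.
    return event.aborted ? 'final:aborted' : 'final:completed';
  }
  // Only StreamEvent remains.
  return `stream:${event.event}`;
}
```

Each branch reads its properties directly because the `in` check selects the one union member that declares that key.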

* fix: await trackUserMessage before PUBLISH for structural ordering

trackUserMessage was fire-and-forget — the HSET for userMessage could
theoretically race with the PUBLISH. Await it so the write commits
before the pub/sub fires, guaranteeing any cross-replica getJob() after
the pub/sub window always finds userMessage in Redis. No-op for
non-created events (early return before any async work).
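
The ordering guarantee can be demonstrated with an in-memory stand-in for Redis. Everything in this sketch is an assumption made for illustration: `FakeRedis`, the `job:` and `stream:` key names, and the simplified `emitChunk` are not taken from the codebase.

```typescript
type CreatedEvent = { created: true; message: { messageId: string } };
type StreamEvent = { event: string; data: unknown };
type AnyEvent = CreatedEvent | StreamEvent;

/** Hypothetical in-memory stand-in that records the order of operations. */
class FakeRedis {
  readonly log: string[] = [];

  async hset(key: string, field: string): Promise<void> {
    await Promise.resolve(); // simulate the write completing on a later tick
    this.log.push(`HSET ${key} ${field}`);
  }

  async publish(channel: string): Promise<void> {
    this.log.push(`PUBLISH ${channel}`);
  }
}

async function trackUserMessage(redis: FakeRedis, streamId: string, event: AnyEvent): Promise<void> {
  if (!('created' in event)) {
    return; // non-created events return before any async work
  }
  await redis.hset(`job:${streamId}`, 'userMessage');
}

async function emitChunk(redis: FakeRedis, streamId: string, event: AnyEvent): Promise<void> {
  // Awaiting here means the metadata write is recorded before the publish.
  await trackUserMessage(redis, streamId, event);
  await redis.publish(`stream:${streamId}`);
}
```

If the `await` before `trackUserMessage` were dropped, the `PUBLISH` entry would be logged ahead of the `HSET` entry in this model, which is the race the commit closes.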

* refactor: type CreatedEvent.message explicitly, fix JSDoc and import

Give CreatedEvent.message its full known shape instead of
Record<string, unknown>. Update sendEvent JSDoc to reflect the
discriminated union. Use barrel import in test file.

* refactor: type FinalEvent fields with explicit message and conversation shapes

Replace Record<string, unknown> on requestMessage, responseMessage,
conversation, and runMessages with FinalMessageFields and a typed
conversation shape. Captures the known field set used by all final
event constructors (abort handler in GenerationJobManager and normal
completion in request.js) while allowing extension via index signature
for fields contributed by the full TMessage/TConversation schemas.
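
A reduced sketch of the idea: known fields are declared explicitly, and an index signature admits anything else. The `model` and `title` fields and the `responseText` helper are hypothetical and appear only to show the extension point.

```typescript
type FinalMessageFields = {
  messageId?: string;
  text?: string;
  unfinished?: boolean;
  [key: string]: unknown; // extension point for fields from the fuller schemas
};

type FinalEvent = {
  final: true;
  responseMessage?: FinalMessageFields | null;
  conversation?: { conversationId?: string; [key: string]: unknown } | null;
  aborted?: boolean;
};

// `model` and `title` are not declared above; the index signatures accept them.
const abortFinalEvent: FinalEvent = {
  final: true,
  aborted: true,
  responseMessage: { messageId: 'resp-1', text: 'partial answer', unfinished: true, model: 'example-model' },
  conversation: { conversationId: 'convo-1', title: 'Example' },
};

function responseText(event: FinalEvent): string {
  // Declared fields keep their types, so no cast is needed to read text.
  return event.responseMessage?.text ?? '';
}
```

Declared fields such as `text` stay typed as `string | undefined`, while undeclared ones come back as `unknown` and must be checked before use.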

* refactor: narrow trackUserMessage with discriminated union, disambiguate error fields

Use 'created' in event to narrow ServerSentEvent to CreatedEvent,
eliminating all Record<string, unknown> casts and manual field
assertions. Add JSDoc to the two distinct error fields on
FinalMessageFields and FinalEvent to prevent confusion.
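
To make the distinction between the two error fields concrete, the following sketch reads both from a trimmed-down `FinalEvent`. `describeErrors` is a hypothetical helper and the types are reduced to the fields relevant here.

```typescript
type FinalMessageFields = {
  messageId?: string;
  /** Per-message error flag: boolean or error text */
  error?: boolean | string;
  [key: string]: unknown;
};

type FinalEvent = {
  final: true;
  responseMessage?: FinalMessageFields | null;
  /** Top-level event error */
  error?: { message: string };
};

function describeErrors(event: FinalEvent): string[] {
  const notes: string[] = [];
  if (event.error) {
    notes.push(`event error: ${event.error.message}`);
  }
  const messageError = event.responseMessage?.error;
  if (typeof messageError === 'string') {
    notes.push(`message error: ${messageError}`);
  } else if (messageError === true) {
    notes.push('message flagged as error');
  }
  return notes;
}
```

The two fields have different types, so a consumer that mixes them up fails type checking rather than at runtime.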

* fix: update cross-replica test to expect created event from metadata

The cross-replica subscribe fallback now correctly emits a created
event reconstructed from persisted metadata when userMessage exists
in the Redis job hash. Replica B receives 4 events (created + 3
deltas) instead of 3.
Danny Avila 2026-03-15 11:11:10 -04:00 committed by GitHub
parent e079fc4900
commit a01959b3d2
GPG key ID: B5690EEEBB952194
4 changed files with 218 additions and 32 deletions


@@ -656,7 +656,7 @@ class GenerationJobManagerClass {
   aborted: true,
   // Flag for early abort - no messages saved, frontend should go to new chat
   earlyAbort: isEarlyAbort,
-} as unknown as t.ServerSentEvent;
+} satisfies t.FinalEvent as t.ServerSentEvent;
 if (runtime) {
   runtime.finalEvent = abortFinalEvent;
@@ -781,6 +781,27 @@ class GenerationJobManagerClass {
     }
   }
   runtime.earlyEventBuffer = [];
+} else if (this._isRedis && !options?.skipBufferReplay && jobData?.userMessage) {
+  /**
+   * Cross-replica fallback: the created event was buffered on the generating
+   * instance and published via Redis pub/sub before this subscriber was active.
+   * Reconstruct from persisted metadata. Only fields stored by trackUserMessage()
+   * are available (messageId, parentMessageId, conversationId, text);
+   * sender/isCreatedByUser are invariant for user messages and added back here.
+   */
+  logger.debug(
+    `[GenerationJobManager] Cross-replica subscribe: emitting created event from metadata for ${streamId}`,
+  );
+  const createdEvent: t.CreatedEvent = {
+    created: true,
+    message: {
+      ...jobData.userMessage,
+      sender: 'User',
+      isCreatedByUser: true,
+    },
+    streamId,
+  };
+  onChunk(createdEvent);
 }
 this.eventTransport.syncReorderBuffer?.(streamId);
@@ -858,8 +879,7 @@ class GenerationJobManagerClass {
   return;
 }
-// Track user message from created event
-this.trackUserMessage(streamId, event);
+await this.trackUserMessage(streamId, event);
 // For Redis mode, persist chunk for later reconstruction (fire-and-forget for resumability)
 if (this._isRedis) {
@@ -943,29 +963,31 @@ class GenerationJobManagerClass {
 }
 /**
- * Track user message from created event.
+ * Persist user message metadata from the created event.
+ * Awaited in emitChunk so the HSET commits before the PUBLISH,
+ * guaranteeing any cross-replica getJob() after the pub/sub window
+ * finds userMessage in Redis.
  */
-private trackUserMessage(streamId: string, event: t.ServerSentEvent): void {
-  const data = event as Record<string, unknown>;
-  if (!data.created || !data.message) {
+private async trackUserMessage(streamId: string, event: t.ServerSentEvent): Promise<void> {
+  if (!('created' in event)) {
     return;
   }
-  const message = data.message as Record<string, unknown>;
+  const { message } = event;
   const updates: Partial<SerializableJobData> = {
     userMessage: {
-      messageId: message.messageId as string,
-      parentMessageId: message.parentMessageId as string | undefined,
-      conversationId: message.conversationId as string | undefined,
-      text: message.text as string | undefined,
+      messageId: message.messageId,
+      parentMessageId: message.parentMessageId,
+      conversationId: message.conversationId,
+      text: message.text,
     },
   };
   if (message.conversationId) {
-    updates.conversationId = message.conversationId as string;
+    updates.conversationId = message.conversationId;
   }
-  this.jobStore.updateJob(streamId, updates);
+  await this.jobStore.updateJob(streamId, updates);
 }
 /**


@@ -1,6 +1,6 @@
 /* eslint jest/no-standalone-expect: ["error", { "additionalTestBlockFunctions": ["testRedis"] }] */
 import type { Redis, Cluster } from 'ioredis';
-import type { ServerSentEvent } from '~/types/events';
+import type { ServerSentEvent, StreamEvent, CreatedEvent } from '~/types';
 import { InMemoryEventTransport } from '~/stream/implementations/InMemoryEventTransport';
 import { RedisEventTransport } from '~/stream/implementations/RedisEventTransport';
 import { InMemoryJobStore } from '~/stream/implementations/InMemoryJobStore';
@@ -771,6 +771,127 @@ describe('GenerationJobManager Integration Tests', () => {
   await GenerationJobManager.destroy();
   await jobStore.destroy();
 });
+test('should emit created event from metadata on cross-replica subscribe', async () => {
+  const replicaAJobStore = new RedisJobStore(ioredisClient!);
+  await replicaAJobStore.initialize();
+  const streamId = `cross-created-${Date.now()}`;
+  const userId = 'test-user';
+  await replicaAJobStore.createJob(streamId, userId);
+  await replicaAJobStore.updateJob(streamId, {
+    userMessage: {
+      messageId: 'msg-123',
+      parentMessageId: '00000000-0000-0000-0000-000000000000',
+      conversationId: streamId,
+      text: 'hello world',
+    },
+  });
+  jest.resetModules();
+  const services = createStreamServices({
+    useRedis: true,
+    redisClient: ioredisClient,
+  });
+  GenerationJobManager.configure(services);
+  GenerationJobManager.initialize();
+  const received: unknown[] = [];
+  const subscription = await GenerationJobManager.subscribe(
+    streamId,
+    (event) => received.push(event),
+  );
+  expect(subscription).not.toBeNull();
+  expect(received.length).toBe(1);
+  const created = received[0] as CreatedEvent;
+  expect(created.created).toBe(true);
+  expect(created.streamId).toBe(streamId);
+  expect(created.message.messageId).toBe('msg-123');
+  expect(created.message.conversationId).toBe(streamId);
+  expect(created.message.sender).toBe('User');
+  expect(created.message.isCreatedByUser).toBe(true);
+  subscription?.unsubscribe();
+  await GenerationJobManager.destroy();
+  await replicaAJobStore.destroy();
+});
+test('should NOT emit created event from metadata when userMessage is not set', async () => {
+  const replicaAJobStore = new RedisJobStore(ioredisClient!);
+  await replicaAJobStore.initialize();
+  const streamId = `cross-no-created-${Date.now()}`;
+  await replicaAJobStore.createJob(streamId, 'test-user');
+  jest.resetModules();
+  const services = createStreamServices({
+    useRedis: true,
+    redisClient: ioredisClient,
+  });
+  GenerationJobManager.configure(services);
+  GenerationJobManager.initialize();
+  const received: unknown[] = [];
+  const subscription = await GenerationJobManager.subscribe(
+    streamId,
+    (event) => received.push(event),
+  );
+  expect(subscription).not.toBeNull();
+  expect(received.length).toBe(0);
+  subscription?.unsubscribe();
+  await GenerationJobManager.destroy();
+  await replicaAJobStore.destroy();
+});
+test('should NOT emit created event when skipBufferReplay is true (resume path)', async () => {
+  const replicaAJobStore = new RedisJobStore(ioredisClient!);
+  await replicaAJobStore.initialize();
+  const streamId = `cross-no-replay-${Date.now()}`;
+  await replicaAJobStore.createJob(streamId, 'test-user');
+  await replicaAJobStore.updateJob(streamId, {
+    userMessage: {
+      messageId: 'msg-456',
+      conversationId: streamId,
+      text: 'hi',
+    },
+  });
+  jest.resetModules();
+  const services = createStreamServices({
+    useRedis: true,
+    redisClient: ioredisClient,
+  });
+  GenerationJobManager.configure(services);
+  GenerationJobManager.initialize();
+  const received: unknown[] = [];
+  const subscription = await GenerationJobManager.subscribe(
+    streamId,
+    (event) => received.push(event),
+    undefined,
+    undefined,
+    { skipBufferReplay: true },
+  );
+  expect(subscription).not.toBeNull();
+  expect(received.length).toBe(0);
+  subscription?.unsubscribe();
+  await GenerationJobManager.destroy();
+  await replicaAJobStore.destroy();
+});
 });
 describeRedis('Sequential Event Ordering (Redis)', () => {
@@ -1040,7 +1161,7 @@ describe('GenerationJobManager Integration Tests', () => {
   created: true,
   message: { text: 'hello' },
   streamId,
-} as unknown as ServerSentEvent);
+} as CreatedEvent);
 await manager.emitChunk(streamId, {
   event: 'on_message_delta',
   data: { delta: { content: { type: 'text', text: 'First chunk' } } },
@@ -1077,7 +1198,7 @@ describe('GenerationJobManager Integration Tests', () => {
   created: true,
   message: { text: 'hello' },
   streamId,
-} as unknown as ServerSentEvent);
+} as CreatedEvent);
 await manager.emitChunk(streamId, {
   event: 'on_message_delta',
   data: { delta: { content: { type: 'text', text: 'First' } } },
@@ -1123,7 +1244,7 @@ describe('GenerationJobManager Integration Tests', () => {
   created: true,
   message: { text: 'hello' },
   streamId,
-} as unknown as ServerSentEvent);
+} as CreatedEvent);
 await manager.emitChunk(streamId, {
   event: 'on_run_step',
   data: { id: 'step-1', type: 'message_creation', index: 0 },
@@ -1228,7 +1349,7 @@ describe('GenerationJobManager Integration Tests', () => {
 await new Promise((resolve) => setTimeout(resolve, 20));
 expect(resumeEvents.length).toBe(1);
-expect(resumeEvents[0].event).toBe('on_message_delta');
+expect((resumeEvents[0] as StreamEvent).event).toBe('on_message_delta');
 sub2?.unsubscribe();
 await manager.destroy();
@@ -1262,7 +1383,7 @@ describe('GenerationJobManager Integration Tests', () => {
 await new Promise((resolve) => setTimeout(resolve, 20));
 expect(sub2Events.length).toBe(1);
-expect(sub2Events[0].event).toBe('on_message_delta');
+expect((sub2Events[0] as StreamEvent).event).toBe('on_message_delta');
 sub2?.unsubscribe();
 await manager.destroy();
@@ -1427,7 +1548,7 @@ describe('GenerationJobManager Integration Tests', () => {
 await new Promise((resolve) => setTimeout(resolve, 200));
 expect(resumeEvents.length).toBe(1);
-expect(resumeEvents[0].event).toBe('on_message_delta');
+expect((resumeEvents[0] as StreamEvent).event).toBe('on_message_delta');
 sub2?.unsubscribe();
 await manager.destroy();
@@ -1458,7 +1579,7 @@ describe('GenerationJobManager Integration Tests', () => {
 await new Promise((resolve) => setTimeout(resolve, 200));
 expect(sub2Events.length).toBe(1);
-expect(sub2Events[0].event).toBe('on_message_delta');
+expect((sub2Events[0] as StreamEvent).event).toBe('on_message_delta');
 sub2?.unsubscribe();
 await manager.destroy();
@@ -1997,7 +2118,7 @@ describe('GenerationJobManager Integration Tests', () => {
   created: true,
   message: { text: 'hello' },
   streamId,
-} as unknown as ServerSentEvent);
+} as CreatedEvent);
 const receivedOnA: unknown[] = [];
 const subA = await replicaA.subscribe(streamId, (event: unknown) => receivedOnA.push(event));
@@ -2035,7 +2156,8 @@ describe('GenerationJobManager Integration Tests', () => {
 await new Promise((resolve) => setTimeout(resolve, 700));
 expect(receivedOnA.length).toBe(4);
-expect(receivedOnB.length).toBe(3);
+expect(receivedOnB.length).toBe(4);
+expect((receivedOnB[0] as CreatedEvent).created).toBe(true);
 subA?.unsubscribe();
 subB?.unsubscribe();


@@ -1,4 +1,49 @@
-export type ServerSentEvent = {
+/** SSE streaming event (on_run_step, on_message_delta, etc.) */
+export type StreamEvent = {
+  event: string;
   data: string | Record<string, unknown>;
-  event?: string;
 };
+/** Control event emitted when user message is created and generation starts */
+export type CreatedEvent = {
+  created: true;
+  message: {
+    messageId: string;
+    parentMessageId?: string;
+    conversationId?: string;
+    text?: string;
+    sender: string;
+    isCreatedByUser: boolean;
+  };
+  streamId: string;
+};
+export type FinalMessageFields = {
+  messageId?: string;
+  parentMessageId?: string;
+  conversationId?: string;
+  text?: string;
+  content?: unknown[];
+  sender?: string;
+  isCreatedByUser?: boolean;
+  unfinished?: boolean;
+  /** Per-message error flag — matches TMessage.error (boolean or error text) */
+  error?: boolean | string;
+  [key: string]: unknown;
+};
+/** Terminal event emitted when generation completes or is aborted */
+export type FinalEvent = {
+  final: true;
+  requestMessage?: FinalMessageFields | null;
+  responseMessage?: FinalMessageFields | null;
+  conversation?: { conversationId?: string; [key: string]: unknown } | null;
+  title?: string;
+  aborted?: boolean;
+  earlyAbort?: boolean;
+  runMessages?: FinalMessageFields[];
+  /** Top-level event error (abort-during-completion edge case) */
+  error?: { message: string };
+};
+export type ServerSentEvent = StreamEvent | CreatedEvent | FinalEvent;


@@ -2,14 +2,11 @@ import type { Response as ServerResponse } from 'express';
 import type { ServerSentEvent } from '~/types';
 /**
- * Sends message data in Server Sent Events format.
- * @param res - The server response.
- * @param event - The message event.
- * @param event.event - The type of event.
- * @param event.data - The message to be sent.
+ * Sends a Server-Sent Event to the client.
+ * Empty-string StreamEvent data is silently dropped.
  */
 export function sendEvent(res: ServerResponse, event: ServerSentEvent): void {
-  if (typeof event.data === 'string' && event.data.length === 0) {
+  if ('data' in event && typeof event.data === 'string' && event.data.length === 0) {
     return;
   }
   res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);