diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index f592662b39..71c13a941c 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -10,6 +10,17 @@ import { InMemoryJobStore } from './implementations/InMemoryJobStore'; /** * Runtime state for active jobs - not serializable, kept in-memory per instance. * Contains AbortController, ready promise, and other non-serializable state. + * + * @property abortController - Controller to abort the generation + * @property readyPromise - Resolves when first real subscriber connects (used to sync generation start) + * @property resolveReady - Function to resolve readyPromise + * @property finalEvent - Cached final event for late subscribers + * @property syncSent - Whether sync event was sent (reset when all subscribers leave) + * @property allSubscribersLeftHandlers - Internal handlers for disconnect events. + * These are stored separately from eventTransport subscribers to avoid being counted + * in subscriber count. This is critical: if these were registered via subscribe(), + * they would count as subscribers, causing isFirstSubscriber() to return false + * when the real client connects, which would prevent readyPromise from resolving. */ interface RuntimeJobState { abortController: AbortController; @@ -17,6 +28,7 @@ interface RuntimeJobState { resolveReady: () => void; finalEvent?: t.ServerSentEvent; syncSent: boolean; + allSubscribersLeftHandlers?: Array<(...args: unknown[]) => void>; } /** @@ -65,13 +77,30 @@ class GenerationJobManagerClass { /** * Create a new generation job. + * + * This sets up: + * 1. Serializable job data in the job store + * 2. Runtime state including readyPromise (resolves when first SSE client connects) + * 3. 
allSubscribersLeft callback for handling client disconnections + * + * The readyPromise mechanism ensures generation doesn't start before the client + * is ready to receive events. The controller awaits this promise (with a short timeout) + * before starting LLM generation. + * + * @param streamId - Unique identifier for this stream + * @param userId - User who initiated the request + * @param conversationId - Optional conversation ID for lookup * @returns A facade object compatible with the old GenerationJob interface */ createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob { // Create serializable job data (sync for in-memory) const jobData = this.jobStore.createJobSync(streamId, userId, conversationId); - // Create runtime state + /** + * Create runtime state with readyPromise. + * readyPromise is resolved in subscribe() when isFirstSubscriber() returns true. + * This synchronizes generation start with client connection. + */ let resolveReady: () => void; const readyPromise = new Promise((resolve) => { resolveReady = resolve; @@ -85,17 +114,28 @@ class GenerationJobManagerClass { }; this.runtimeState.set(streamId, runtime); - // Set up all-subscribers-left callback + /** + * Set up all-subscribers-left callback. + * When all SSE clients disconnect, this: + * 1. Resets syncSent so reconnecting clients get sync event + * 2. Calls any registered allSubscribersLeft handlers (e.g., to save partial responses) + */ this.eventTransport.onAllSubscribersLeft(streamId, () => { const currentRuntime = this.runtimeState.get(streamId); if (currentRuntime) { currentRuntime.syncSent = false; + // Call registered handlers (from job.emitter.on('allSubscribersLeft', ...)) + const content = this.contentState.getContentParts(streamId) ?? 
[]; + if (currentRuntime.allSubscribersLeftHandlers) { + for (const handler of currentRuntime.allSubscribersLeftHandlers) { + try { + handler(content); + } catch (err) { + logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err); + } + } + } } - const content = this.contentState.getContentParts(streamId) ?? []; - this.eventTransport.emitChunk(streamId, { - _internal: 'allSubscribersLeft', - content, - }); logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`); }); @@ -107,26 +147,43 @@ class GenerationJobManagerClass { /** * Build a GenerationJob facade from job data and runtime state. - * This maintains backwards compatibility with existing code. + * This maintains backwards compatibility with existing code that expects + * job.emitter, job.abortController, etc. + * + * IMPORTANT: The emitterProxy.on('allSubscribersLeft') handler registration + * does NOT use eventTransport.subscribe(). This is intentional: + * + * If we used subscribe() for internal handlers, those handlers would count + * as subscribers. When the real SSE client connects, isFirstSubscriber() + * would return false (because internal handler was "first"), and readyPromise + * would never resolve - causing a 5-second timeout delay before generation starts. + * + * Instead, allSubscribersLeft handlers are stored in runtime.allSubscribersLeftHandlers + * and called directly from the onAllSubscribersLeft callback in createJob(). + * + * @param streamId - The stream identifier + * @param jobData - Serializable job metadata from job store + * @param runtime - Non-serializable runtime state (abort controller, promises, etc.) + * @returns A GenerationJob facade object */ private buildJobFacade( streamId: string, jobData: SerializableJobData, runtime: RuntimeJobState, ): t.GenerationJob { - // Create a proxy emitter that delegates to eventTransport + /** + * Proxy emitter that delegates to eventTransport for most operations. 
+ * Exception: allSubscribersLeft handlers are stored separately to avoid + * incrementing subscriber count (see the buildJobFacade JSDoc above). + */ const emitterProxy = { on: (event: string, handler: (...args: unknown[]) => void) => { if (event === 'allSubscribersLeft') { - // Subscribe to internal event - this.eventTransport.subscribe(streamId, { - onChunk: (e) => { - const evt = e as Record; - if (evt._internal === 'allSubscribersLeft') { - handler(evt.content); - } - }, - }); + // Store handler for internal callback - don't use subscribe() to avoid counting as a subscriber + if (!runtime.allSubscribersLeftHandlers) { + runtime.allSubscribersLeftHandlers = []; + } + runtime.allSubscribersLeftHandlers.push(handler); } }, emit: () => { @@ -285,6 +342,20 @@ class GenerationJobManagerClass { /** * Subscribe to a job's event stream. + * + * This is called when an SSE client connects to /chat/stream/:streamId. + * On first subscription, it resolves readyPromise to signal that generation can start. + * + * The subscriber count is critical for the readyPromise mechanism: + * - isFirstSubscriber() returns true when subscriber count is exactly 1 + * - This happens when the first REAL client connects (not internal handlers) + * - Internal allSubscribersLeft handlers are stored separately to avoid being counted + * + * @param streamId - The stream to subscribe to + * @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.) 
+ * @param onDone - Handler for completion event (includes final message) + * @param onError - Handler for error events + * @returns Subscription object with unsubscribe function, or null if job not found */ subscribe( streamId: string, @@ -323,9 +394,15 @@ class GenerationJobManagerClass { }); // Signal ready on first subscriber - if (this.eventTransport.isFirstSubscriber(streamId)) { + const isFirst = this.eventTransport.isFirstSubscriber(streamId); + logger.debug( + `[GenerationJobManager] subscribe check: streamId=${streamId}, isFirst=${isFirst}`, + ); + if (isFirst) { runtime.resolveReady(); - logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`); + logger.debug( + `[GenerationJobManager] First subscriber ready, resolving promise for ${streamId}`, + ); } return subscription; diff --git a/packages/api/src/stream/implementations/InMemoryEventTransport.ts b/packages/api/src/stream/implementations/InMemoryEventTransport.ts index 3a781fa4ba..e4ac88b19e 100644 --- a/packages/api/src/stream/implementations/InMemoryEventTransport.ts +++ b/packages/api/src/stream/implementations/InMemoryEventTransport.ts @@ -43,6 +43,10 @@ export class InMemoryEventTransport implements IEventTransport { state.emitter.on('done', doneHandler); state.emitter.on('error', errorHandler); + logger.debug( + `[InMemoryEventTransport] subscribe ${streamId}: listeners=${state.emitter.listenerCount('chunk')}`, + ); + return { unsubscribe: () => { const currentState = this.streams.get(streamId); @@ -90,7 +94,9 @@ export class InMemoryEventTransport implements IEventTransport { */ isFirstSubscriber(streamId: string): boolean { const state = this.streams.get(streamId); - return state?.emitter.listenerCount('chunk') === 1; + const count = state?.emitter.listenerCount('chunk') ?? 
0; + logger.debug(`[InMemoryEventTransport] isFirstSubscriber ${streamId}: count=${count}`); + return count === 1; } /** diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 6aa30659a8..2b2a8800a5 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -112,6 +112,9 @@ export interface IEventTransport { /** Get subscriber count for a stream */ getSubscriberCount(streamId: string): number; + /** Check if this is the first subscriber (for ready signaling) */ + isFirstSubscriber(streamId: string): boolean; + /** Listen for all subscribers leaving */ onAllSubscribersLeft(streamId: string, callback: () => void): void; }