refactor: Enhance GenerationJobManager with improved subscriber handling

- Updated RuntimeJobState to include allSubscribersLeftHandlers for managing client disconnections without affecting subscriber count.
- Refined createJob and subscribe methods to ensure generation starts only when the first real client connects.
- Added detailed documentation for methods and properties to clarify the synchronization of job generation with client readiness.
- Improved logging for subscriber checks and event handling to facilitate debugging and monitoring.
This commit is contained in:
Danny Avila 2025-12-12 02:36:44 -05:00
parent ff86f96416
commit 9fb7594ebe
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
3 changed files with 107 additions and 21 deletions

View file

@ -10,6 +10,17 @@ import { InMemoryJobStore } from './implementations/InMemoryJobStore';
/**
* Runtime state for active jobs - not serializable, kept in-memory per instance.
* Contains AbortController, ready promise, and other non-serializable state.
*
* @property abortController - Controller to abort the generation
* @property readyPromise - Resolves when the first real subscriber connects (used to sync generation start)
* @property resolveReady - Function to resolve readyPromise
* @property finalEvent - Cached final event for late subscribers
* @property syncSent - Whether sync event was sent (reset when all subscribers leave)
* @property allSubscribersLeftHandlers - Internal handlers for disconnect events.
* These are stored separately from eventTransport subscribers to avoid being counted
* in subscriber count. This is critical: if these were registered via subscribe(),
* they would count as subscribers, causing isFirstSubscriber() to return false
* when the real client connects, which would prevent readyPromise from resolving.
*/
interface RuntimeJobState {
abortController: AbortController;
@ -17,6 +28,7 @@ interface RuntimeJobState {
resolveReady: () => void;
finalEvent?: t.ServerSentEvent;
syncSent: boolean;
allSubscribersLeftHandlers?: Array<(...args: unknown[]) => void>;
}
/**
@ -65,13 +77,30 @@ class GenerationJobManagerClass {
/**
* Create a new generation job.
*
* This sets up:
* 1. Serializable job data in the job store
* 2. Runtime state including readyPromise (resolves when first SSE client connects)
* 3. allSubscribersLeft callback for handling client disconnections
*
* The readyPromise mechanism ensures generation doesn't start before the client
* is ready to receive events. The controller awaits this promise (with a short timeout)
* before starting LLM generation.
*
* @param streamId - Unique identifier for this stream
* @param userId - User who initiated the request
* @param conversationId - Optional conversation ID for lookup
* @returns A facade object compatible with the old GenerationJob interface
*/
createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
// Create serializable job data (sync for in-memory)
const jobData = this.jobStore.createJobSync(streamId, userId, conversationId);
// Create runtime state
/**
* Create runtime state with readyPromise.
* readyPromise is resolved in subscribe() when isFirstSubscriber() returns true.
* This synchronizes generation start with client connection.
*/
let resolveReady: () => void;
const readyPromise = new Promise<void>((resolve) => {
resolveReady = resolve;
@ -85,17 +114,28 @@ class GenerationJobManagerClass {
};
this.runtimeState.set(streamId, runtime);
// Set up all-subscribers-left callback
/**
* Set up all-subscribers-left callback.
* When all SSE clients disconnect, this:
* 1. Resets syncSent so reconnecting clients get sync event
* 2. Calls any registered allSubscribersLeft handlers (e.g., to save partial responses)
*/
this.eventTransport.onAllSubscribersLeft(streamId, () => {
const currentRuntime = this.runtimeState.get(streamId);
if (currentRuntime) {
currentRuntime.syncSent = false;
// Call registered handlers (from job.emitter.on('allSubscribersLeft', ...))
const content = this.contentState.getContentParts(streamId) ?? [];
if (currentRuntime.allSubscribersLeftHandlers) {
for (const handler of currentRuntime.allSubscribersLeftHandlers) {
try {
handler(content);
} catch (err) {
logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
}
}
}
}
const content = this.contentState.getContentParts(streamId) ?? [];
this.eventTransport.emitChunk(streamId, {
_internal: 'allSubscribersLeft',
content,
});
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
});
@ -107,26 +147,43 @@ class GenerationJobManagerClass {
/**
* Build a GenerationJob facade from job data and runtime state.
* This maintains backwards compatibility with existing code.
* This maintains backwards compatibility with existing code that expects
* job.emitter, job.abortController, etc.
*
* IMPORTANT: The emitterProxy.on('allSubscribersLeft') handler registration
* does NOT use eventTransport.subscribe(). This is intentional:
*
* If we used subscribe() for internal handlers, those handlers would count
* as subscribers. When the real SSE client connects, isFirstSubscriber()
* would return false (because internal handler was "first"), and readyPromise
* would never resolve - causing a 5-second timeout delay before generation starts.
*
* Instead, allSubscribersLeft handlers are stored in runtime.allSubscribersLeftHandlers
* and called directly from the onAllSubscribersLeft callback in createJob().
*
* @param streamId - The stream identifier
* @param jobData - Serializable job metadata from job store
* @param runtime - Non-serializable runtime state (abort controller, promises, etc.)
* @returns A GenerationJob facade object
*/
private buildJobFacade(
streamId: string,
jobData: SerializableJobData,
runtime: RuntimeJobState,
): t.GenerationJob {
// Create a proxy emitter that delegates to eventTransport
/**
* Proxy emitter that delegates to eventTransport for most operations.
* Exception: allSubscribersLeft handlers are stored separately to avoid
* incrementing subscriber count (see the buildJobFacade JSDoc above).
*/
const emitterProxy = {
on: (event: string, handler: (...args: unknown[]) => void) => {
if (event === 'allSubscribersLeft') {
// Subscribe to internal event
this.eventTransport.subscribe(streamId, {
onChunk: (e) => {
const evt = e as Record<string, unknown>;
if (evt._internal === 'allSubscribersLeft') {
handler(evt.content);
}
},
});
// Store handler for internal callback - don't use subscribe() to avoid counting as a subscriber
if (!runtime.allSubscribersLeftHandlers) {
runtime.allSubscribersLeftHandlers = [];
}
runtime.allSubscribersLeftHandlers.push(handler);
}
},
emit: () => {
@ -285,6 +342,20 @@ class GenerationJobManagerClass {
/**
* Subscribe to a job's event stream.
*
* This is called when an SSE client connects to /chat/stream/:streamId.
* On first subscription, it resolves readyPromise to signal that generation can start.
*
* The subscriber count is critical for the readyPromise mechanism:
* - isFirstSubscriber() returns true when subscriber count is exactly 1
* - This happens when the first REAL client connects (not internal handlers)
* - Internal allSubscribersLeft handlers are stored separately to avoid being counted
*
* @param streamId - The stream to subscribe to
* @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.)
* @param onDone - Handler for completion event (includes final message)
* @param onError - Handler for error events
* @returns Subscription object with unsubscribe function, or null if job not found
*/
subscribe(
streamId: string,
@ -323,9 +394,15 @@ class GenerationJobManagerClass {
});
// Signal ready on first subscriber
if (this.eventTransport.isFirstSubscriber(streamId)) {
const isFirst = this.eventTransport.isFirstSubscriber(streamId);
logger.debug(
`[GenerationJobManager] subscribe check: streamId=${streamId}, isFirst=${isFirst}`,
);
if (isFirst) {
runtime.resolveReady();
logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`);
logger.debug(
`[GenerationJobManager] First subscriber ready, resolving promise for ${streamId}`,
);
}
return subscription;

View file

@ -43,6 +43,10 @@ export class InMemoryEventTransport implements IEventTransport {
state.emitter.on('done', doneHandler);
state.emitter.on('error', errorHandler);
logger.debug(
`[InMemoryEventTransport] subscribe ${streamId}: listeners=${state.emitter.listenerCount('chunk')}`,
);
return {
unsubscribe: () => {
const currentState = this.streams.get(streamId);
@ -90,7 +94,9 @@ export class InMemoryEventTransport implements IEventTransport {
*/
isFirstSubscriber(streamId: string): boolean {
const state = this.streams.get(streamId);
return state?.emitter.listenerCount('chunk') === 1;
const count = state?.emitter.listenerCount('chunk') ?? 0;
logger.debug(`[InMemoryEventTransport] isFirstSubscriber ${streamId}: count=${count}`);
return count === 1;
}
/**

View file

@ -112,6 +112,9 @@ export interface IEventTransport {
/** Get subscriber count for a stream */
getSubscriberCount(streamId: string): number;
/** Check if this is the first subscriber (for ready signaling) */
isFirstSubscriber(streamId: string): boolean;
/** Listen for all subscribers leaving */
onAllSubscribersLeft(streamId: string, callback: () => void): void;
}