refactor: Update GenerationJobManager and ResumableAgentController for improved event handling

- Modified GenerationJobManager to resolve readyPromise immediately, eliminating startup latency and allowing early event buffering for late subscribers.
- Enhanced event handling logic to replay buffered events when the first subscriber connects, ensuring no events are lost due to race conditions.
- Updated comments for clarity on the new event synchronization mechanism and its benefits in both Redis and in-memory modes.
This commit is contained in:
Danny Avila 2025-12-15 10:40:28 -05:00
parent f82483c6b6
commit 8151966e2f
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
2 changed files with 53 additions and 15 deletions

View file

@ -158,10 +158,12 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
// conversationId is pre-generated, no need to update from callback
};
// Start background generation - wait for subscriber with timeout fallback
// Start background generation - readyPromise resolves immediately now
// (sync mechanism handles late subscribers)
const startGeneration = async () => {
try {
await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 3500))]);
// Short timeout as safety net - promise should already be resolved
await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 100))]);
} catch (waitError) {
logger.warn(
`[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`,

View file

@ -30,10 +30,12 @@ export interface GenerationJobManagerOptions {
* Contains AbortController, ready promise, and other non-serializable state.
*
* @property abortController - Controller to abort the generation
* @property readyPromise - Resolves when first real subscriber connects (used to sync generation start)
* @property readyPromise - Resolves immediately (legacy, kept for API compatibility)
* @property resolveReady - Function to resolve readyPromise
* @property finalEvent - Cached final event for late subscribers
* @property syncSent - Whether sync event was sent (reset when all subscribers leave)
* @property earlyEventBuffer - Buffer for events emitted before first subscriber connects
* @property hasSubscriber - Whether at least one subscriber has connected
* @property allSubscribersLeftHandlers - Internal handlers for disconnect events.
* These are stored separately from eventTransport subscribers to avoid being counted
* in subscriber count. This is critical: if these were registered via subscribe(),
@ -46,6 +48,8 @@ interface RuntimeJobState {
resolveReady: () => void;
finalEvent?: t.ServerSentEvent;
syncSent: boolean;
earlyEventBuffer: t.ServerSentEvent[];
hasSubscriber: boolean;
allSubscribersLeftHandlers?: Array<(...args: unknown[]) => void>;
}
@ -193,8 +197,14 @@ class GenerationJobManagerClass {
/**
* Create runtime state with readyPromise.
* readyPromise is resolved in subscribe() when isFirstSubscriber() returns true.
* This synchronizes generation start with client connection.
*
* With the resumable stream architecture, we no longer need to wait for the
* first subscriber before starting generation:
* - Redis mode: Events are persisted and can be replayed via sync
* - In-memory mode: Content is aggregated and sent via sync on connect
*
* We resolve readyPromise immediately to eliminate startup latency.
* The sync mechanism handles late-connecting clients.
*/
let resolveReady: () => void;
const readyPromise = new Promise<void>((resolve) => {
@ -206,9 +216,14 @@ class GenerationJobManagerClass {
readyPromise,
resolveReady: resolveReady!,
syncSent: false,
earlyEventBuffer: [],
hasSubscriber: false,
};
this.runtimeState.set(streamId, runtime);
// Resolve immediately - early event buffer handles late subscribers
resolveReady!();
/**
* Set up all-subscribers-left callback.
* When all SSE clients disconnect, this:
@ -487,12 +502,9 @@ class GenerationJobManagerClass {
* Subscribe to a job's event stream.
*
* This is called when an SSE client connects to /chat/stream/:streamId.
* On first subscription, it resolves readyPromise to signal that generation can start.
*
* The subscriber count is critical for the readyPromise mechanism:
* - isFirstSubscriber() returns true when subscriber count is exactly 1
* - This happens when the first REAL client connects (not internal handlers)
* - Internal allSubscribersLeft handlers are stored separately to avoid being counted
* On first subscription:
* - Resolves readyPromise (legacy, for API compatibility)
* - Replays any buffered early events (e.g., 'created' event)
*
* @param streamId - The stream to subscribe to
* @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.)
@ -536,11 +548,26 @@ class GenerationJobManagerClass {
onError,
});
// Signal ready on first subscriber
// Check if this is the first subscriber
const isFirst = this.eventTransport.isFirstSubscriber(streamId);
// First subscriber: replay buffered events and mark as connected
if (!runtime.hasSubscriber) {
runtime.hasSubscriber = true;
// Replay any events that were emitted before subscriber connected
if (runtime.earlyEventBuffer.length > 0) {
logger.debug(
`[GenerationJobManager] subscribe check: streamId=${streamId}, isFirst=${isFirst}`,
`[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`,
);
for (const bufferedEvent of runtime.earlyEventBuffer) {
onChunk(bufferedEvent);
}
// Clear buffer after replay
runtime.earlyEventBuffer = [];
}
}
if (isFirst) {
runtime.resolveReady();
logger.debug(
@ -554,6 +581,9 @@ class GenerationJobManagerClass {
/**
* Emit a chunk event to all subscribers.
* Uses runtime state check for performance (avoids async job store lookup per token).
*
* If no subscriber has connected yet, buffers the event for replay when they do.
* This ensures early events (like 'created') aren't lost due to race conditions.
*/
emitChunk(streamId: string, event: t.ServerSentEvent): void {
const runtime = this.runtimeState.get(streamId);
@ -585,6 +615,12 @@ class GenerationJobManagerClass {
}
}
// Buffer early events if no subscriber yet (replay when first subscriber connects)
if (!runtime.hasSubscriber) {
runtime.earlyEventBuffer.push(event);
// Also emit to transport in case subscriber connects mid-flight
}
this.eventTransport.emitChunk(streamId, event);
}