diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index 8957b041ea..d7b4320c1d 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -158,10 +158,12 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit // conversationId is pre-generated, no need to update from callback }; - // Start background generation - wait for subscriber with timeout fallback + // Start background generation - readyPromise resolves immediately now + // (sync mechanism handles late subscribers) const startGeneration = async () => { try { - await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 3500))]); + // Short timeout as safety net - promise should already be resolved + await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 100))]); } catch (waitError) { logger.warn( `[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`, diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index bb1a268ad0..b3cf9adc46 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -30,10 +30,12 @@ export interface GenerationJobManagerOptions { * Contains AbortController, ready promise, and other non-serializable state. * * @property abortController - Controller to abort the generation - * @property readyPromise - Resolves when first real subscriber connects (used to sync generation start) + * @property readyPromise - Resolves immediately (legacy, kept for API compatibility) * @property resolveReady - Function to resolve readyPromise * @property finalEvent - Cached final event for late subscribers * @property syncSent - Whether sync event was sent (reset when all subscribers leave) + * @property earlyEventBuffer - Buffer for events emitted before first subscriber connects + * @property hasSubscriber - Whether at least one subscriber has connected * @property allSubscribersLeftHandlers - Internal handlers for disconnect events. * These are stored separately from eventTransport subscribers to avoid being counted * in subscriber count. This is critical: if these were registered via subscribe(), @@ -46,6 +48,8 @@ interface RuntimeJobState { resolveReady: () => void; finalEvent?: t.ServerSentEvent; syncSent: boolean; + earlyEventBuffer: t.ServerSentEvent[]; + hasSubscriber: boolean; allSubscribersLeftHandlers?: Array<(...args: unknown[]) => void>; } @@ -193,8 +197,14 @@ class GenerationJobManagerClass { /** * Create runtime state with readyPromise. - * readyPromise is resolved in subscribe() when isFirstSubscriber() returns true. - * This synchronizes generation start with client connection. + * + * With the resumable stream architecture, we no longer need to wait for the + * first subscriber before starting generation: + * - Redis mode: Events are persisted and can be replayed via sync + * - In-memory mode: Content is aggregated and sent via sync on connect + * + * We resolve readyPromise immediately to eliminate startup latency. + * The sync mechanism handles late-connecting clients. */ let resolveReady: () => void; const readyPromise = new Promise((resolve) => { @@ -206,9 +216,14 @@ class GenerationJobManagerClass { readyPromise, resolveReady: resolveReady!, syncSent: false, + earlyEventBuffer: [], + hasSubscriber: false, }; this.runtimeState.set(streamId, runtime); + // Resolve immediately - early event buffer handles late subscribers + resolveReady!(); + /** * Set up all-subscribers-left callback. * When all SSE clients disconnect, this: @@ -487,12 +502,9 @@ class GenerationJobManagerClass { * Subscribe to a job's event stream. * * This is called when an SSE client connects to /chat/stream/:streamId. - * On first subscription, it resolves readyPromise to signal that generation can start. - * - * The subscriber count is critical for the readyPromise mechanism: - * - isFirstSubscriber() returns true when subscriber count is exactly 1 - * - This happens when the first REAL client connects (not internal handlers) - * - Internal allSubscribersLeft handlers are stored separately to avoid being counted + * On first subscription: + * - Resolves readyPromise (legacy, for API compatibility) + * - Replays any buffered early events (e.g., 'created' event) * * @param streamId - The stream to subscribe to * @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.) @@ -536,11 +548,26 @@ class GenerationJobManagerClass { onError, }); - // Signal ready on first subscriber + // Check if this is the first subscriber const isFirst = this.eventTransport.isFirstSubscriber(streamId); - logger.debug( - `[GenerationJobManager] subscribe check: streamId=${streamId}, isFirst=${isFirst}`, - ); + + // First subscriber: replay buffered events and mark as connected + if (!runtime.hasSubscriber) { + runtime.hasSubscriber = true; + + // Replay any events that were emitted before subscriber connected + if (runtime.earlyEventBuffer.length > 0) { + logger.debug( + `[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`, + ); + for (const bufferedEvent of runtime.earlyEventBuffer) { + onChunk(bufferedEvent); + } + // Clear buffer after replay + runtime.earlyEventBuffer = []; + } + } + if (isFirst) { runtime.resolveReady(); logger.debug( @@ -554,6 +581,9 @@ class GenerationJobManagerClass { /** * Emit a chunk event to all subscribers. * Uses runtime state check for performance (avoids async job store lookup per token). + * + * If no subscriber has connected yet, buffers the event for replay when they do. + * This ensures early events (like 'created') aren't lost due to race conditions. */ emitChunk(streamId: string, event: t.ServerSentEvent): void { const runtime = this.runtimeState.get(streamId); @@ -585,6 +615,12 @@ class GenerationJobManagerClass { } } + // Buffer early events if no subscriber yet (replay when first subscriber connects) + if (!runtime.hasSubscriber) { + runtime.earlyEventBuffer.push(event); + // Also emit to transport in case subscriber connects mid-flight + } + this.eventTransport.emitChunk(streamId, event); }