diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index 90360d8870..7f562d0d6d 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -63,7 +63,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit } }); - const job = GenerationJobManager.createJob(streamId, userId, reqConversationId); + const job = await GenerationJobManager.createJob(streamId, userId, reqConversationId); req._resumableStreamId = streamId; // Track if partial response was already saved to avoid duplicates @@ -83,7 +83,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit return; } - const resumeState = GenerationJobManager.getResumeState(streamId); + const resumeState = await GenerationJobManager.getResumeState(streamId); if (!resumeState?.userMessage) { logger.debug('[ResumableAgentController] No user message to save partial response for'); return; @@ -166,7 +166,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit // Start background generation - wait for subscriber with timeout fallback const startGeneration = async () => { try { - await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 2500))]); + await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 3500))]); } catch (waitError) { logger.warn( `[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`, diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index 81720e860f..bbac19c562 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -35,11 +35,11 @@ router.use('/', v1); * @description Sends sync event with resume state, replays missed chunks, then streams live * @query resume=true - Indicates this is a reconnection (sends sync event) */ -router.get('/chat/stream/:streamId', (req, res) => { +router.get('/chat/stream/:streamId', async (req, res) => { const { streamId } = req.params; const isResume = req.query.resume === 'true'; - const job = GenerationJobManager.getJob(streamId); + const job = await GenerationJobManager.getJob(streamId); if (!job) { return res.status(404).json({ error: 'Stream not found', @@ -59,7 +59,7 @@ router.get('/chat/stream/:streamId', (req, res) => { // Send sync event with resume state for ALL reconnecting clients // This supports multi-tab scenarios where each tab needs run step data if (isResume) { - const resumeState = GenerationJobManager.getResumeState(streamId); + const resumeState = await GenerationJobManager.getResumeState(streamId); if (resumeState && !res.writableEnded) { // Send sync event with run steps AND aggregatedContent // Client will use aggregatedContent to initialize message state @@ -74,7 +74,7 @@ router.get('/chat/stream/:streamId', (req, res) => { } } - const result = GenerationJobManager.subscribe( + const result = await GenerationJobManager.subscribe( streamId, (event) => { if (!res.writableEnded) { @@ -120,10 +120,10 @@ router.get('/chat/stream/:streamId', (req, res) => { * @access Private * @returns { active, streamId, status, aggregatedContent, createdAt, resumeState } */ -router.get('/chat/status/:conversationId', (req, res) => { +router.get('/chat/status/:conversationId', async (req, res) => { const { conversationId } = req.params; - const job = GenerationJobManager.getJobByConversation(conversationId); + const job = await GenerationJobManager.getJobByConversation(conversationId); if (!job) { return res.json({ active: false }); @@ -133,8 +133,8 @@ router.get('/chat/status/:conversationId', (req, res) => { return res.status(403).json({ error: 'Unauthorized' }); } - const info = GenerationJobManager.getStreamInfo(job.streamId); - const resumeState = GenerationJobManager.getResumeState(job.streamId); + const info = await GenerationJobManager.getStreamInfo(job.streamId); + const resumeState = await GenerationJobManager.getResumeState(job.streamId); res.json({ active: info?.active ?? false, @@ -152,7 +152,7 @@ router.get('/chat/status/:conversationId', (req, res) => { * @access Private * @description Mounted before chatRouter to bypass buildEndpointOption middleware */ -router.post('/chat/abort', (req, res) => { +router.post('/chat/abort', async (req, res) => { logger.debug(`[AgentStream] ========== ABORT ENDPOINT HIT ==========`); logger.debug(`[AgentStream] Method: ${req.method}, Path: ${req.path}`); logger.debug(`[AgentStream] Body:`, req.body); @@ -161,10 +161,10 @@ router.post('/chat/abort', (req, res) => { // Try to find job by streamId first, then by conversationId, then by abortKey let jobStreamId = streamId; - let job = jobStreamId ? GenerationJobManager.getJob(jobStreamId) : null; + let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null; if (!job && conversationId) { - job = GenerationJobManager.getJobByConversation(conversationId); + job = await GenerationJobManager.getJobByConversation(conversationId); if (job) { jobStreamId = job.streamId; } @@ -172,14 +172,14 @@ router.post('/chat/abort', (req, res) => { if (!job && abortKey) { jobStreamId = abortKey.split(':')[0]; - job = GenerationJobManager.getJob(jobStreamId); + job = await GenerationJobManager.getJob(jobStreamId); } logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`); if (job && jobStreamId) { logger.debug(`[AgentStream] Job found, aborting: ${jobStreamId}`); - GenerationJobManager.abortJob(jobStreamId); + await GenerationJobManager.abortJob(jobStreamId); logger.debug(`[AgentStream] Job aborted successfully: ${jobStreamId}`); return res.json({ success: true, aborted: jobStreamId }); } diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 9d5aa28497..8e4f539bee 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -1,7 +1,12 @@ import { logger } from '@librechat/data-schemas'; import type { Agents } from 'librechat-data-provider'; import type { StandardGraph } from '@librechat/agents'; -import type { SerializableJobData } from './interfaces/IJobStore'; +import type { + IContentStateManager, + SerializableJobData, + IEventTransport, + IJobStore, +} from './interfaces/IJobStore'; import type * as t from '~/types'; import { InMemoryEventTransport } from './implementations/InMemoryEventTransport'; import { InMemoryContentState } from './implementations/InMemoryContentState'; @@ -34,15 +39,14 @@ interface RuntimeJobState { /** * Manages generation jobs for resumable LLM streams. * - * Architecture: Composes three pluggable services for clean separation: + * Architecture: Composes three pluggable services via dependency injection: * - jobStore: Serializable job metadata (InMemory → Redis/KV for horizontal scaling) * - eventTransport: Pub/sub events (InMemory → Redis Pub/Sub for horizontal scaling) * - contentState: Volatile content refs with WeakRef (always in-memory, not shared) * - * Current implementation uses sync methods for performance. When adding Redis support, - * the manager methods will need to become async, or use a sync-capable Redis client. + * All storage methods are async to support both in-memory and external stores (Redis, etc.). * - * @example Future Redis injection (requires async refactor): + * @example Redis injection: * ```ts * const manager = new GenerationJobManagerClass({ * jobStore: new RedisJobStore(redisClient), @@ -53,11 +57,11 @@ interface RuntimeJobState { */ class GenerationJobManagerClass { /** Job metadata storage - swappable for Redis, KV store, etc. */ - private jobStore: InMemoryJobStore; + private jobStore: IJobStore; /** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */ - private eventTransport: InMemoryEventTransport; + private eventTransport: IEventTransport; /** Volatile content state with WeakRef - always in-memory per instance */ - private contentState: InMemoryContentState; + private contentState: IContentStateManager; /** Runtime state - always in-memory, not serializable */ private runtimeState = new Map(); @@ -106,11 +110,14 @@ class GenerationJobManagerClass { * @param streamId - Unique identifier for this stream * @param userId - User who initiated the request * @param conversationId - Optional conversation ID for lookup - * @returns A facade object compatible with the old GenerationJob interface + * @returns A facade object for the GenerationJob */ - createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob { - // Create serializable job data (sync for in-memory) - const jobData = this.jobStore.createJobSync(streamId, userId, conversationId); + async createJob( + streamId: string, + userId: string, + conversationId?: string, + ): Promise { + const jobData = await this.jobStore.createJob(streamId, userId, conversationId); /** * Create runtime state with readyPromise. @@ -243,8 +250,8 @@ class GenerationJobManagerClass { /** * Get a job by streamId. */ - getJob(streamId: string): t.GenerationJob | undefined { - const jobData = this.jobStore.getJobSync(streamId); + async getJob(streamId: string): Promise { + const jobData = await this.jobStore.getJob(streamId); const runtime = this.runtimeState.get(streamId); if (!jobData || !runtime) { return undefined; @@ -255,8 +262,8 @@ class GenerationJobManagerClass { /** * Find an active job by conversationId. */ - getJobByConversation(conversationId: string): t.GenerationJob | undefined { - const jobData = this.jobStore.getJobByConversationSync(conversationId); + async getJobByConversation(conversationId: string): Promise { + const jobData = await this.jobStore.getJobByConversation(conversationId); if (!jobData) { return undefined; } @@ -270,15 +277,15 @@ class GenerationJobManagerClass { /** * Check if a job exists. */ - hasJob(streamId: string): boolean { - return this.jobStore.hasJobSync(streamId); + async hasJob(streamId: string): Promise { + return this.jobStore.hasJob(streamId); } /** * Get job status. */ - getJobStatus(streamId: string): t.GenerationJobStatus | undefined { - const jobData = this.jobStore.getJobSync(streamId); + async getJobStatus(streamId: string): Promise { + const jobData = await this.jobStore.getJob(streamId); return jobData?.status as t.GenerationJobStatus | undefined; } @@ -302,7 +309,7 @@ class GenerationJobManagerClass { * Abort a job (user-initiated). */ async abortJob(streamId: string): Promise { - const jobData = this.jobStore.getJobSync(streamId); + const jobData = await this.jobStore.getJob(streamId); const runtime = this.runtimeState.get(streamId); if (!jobData) { @@ -376,18 +383,18 @@ class GenerationJobManagerClass { * @param onError - Handler for error events * @returns Subscription object with unsubscribe function, or null if job not found */ - subscribe( + async subscribe( streamId: string, onChunk: t.ChunkHandler, onDone?: t.DoneHandler, onError?: t.ErrorHandler, - ): { unsubscribe: t.UnsubscribeFn } | null { + ): Promise<{ unsubscribe: t.UnsubscribeFn } | null> { const runtime = this.runtimeState.get(streamId); if (!runtime) { return null; } - const jobData = this.jobStore.getJobSync(streamId); + const jobData = await this.jobStore.getJob(streamId); // If job already complete, send final event setImmediate(() => { @@ -429,10 +436,11 @@ class GenerationJobManagerClass { /** * Emit a chunk event to all subscribers. + * Uses runtime state check for performance (avoids async job store lookup per token). */ emitChunk(streamId: string, event: t.ServerSentEvent): void { - const jobData = this.jobStore.getJobSync(streamId); - if (!jobData || jobData.status !== 'running') { + const runtime = this.runtimeState.get(streamId); + if (!runtime || runtime.abortController.signal.aborted) { return; } @@ -494,7 +502,8 @@ class GenerationJobManagerClass { * Set reference to the graph's contentParts array. */ setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void { - if (!this.jobStore.hasJobSync(streamId)) { + // Use runtime state check for performance (sync check) + if (!this.runtimeState.has(streamId)) { return; } this.contentState.setContentParts(streamId, contentParts); @@ -505,7 +514,8 @@ class GenerationJobManagerClass { * Set reference to the graph instance. */ setGraph(streamId: string, graph: StandardGraph): void { - if (!this.jobStore.hasJobSync(streamId)) { + // Use runtime state check for performance (sync check) + if (!this.runtimeState.has(streamId)) { return; } this.contentState.setGraph(streamId, graph); @@ -515,8 +525,8 @@ class GenerationJobManagerClass { /** * Get resume state for reconnecting clients. */ - getResumeState(streamId: string): t.ResumeState | null { - const jobData = this.jobStore.getJobSync(streamId); + async getResumeState(streamId: string): Promise { + const jobData = await this.jobStore.getJob(streamId); if (!jobData) { return null; } @@ -583,7 +593,7 @@ class GenerationJobManagerClass { // Cleanup runtime state for deleted jobs for (const streamId of this.runtimeState.keys()) { - if (!this.jobStore.hasJobSync(streamId)) { + if (!(await this.jobStore.hasJob(streamId))) { this.runtimeState.delete(streamId); this.contentState.clearContentState(streamId); this.eventTransport.cleanup(streamId); @@ -598,13 +608,13 @@ class GenerationJobManagerClass { /** * Get stream info for status endpoint. */ - getStreamInfo(streamId: string): { + async getStreamInfo(streamId: string): Promise<{ active: boolean; status: t.GenerationJobStatus; aggregatedContent?: Agents.MessageContentComplex[]; createdAt: number; - } | null { - const jobData = this.jobStore.getJobSync(streamId); + } | null> { + const jobData = await this.jobStore.getJob(streamId); if (!jobData) { return null; } @@ -620,27 +630,33 @@ class GenerationJobManagerClass { /** * Get total job count. */ - getJobCount(): number { + async getJobCount(): Promise { return this.jobStore.getJobCount(); } /** * Get job count by status. */ - getJobCountByStatus(): Record { - return this.jobStore.getJobCountByStatus() as Record; + async getJobCountByStatus(): Promise> { + const [running, complete, error, aborted] = await Promise.all([ + this.jobStore.getJobCountByStatus('running'), + this.jobStore.getJobCountByStatus('complete'), + this.jobStore.getJobCountByStatus('error'), + this.jobStore.getJobCountByStatus('aborted'), + ]); + return { running, complete, error, aborted }; } /** * Destroy the manager. */ - destroy(): void { + async destroy(): Promise { if (this.cleanupInterval) { clearInterval(this.cleanupInterval); this.cleanupInterval = null; } - this.jobStore.destroy(); + await this.jobStore.destroy(); this.eventTransport.destroy(); this.contentState.destroy(); this.runtimeState.clear(); diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index 308725e0db..8b8f697ebc 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -10,7 +10,7 @@ export class InMemoryJobStore implements IJobStore { private jobs = new Map(); private cleanupInterval: NodeJS.Timeout | null = null; - /** Time to keep completed jobs before cleanup (5 minutes - reduced from 1 hour) */ + /** Time to keep completed jobs before cleanup (5 minutes) */ private ttlAfterComplete = 300000; /** Maximum number of concurrent jobs */ @@ -25,7 +25,7 @@ export class InMemoryJobStore implements IJobStore { } } - initialize(): void { + async initialize(): Promise { if (this.cleanupInterval) { return; } @@ -46,13 +46,8 @@ export class InMemoryJobStore implements IJobStore { userId: string, conversationId?: string, ): Promise { - return this.createJobSync(streamId, userId, conversationId); - } - - /** Synchronous version for in-memory use */ - createJobSync(streamId: string, userId: string, conversationId?: string): SerializableJobData { if (this.jobs.size >= this.maxJobs) { - this.evictOldestSync(); + await this.evictOldest(); } const job: SerializableJobData = { @@ -71,20 +66,10 @@ export class InMemoryJobStore implements IJobStore { } async getJob(streamId: string): Promise { - return this.getJobSync(streamId); - } - - /** Synchronous version for in-memory use */ - getJobSync(streamId: string): SerializableJobData | null { return this.jobs.get(streamId) ?? null; } async getJobByConversation(conversationId: string): Promise { - return this.getJobByConversationSync(conversationId); - } - - /** Synchronous version for in-memory use */ - getJobByConversationSync(conversationId: string): SerializableJobData | null { // Direct match first (streamId === conversationId for existing conversations) const directMatch = this.jobs.get(conversationId); if (directMatch && directMatch.status === 'running') { @@ -102,11 +87,6 @@ export class InMemoryJobStore implements IJobStore { } async updateJob(streamId: string, updates: Partial): Promise { - this.updateJobSync(streamId, updates); - } - - /** Synchronous version for in-memory use */ - updateJobSync(streamId: string, updates: Partial): void { const job = this.jobs.get(streamId); if (!job) { return; @@ -115,21 +95,11 @@ export class InMemoryJobStore implements IJobStore { } async deleteJob(streamId: string): Promise { - this.deleteJobSync(streamId); - } - - /** Synchronous version for in-memory use */ - deleteJobSync(streamId: string): void { this.jobs.delete(streamId); logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`); } async hasJob(streamId: string): Promise { - return this.hasJobSync(streamId); - } - - /** Synchronous version for in-memory use */ - hasJobSync(streamId: string): boolean { return this.jobs.has(streamId); } @@ -166,11 +136,6 @@ export class InMemoryJobStore implements IJobStore { } private async evictOldest(): Promise { - this.evictOldestSync(); - } - - /** Synchronous version for in-memory use */ - private evictOldestSync(): void { let oldestId: string | null = null; let oldestTime = Infinity; @@ -183,32 +148,27 @@ export class InMemoryJobStore implements IJobStore { if (oldestId) { logger.warn(`[InMemoryJobStore] Evicting oldest job: ${oldestId}`); - this.deleteJobSync(oldestId); + await this.deleteJob(oldestId); } } /** Get job count (for monitoring) */ - getJobCount(): number { + async getJobCount(): Promise { return this.jobs.size; } /** Get job count by status (for monitoring) */ - getJobCountByStatus(): Record { - const counts: Record = { - running: 0, - complete: 0, - error: 0, - aborted: 0, - }; - + async getJobCountByStatus(status: JobStatus): Promise { + let count = 0; for (const job of this.jobs.values()) { - counts[job.status]++; + if (job.status === status) { + count++; + } } - - return counts; + return count; } - destroy(): void { + async destroy(): Promise { if (this.cleanupInterval) { clearInterval(this.cleanupInterval); this.cleanupInterval = null; diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 2b2a8800a5..7663f7c4b7 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -56,6 +56,9 @@ export interface ResumeState { * Implementations can use in-memory Map, Redis, KV store, etc. */ export interface IJobStore { + /** Initialize the store (e.g., connect to Redis, start cleanup intervals) */ + initialize(): Promise; + /** Create a new job */ createJob( streamId: string, @@ -83,6 +86,15 @@ export interface IJobStore { /** Cleanup expired jobs */ cleanup(): Promise; + + /** Get total job count */ + getJobCount(): Promise; + + /** Get job count by status */ + getJobCountByStatus(status: JobStatus): Promise; + + /** Destroy the store and release resources */ + destroy(): Promise; } /** @@ -117,6 +129,12 @@ export interface IEventTransport { /** Listen for all subscribers leaving */ onAllSubscribersLeft(streamId: string, callback: () => void): void; + + /** Cleanup transport resources for a specific stream */ + cleanup(streamId: string): void; + + /** Destroy all transport resources */ + destroy(): void; } /** @@ -139,4 +157,7 @@ export interface IContentStateManager { /** Clear content state for a job */ clearContentState(streamId: string): void; + + /** Destroy all content state resources */ + destroy(): void; }