diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js
index 2e8f9bd18d..079ac4cd09 100644
--- a/api/server/controllers/agents/request.js
+++ b/api/server/controllers/agents/request.js
@@ -177,10 +177,16 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
   const onStart = (userMsg, respMsgId, _isNewConvo) => {
     userMessage = userMsg;
 
-    // Store the response messageId upfront so partial saves use the same ID
-    if (respMsgId) {
-      GenerationJobManager.updateMetadata(streamId, { responseMessageId: respMsgId });
-    }
+    // Store userMessage and responseMessageId upfront for resume capability
+    GenerationJobManager.updateMetadata(streamId, {
+      responseMessageId: respMsgId,
+      userMessage: {
+        messageId: userMsg.messageId,
+        parentMessageId: userMsg.parentMessageId,
+        conversationId: userMsg.conversationId,
+        text: userMsg.text,
+      },
+    });
 
     GenerationJobManager.emitChunk(streamId, {
       created: true,
diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js
index 36d293afad..81720e860f 100644
--- a/api/server/routes/agents/index.js
+++ b/api/server/routes/agents/index.js
@@ -118,7 +118,7 @@ router.get('/chat/stream/:streamId', (req, res) => {
  * @route GET /chat/status/:conversationId
  * @desc Check if there's an active generation job for a conversation
  * @access Private
- * @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt, resumeState }
+ * @returns { active, streamId, status, aggregatedContent, createdAt, resumeState }
  */
 router.get('/chat/status/:conversationId', (req, res) => {
   const { conversationId } = req.params;
@@ -140,8 +140,6 @@ router.get('/chat/status/:conversationId', (req, res) => {
     active: info?.active ?? false,
     streamId: job.streamId,
     status: info?.status ?? job.status,
-    chunkCount: info?.chunkCount ?? 0,
-    runStepCount: info?.runStepCount ?? 0,
     aggregatedContent: info?.aggregatedContent,
     createdAt: info?.createdAt ?? job.createdAt,
     resumeState,
diff --git a/client/src/data-provider/SSE/queries.ts b/client/src/data-provider/SSE/queries.ts
index 45bc6cacae..72f70a10ab 100644
--- a/client/src/data-provider/SSE/queries.ts
+++ b/client/src/data-provider/SSE/queries.ts
@@ -6,7 +6,6 @@ export interface StreamStatusResponse {
   active: boolean;
   streamId?: string;
   status?: 'running' | 'complete' | 'error' | 'aborted';
-  chunkCount?: number;
   aggregatedContent?: Array<{ type: string; text?: string }>;
   createdAt?: number;
   resumeState?: Agents.ResumeState;
diff --git a/client/src/hooks/SSE/useResumeOnLoad.ts b/client/src/hooks/SSE/useResumeOnLoad.ts
index abf0c7eda8..5a674cec75 100644
--- a/client/src/hooks/SSE/useResumeOnLoad.ts
+++ b/client/src/hooks/SSE/useResumeOnLoad.ts
@@ -62,7 +62,7 @@ function buildSubmissionFromResumeState(
     content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
     isCreatedByUser: false,
     role: 'assistant',
-    sender: existingResponseMessage?.sender,
+    sender: existingResponseMessage?.sender ?? resumeState.sender,
     model: existingResponseMessage?.model,
   } as TMessage;
diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts
index 38a76e3625..f592662b39 100644
--- a/packages/api/src/stream/GenerationJobManager.ts
+++ b/packages/api/src/stream/GenerationJobManager.ts
@@ -1,21 +1,46 @@
-import { EventEmitter } from 'events';
 import { logger } from '@librechat/data-schemas';
 import type { Agents } from 'librechat-data-provider';
 import type { StandardGraph } from '@librechat/agents';
+import type { SerializableJobData } from './interfaces/IJobStore';
 import type * as t from '~/types';
+import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
+import { InMemoryContentState } from './implementations/InMemoryContentState';
+import { InMemoryJobStore } from './implementations/InMemoryJobStore';
+
+/**
+ * Runtime state for active jobs - not serializable, kept in-memory per instance.
+ * Contains AbortController, ready promise, and other non-serializable state.
+ */
+interface RuntimeJobState {
+  abortController: AbortController;
+  readyPromise: Promise<void>;
+  resolveReady: () => void;
+  finalEvent?: t.ServerSentEvent;
+  syncSent: boolean;
+}
 
 /**
  * Manages generation jobs for resumable LLM streams.
- * Generation runs independently of HTTP connections via EventEmitter.
- * Clients can subscribe/unsubscribe to job events without affecting generation.
+ * Composes three implementations for clean separation of concerns:
+ * - InMemoryJobStore: Serializable job metadata (swappable for Redis)
+ * - InMemoryEventTransport: Pub/sub events (swappable for Redis Pub/Sub)
+ * - InMemoryContentState: Volatile content refs with WeakRef (always in-memory)
  */
 class GenerationJobManagerClass {
-  private jobs = new Map<string, t.GenerationJob>();
+  private jobStore: InMemoryJobStore;
+  private eventTransport: InMemoryEventTransport;
+  private contentState: InMemoryContentState;
+
+  /** Runtime state - always in-memory, not serializable */
+  private runtimeState = new Map<string, RuntimeJobState>();
+
   private cleanupInterval: NodeJS.Timeout | null = null;
-  /** Time to keep completed jobs before cleanup (1 hour) */
-  private ttlAfterComplete = 3600000;
-  /** Maximum number of concurrent jobs */
-  private maxJobs = 1000;
+
+  constructor() {
+    this.jobStore = new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
+    this.eventTransport = new InMemoryEventTransport();
+    this.contentState = new InMemoryContentState();
+  }
 
   /**
    * Initialize the job manager with periodic cleanup.
@@ -25,6 +50,8 @@ class GenerationJobManagerClass {
       return;
     }
 
+    this.jobStore.initialize();
+
     this.cleanupInterval = setInterval(() => {
       this.cleanup();
     }, 60000);
@@ -33,185 +60,231 @@ class GenerationJobManagerClass {
       this.cleanupInterval.unref();
     }
 
-    logger.debug('[GenerationJobManager] Initialized with cleanup interval');
+    logger.debug('[GenerationJobManager] Initialized');
   }
 
   /**
    * Create a new generation job.
-   * @param streamId - Unique identifier for the stream
-   * @param userId - User ID who initiated the generation
-   * @param conversationId - Optional conversation ID
-   * @returns The created job
+   * @returns A facade object compatible with the old GenerationJob interface
    */
   createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
-    if (this.jobs.size >= this.maxJobs) {
-      this.evictOldest();
-    }
+    // Create serializable job data (sync for in-memory)
+    const jobData = this.jobStore.createJobSync(streamId, userId, conversationId);
 
+    // Create runtime state
     let resolveReady: () => void;
     const readyPromise = new Promise<void>((resolve) => {
       resolveReady = resolve;
     });
 
-    const job: t.GenerationJob = {
-      streamId,
-      emitter: new EventEmitter(),
-      status: 'running',
-      createdAt: Date.now(),
+    const runtime: RuntimeJobState = {
       abortController: new AbortController(),
-      metadata: { userId, conversationId },
       readyPromise,
       resolveReady: resolveReady!,
-      chunks: [],
+      syncSent: false,
     };
+    this.runtimeState.set(streamId, runtime);
 
-    job.emitter.setMaxListeners(100);
+    // Set up all-subscribers-left callback
+    this.eventTransport.onAllSubscribersLeft(streamId, () => {
+      const currentRuntime = this.runtimeState.get(streamId);
+      if (currentRuntime) {
+        currentRuntime.syncSent = false;
+      }
+      const content = this.contentState.getContentParts(streamId) ?? [];
+      this.eventTransport.emitChunk(streamId, {
+        _internal: 'allSubscribersLeft',
+        content,
+      });
+      logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
+    });
 
-    this.jobs.set(streamId, job);
     logger.debug(`[GenerationJobManager] Created job: ${streamId}`);
-    return job;
+
+    // Return facade for backwards compatibility
+    return this.buildJobFacade(streamId, jobData, runtime);
+  }
+
+  /**
+   * Build a GenerationJob facade from job data and runtime state.
+   * This maintains backwards compatibility with existing code.
+   */
+  private buildJobFacade(
+    streamId: string,
+    jobData: SerializableJobData,
+    runtime: RuntimeJobState,
+  ): t.GenerationJob {
+    // Create a proxy emitter that delegates to eventTransport
+    const emitterProxy = {
+      on: (event: string, handler: (...args: unknown[]) => void) => {
+        if (event === 'allSubscribersLeft') {
+          // Subscribe to internal event
+          this.eventTransport.subscribe(streamId, {
+            onChunk: (e) => {
+              const evt = e as Record<string, unknown>;
+              if (evt._internal === 'allSubscribersLeft') {
+                handler(evt.content);
+              }
+            },
+          });
+        }
+      },
+      emit: () => {
+        /* handled via eventTransport */
+      },
+      listenerCount: () => this.eventTransport.getSubscriberCount(streamId),
+      setMaxListeners: () => {
+        /* no-op for proxy */
+      },
+      removeAllListeners: () => this.eventTransport.cleanup(streamId),
+      off: () => {
+        /* handled via unsubscribe */
+      },
+    };
+
+    return {
+      streamId,
+      emitter: emitterProxy as unknown as t.GenerationJob['emitter'],
+      status: jobData.status as t.GenerationJobStatus,
+      createdAt: jobData.createdAt,
+      completedAt: jobData.completedAt,
+      abortController: runtime.abortController,
+      error: jobData.error,
+      metadata: {
+        userId: jobData.userId,
+        conversationId: jobData.conversationId,
+        userMessage: jobData.userMessage,
+        responseMessageId: jobData.responseMessageId,
+        sender: jobData.sender,
+      },
+      readyPromise: runtime.readyPromise,
+      resolveReady: runtime.resolveReady,
+      finalEvent: runtime.finalEvent,
+      syncSent: runtime.syncSent,
+    };
   }
 
   /**
    * Get a job by streamId.
-   * @param streamId - The stream identifier
-   * @returns The job if found, undefined otherwise
    */
   getJob(streamId: string): t.GenerationJob | undefined {
-    return this.jobs.get(streamId);
+    const jobData = this.jobStore.getJobSync(streamId);
+    const runtime = this.runtimeState.get(streamId);
+    if (!jobData || !runtime) {
+      return undefined;
+    }
+    return this.buildJobFacade(streamId, jobData, runtime);
   }
 
   /**
    * Find an active job by conversationId.
-   * Since streamId === conversationId for existing conversations,
-   * we first check by streamId, then search metadata.
-   * @param conversationId - The conversation identifier
-   * @returns The job if found, undefined otherwise
    */
   getJobByConversation(conversationId: string): t.GenerationJob | undefined {
-    const directMatch = this.jobs.get(conversationId);
-    if (directMatch && directMatch.status === 'running') {
-      return directMatch;
+    const jobData = this.jobStore.getJobByConversationSync(conversationId);
+    if (!jobData) {
+      return undefined;
     }
-
-    for (const job of this.jobs.values()) {
-      if (job.metadata.conversationId === conversationId && job.status === 'running') {
-        return job;
-      }
+    const runtime = this.runtimeState.get(jobData.streamId);
+    if (!runtime) {
+      return undefined;
     }
-
-    return undefined;
+    return this.buildJobFacade(jobData.streamId, jobData, runtime);
   }
 
   /**
    * Check if a job exists.
-   * @param streamId - The stream identifier
-   * @returns True if job exists
    */
   hasJob(streamId: string): boolean {
-    return this.jobs.has(streamId);
+    return this.jobStore.hasJobSync(streamId);
   }
 
   /**
    * Get job status.
-   * @param streamId - The stream identifier
-   * @returns The job status or undefined if not found
    */
   getJobStatus(streamId: string): t.GenerationJobStatus | undefined {
-    return this.jobs.get(streamId)?.status;
+    const jobData = this.jobStore.getJobSync(streamId);
+    return jobData?.status as t.GenerationJobStatus | undefined;
   }
 
   /**
    * Mark job as complete.
-   * @param streamId - The stream identifier
-   * @param error - Optional error message if job failed
    */
-  completeJob(streamId: string, error?: string): void {
-    const job = this.jobs.get(streamId);
-    if (!job) {
-      return;
-    }
+  async completeJob(streamId: string, error?: string): Promise<void> {
+    await this.jobStore.updateJob(streamId, {
+      status: error ? 'error' : 'complete',
+      completedAt: Date.now(),
+      error,
+    });
 
-    job.status = error ? 'error' : 'complete';
-    job.completedAt = Date.now();
-    if (error) {
-      job.error = error;
-    }
+    // Clear content state
+    this.contentState.clearContentState(streamId);
 
-    logger.debug(`[GenerationJobManager] Job completed: ${streamId}, status: ${job.status}`);
+    logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
   }
 
   /**
    * Abort a job (user-initiated).
-   * Emits both error event and a final done event with aborted flag.
-   * @param streamId - The stream identifier
    */
-  abortJob(streamId: string): void {
-    const job = this.jobs.get(streamId);
-    if (!job) {
+  async abortJob(streamId: string): Promise<void> {
+    const jobData = this.jobStore.getJobSync(streamId);
+    const runtime = this.runtimeState.get(streamId);
+
+    if (!jobData) {
       logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`);
       return;
     }
 
-    logger.debug(
-      `[GenerationJobManager] Aborting job ${streamId}, signal already aborted: ${job.abortController.signal.aborted}`,
-    );
-    job.abortController.abort();
-    job.status = 'aborted';
-    job.completedAt = Date.now();
-    logger.debug(
-      `[GenerationJobManager] AbortController.abort() called for ${streamId}, signal.aborted: ${job.abortController.signal.aborted}`,
-    );
+    if (runtime) {
+      runtime.abortController.abort();
+    }
 
-    // Create a final event for abort so clients can properly handle UI cleanup
-    const userMessageId = job.metadata.userMessage?.messageId;
-    const abortFinalEvent = {
+    await this.jobStore.updateJob(streamId, {
+      status: 'aborted',
+      completedAt: Date.now(),
+    });
+
+    // Create final event for abort
+    const userMessageId = jobData.userMessage?.messageId;
+    const content = this.contentState.getContentParts(streamId) ?? [];
+
+    const abortFinalEvent: t.ServerSentEvent = {
       final: true,
-      conversation: {
-        conversationId: job.metadata.conversationId,
-      },
+      conversation: { conversationId: jobData.conversationId },
       title: 'New Chat',
-      requestMessage: job.metadata.userMessage
+      requestMessage: jobData.userMessage
        ? {
            messageId: userMessageId,
-            parentMessageId: job.metadata.userMessage.parentMessageId,
-            conversationId: job.metadata.conversationId,
-            text: job.metadata.userMessage.text ?? '',
+            parentMessageId: jobData.userMessage.parentMessageId,
+            conversationId: jobData.conversationId,
+            text: jobData.userMessage.text ?? '',
            isCreatedByUser: true,
          }
        : null,
      responseMessage: {
-        messageId: job.metadata.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
-        parentMessageId: userMessageId, // Link response to user message
-        conversationId: job.metadata.conversationId,
-        content: job.contentPartsRef ?? [],
-        sender: job.metadata.sender ?? 'AI',
+        messageId: jobData.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
+        parentMessageId: userMessageId,
+        conversationId: jobData.conversationId,
+        content,
+        sender: jobData.sender ?? 'AI',
        unfinished: true,
-        /** Not an error - the job was intentionally aborted */
        error: false,
        isCreatedByUser: false,
      },
      aborted: true,
    } as unknown as t.ServerSentEvent;
 
-    job.finalEvent = abortFinalEvent;
-    job.emitter.emit('done', abortFinalEvent);
-    // Don't emit error event - it causes unhandled error warnings
-    // The done event with error:true and aborted:true is sufficient
+    if (runtime) {
+      runtime.finalEvent = abortFinalEvent;
+    }
+
+    this.eventTransport.emitDone(streamId, abortFinalEvent);
+    this.contentState.clearContentState(streamId);
 
     logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
   }
 
   /**
-   * Subscribe to a job's event stream with replay support.
-   * Replays any chunks buffered during disconnect, then continues with live events.
-   * Buffer is cleared after replay (only holds chunks missed during disconnect).
-   * @param streamId - The stream identifier
-   * @param onChunk - Handler for chunk events
-   * @param onDone - Optional handler for completion
-   * @param onError - Optional handler for errors
-   * @returns Object with unsubscribe function, or null if job not found
+   * Subscribe to a job's event stream.
   */
  subscribe(
    streamId: string,
@@ -219,352 +292,263 @@ class GenerationJobManagerClass {
    onDone?: t.DoneHandler,
    onError?: t.ErrorHandler,
  ): { unsubscribe: t.UnsubscribeFn } | null {
-    const job = this.jobs.get(streamId);
-    if (!job) {
+    const runtime = this.runtimeState.get(streamId);
+    if (!runtime) {
      return null;
    }
 
-    // Use setImmediate to allow the caller to set up their connection first
+    const jobData = this.jobStore.getJobSync(streamId);
+
+    // If job already complete, send final event
    setImmediate(() => {
-      // If job is already complete, send the final event
-      if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) {
-        onDone?.(job.finalEvent);
+      if (
+        runtime.finalEvent &&
+        jobData &&
+        ['complete', 'error', 'aborted'].includes(jobData.status)
+      ) {
+        onDone?.(runtime.finalEvent);
      }
    });
 
-    const chunkHandler = (event: t.ServerSentEvent) => onChunk(event);
-    const doneHandler = (event: t.ServerSentEvent) => onDone?.(event);
-    const errorHandler = (error: string) => onError?.(error);
+    const subscription = this.eventTransport.subscribe(streamId, {
+      onChunk: (event) => {
+        const e = event as t.ServerSentEvent;
+        // Filter out internal events
+        if (!(e as Record<string, unknown>)._internal) {
+          onChunk(e);
+        }
+      },
+      onDone: (event) => onDone?.(event as t.ServerSentEvent),
+      onError,
+    });
 
-    job.emitter.on('chunk', chunkHandler);
-    job.emitter.on('done', doneHandler);
-    job.emitter.on('error', errorHandler);
-
-    // Signal that we're ready to receive events (first subscriber)
-    if (job.emitter.listenerCount('chunk') === 1) {
-      job.resolveReady();
+    // Signal ready on first subscriber
+    if (this.eventTransport.isFirstSubscriber(streamId)) {
+      runtime.resolveReady();
      logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`);
    }
 
-    const unsubscribe = () => {
-      const currentJob = this.jobs.get(streamId);
-      if (currentJob) {
-        currentJob.emitter.off('chunk', chunkHandler);
-        currentJob.emitter.off('done', doneHandler);
-        currentJob.emitter.off('error', errorHandler);
-
-        // When last subscriber leaves
-        if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') {
-          // Reset syncSent so reconnecting clients get sync event again
-          currentJob.syncSent = false;
-          // Emit event for saving partial response - use graph's contentParts directly
-          currentJob.emitter.emit('allSubscribersLeft', currentJob.contentPartsRef ?? []);
-          logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
-        }
-      }
-    };
-
-    return { unsubscribe };
+    return subscription;
  }
 
  /**
   * Emit a chunk event to all subscribers.
-   * Only buffers chunks when no subscribers are listening (for reconnect replay).
-   * Also tracks run steps and user message for reconnection state.
-   * @param streamId - The stream identifier
-   * @param event - The event data to emit
   */
  emitChunk(streamId: string, event: t.ServerSentEvent): void {
-    const job = this.jobs.get(streamId);
-    if (!job || job.status !== 'running') {
+    const jobData = this.jobStore.getJobSync(streamId);
+    if (!jobData || jobData.status !== 'running') {
      return;
    }
 
-    // // Only buffer if no one is listening (for reconnect replay)
-    // const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
-    // if (!hasSubscribers) {
-    //   job.chunks.push(event);
-    // }
-
    // Track user message from created event
-    this.trackUserMessage(job, event);
+    this.trackUserMessage(streamId, event);
 
-    // Run steps and content are tracked via graphRef and contentPartsRef
-    // No need to aggregate separately - these reference the graph's data directly
-
-    job.emitter.emit('chunk', event);
+    this.eventTransport.emitChunk(streamId, event);
  }
 
  /**
-   * Track user message from created event for reconnection.
+   * Track user message from created event.
   */
-  private trackUserMessage(job: t.GenerationJob, event: t.ServerSentEvent): void {
+  private trackUserMessage(streamId: string, event: t.ServerSentEvent): void {
    const data = event as Record<string, unknown>;
    if (!data.created || !data.message) {
      return;
    }
 
    const message = data.message as Record<string, unknown>;
-    job.metadata.userMessage = {
-      messageId: message.messageId as string,
-      parentMessageId: message.parentMessageId as string | undefined,
-      conversationId: message.conversationId as string | undefined,
-      text: message.text as string | undefined,
+    const updates: Partial<SerializableJobData> = {
+      userMessage: {
+        messageId: message.messageId as string,
+        parentMessageId: message.parentMessageId as string | undefined,
+        conversationId: message.conversationId as string | undefined,
+        text: message.text as string | undefined,
+      },
    };
 
-    // Update conversationId in metadata if not set
-    if (!job.metadata.conversationId && message.conversationId) {
-      job.metadata.conversationId = message.conversationId as string;
+    if (message.conversationId) {
+      updates.conversationId = message.conversationId as string;
    }
 
-    logger.debug(`[GenerationJobManager] Tracked user message for ${job.streamId}`);
+    this.jobStore.updateJob(streamId, updates);
+    logger.debug(`[GenerationJobManager] Tracked user message for ${streamId}`);
  }
 
  /**
-   * Update job metadata with additional information.
-   * Called when more information becomes available during generation.
-   * @param streamId - The stream identifier
-   * @param metadata - Partial metadata to merge
+   * Update job metadata.
   */
  updateMetadata(streamId: string, metadata: Partial<t.GenerationJobMetadata>): void {
-    const job = this.jobs.get(streamId);
-    if (!job) {
-      return;
+    const updates: Partial<SerializableJobData> = {};
+    if (metadata.responseMessageId) {
+      updates.responseMessageId = metadata.responseMessageId;
    }
-    job.metadata = { ...job.metadata, ...metadata };
+    if (metadata.sender) {
+      updates.sender = metadata.sender;
+    }
+    if (metadata.conversationId) {
+      updates.conversationId = metadata.conversationId;
+    }
+    if (metadata.userMessage) {
+      updates.userMessage = metadata.userMessage;
+    }
+    this.jobStore.updateJob(streamId, updates);
    logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
  }
 
  /**
   * Set reference to the graph's contentParts array.
-   * This is the authoritative content source - no need to aggregate separately.
-   * @param streamId - The stream identifier
-   * @param contentParts - Reference to graph's contentParts array
   */
  setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
-    const job = this.jobs.get(streamId);
-    if (!job) {
+    if (!this.jobStore.hasJobSync(streamId)) {
      return;
    }
-    job.contentPartsRef = contentParts;
-    logger.debug(`[GenerationJobManager] Set contentParts reference for ${streamId}`, {
-      initialLength: contentParts?.length ?? 0,
-      isArray: Array.isArray(contentParts),
-    });
+    this.contentState.setContentParts(streamId, contentParts);
+    logger.debug(`[GenerationJobManager] Set contentParts for ${streamId}`);
  }
 
  /**
   * Set reference to the graph instance.
-   * This provides access to run steps (contentData) - no need to track separately.
-   * @param streamId - The stream identifier
-   * @param graph - Reference to the graph instance (must have contentData property)
   */
  setGraph(streamId: string, graph: StandardGraph): void {
-    const job = this.jobs.get(streamId);
-    if (!job) {
+    if (!this.jobStore.hasJobSync(streamId)) {
      return;
    }
-    job.graphRef = graph;
+    this.contentState.setGraph(streamId, graph);
    logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`);
  }
 
  /**
   * Get resume state for reconnecting clients.
-   * Includes run steps, aggregated content, and user message data.
-   * @param streamId - The stream identifier
-   * @returns Resume state or null if job not found
   */
  getResumeState(streamId: string): t.ResumeState | null {
-    const job = this.jobs.get(streamId);
-    if (!job) {
+    const jobData = this.jobStore.getJobSync(streamId);
+    if (!jobData) {
      return null;
    }
 
-    // Use graph's contentParts directly - it's always current and complete
-    // No conversion needed - send as-is
-    const aggregatedContent = job.contentPartsRef ?? [];
-
-    // Use graph's contentData for run steps - it's the authoritative source
-    const runSteps = job.graphRef?.contentData ?? [];
+    const aggregatedContent = this.contentState.getContentParts(streamId) ?? [];
+    const runSteps = this.contentState.getRunSteps(streamId);
 
    logger.debug(`[GenerationJobManager] getResumeState:`, {
      streamId,
      aggregatedContentLength: aggregatedContent.length,
      runStepsLength: runSteps.length,
-      hasGraphRef: !!job.graphRef,
-      hasContentPartsRef: !!job.contentPartsRef,
    });
 
    return {
      runSteps,
      aggregatedContent,
-      userMessage: job.metadata.userMessage,
-      responseMessageId: job.metadata.responseMessageId,
-      conversationId: job.metadata.conversationId,
+      userMessage: jobData.userMessage,
+      responseMessageId: jobData.responseMessageId,
+      conversationId: jobData.conversationId,
+      sender: jobData.sender,
    };
  }
 
  /**
-   * Mark that sync has been sent for this job to prevent duplicate replays.
-   * @param streamId - The stream identifier
+   * Mark that sync has been sent.
   */
  markSyncSent(streamId: string): void {
-    const job = this.jobs.get(streamId);
-    if (job) {
-      job.syncSent = true;
+    const runtime = this.runtimeState.get(streamId);
+    if (runtime) {
+      runtime.syncSent = true;
    }
  }
 
  /**
-   * Check if sync has been sent for this job.
-   * @param streamId - The stream identifier
+   * Check if sync has been sent.
   */
  wasSyncSent(streamId: string): boolean {
-    return this.jobs.get(streamId)?.syncSent ?? false;
+    return this.runtimeState.get(streamId)?.syncSent ?? false;
  }
 
  /**
-   * Emit a done event to all subscribers.
-   * Stores the final event for replay on reconnect.
-   * @param streamId - The stream identifier
-   * @param event - The final event data
+   * Emit a done event.
   */
  emitDone(streamId: string, event: t.ServerSentEvent): void {
-    const job = this.jobs.get(streamId);
-    if (!job) {
-      return;
+    const runtime = this.runtimeState.get(streamId);
+    if (runtime) {
+      runtime.finalEvent = event;
    }
-    job.finalEvent = event;
-    job.emitter.emit('done', event);
+    this.eventTransport.emitDone(streamId, event);
  }
 
  /**
-   * Emit an error event to all subscribers.
-   * @param streamId - The stream identifier
-   * @param error - The error message
+   * Emit an error event.
   */
  emitError(streamId: string, error: string): void {
-    const job = this.jobs.get(streamId);
-    if (!job) {
-      return;
-    }
-    job.emitter.emit('error', error);
+    this.eventTransport.emitError(streamId, error);
  }
 
  /**
-   * Cleanup completed jobs after TTL.
+   * Cleanup expired jobs.
   */
-  private cleanup(): void {
-    const now = Date.now();
-    const toDelete: string[] = [];
+  private async cleanup(): Promise<void> {
+    const count = await this.jobStore.cleanup();
 
-    for (const [streamId, job] of this.jobs) {
-      const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
-      if (isFinished && job.completedAt && now - job.completedAt > this.ttlAfterComplete) {
-        toDelete.push(streamId);
+    // Cleanup runtime state for deleted jobs
+    for (const streamId of this.runtimeState.keys()) {
+      if (!this.jobStore.hasJobSync(streamId)) {
+        this.runtimeState.delete(streamId);
+        this.contentState.clearContentState(streamId);
+        this.eventTransport.cleanup(streamId);
      }
    }
 
-    toDelete.forEach((id) => this.deleteJob(id));
-
-    if (toDelete.length > 0) {
-      logger.debug(`[GenerationJobManager] Cleaned up ${toDelete.length} expired jobs`);
-    }
-  }
-
-  /**
-   * Delete a job and cleanup listeners.
-   * @param streamId - The stream identifier
-   */
-  private deleteJob(streamId: string): void {
-    const job = this.jobs.get(streamId);
-    if (job) {
-      job.emitter.removeAllListeners();
-      this.jobs.delete(streamId);
-    }
-  }
-
-  /**
-   * Evict oldest job (LRU).
-   */
-  private evictOldest(): void {
-    let oldestId: string | null = null;
-    let oldestTime = Infinity;
-
-    for (const [streamId, job] of this.jobs) {
-      if (job.createdAt < oldestTime) {
-        oldestTime = job.createdAt;
-        oldestId = streamId;
-      }
-    }
-
-    if (oldestId) {
-      logger.warn(`[GenerationJobManager] Evicting oldest job: ${oldestId}`);
-      this.deleteJob(oldestId);
+    if (count > 0) {
+      logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`);
    }
  }
 
  /**
   * Get stream info for status endpoint.
-   * Returns chunk count, status, aggregated content, and run step count.
   */
  getStreamInfo(streamId: string): {
    active: boolean;
    status: t.GenerationJobStatus;
-    chunkCount: number;
-    runStepCount: number;
    aggregatedContent?: Agents.MessageContentComplex[];
    createdAt: number;
  } | null {
-    const job = this.jobs.get(streamId);
-    if (!job) {
+    const jobData = this.jobStore.getJobSync(streamId);
+    if (!jobData) {
      return null;
    }
 
    return {
-      active: job.status === 'running',
-      status: job.status,
-      chunkCount: job.chunks.length,
-      runStepCount: job.graphRef?.contentData?.length ?? 0,
-      aggregatedContent: job.contentPartsRef ?? [],
-      createdAt: job.createdAt,
+      active: jobData.status === 'running',
+      status: jobData.status as t.GenerationJobStatus,
+      aggregatedContent: this.contentState.getContentParts(streamId) ?? [],
+      createdAt: jobData.createdAt,
    };
  }
 
  /**
-   * Get total number of active jobs.
+   * Get total job count.
   */
  getJobCount(): number {
-    return this.jobs.size;
+    return this.jobStore.getJobCount();
  }
 
  /**
-   * Get count of jobs by status.
+   * Get job count by status.
   */
  getJobCountByStatus(): Record<t.GenerationJobStatus, number> {
-    const counts: Record<t.GenerationJobStatus, number> = {
-      running: 0,
-      complete: 0,
-      error: 0,
-      aborted: 0,
-    };
-
-    for (const job of this.jobs.values()) {
-      counts[job.status]++;
-    }
-
-    return counts;
+    return this.jobStore.getJobCountByStatus() as Record<t.GenerationJobStatus, number>;
  }
 
  /**
-   * Destroy the manager and cleanup all jobs.
+   * Destroy the manager.
   */
  destroy(): void {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }
-    this.jobs.forEach((_, streamId) => this.deleteJob(streamId));
+
+    this.jobStore.destroy();
+    this.eventTransport.destroy();
+    this.contentState.destroy();
+    this.runtimeState.clear();
+
    logger.debug('[GenerationJobManager] Destroyed');
  }
 }
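Reviewer note: the facade keeps the old `job.emitter` surface, but delivery now flows through `subscribe()`. A minimal sketch of the consuming side under that contract (the Express handler and SSE header setup are assumptions, not part of this diff):

```ts
// Hypothetical SSE route handler consuming the manager (sketch only).
import type { Response } from 'express';
import { GenerationJobManager } from '~/stream/GenerationJobManager'; // assumed export

function streamToClient(streamId: string, res: Response): void {
  const subscription = GenerationJobManager.subscribe(
    streamId,
    (event) => res.write(`data: ${JSON.stringify(event)}\n\n`),
    (finalEvent) => {
      // Also fires on reconnect to an already-finished job,
      // via the setImmediate replay of the stored finalEvent.
      res.write(`data: ${JSON.stringify(finalEvent)}\n\n`);
      res.end();
    },
    (error) => res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`),
  );

  // Unsubscribing on disconnect is what drives onAllSubscribersLeft
  // (partial-save event + syncSent reset) once no listeners remain.
  res.on('close', () => subscription?.unsubscribe());
}
```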
diff --git a/packages/api/src/stream/implementations/InMemoryContentState.ts b/packages/api/src/stream/implementations/InMemoryContentState.ts
new file mode 100644
index 0000000000..29852458ab
--- /dev/null
+++ b/packages/api/src/stream/implementations/InMemoryContentState.ts
@@ -0,0 +1,107 @@
+import type { Agents } from 'librechat-data-provider';
+import type { StandardGraph } from '@librechat/agents';
+import type { IContentStateManager } from '../interfaces/IJobStore';
+
+/**
+ * Content state entry - volatile, in-memory only.
+ * Uses WeakRef to allow garbage collection of graph when no longer needed.
+ */
+interface ContentState {
+  contentParts: Agents.MessageContentComplex[];
+  graphRef: WeakRef<StandardGraph> | null;
+}
+
+/**
+ * In-memory content state manager.
+ * Manages volatile references to graph content that should NOT be persisted.
+ * Uses WeakRef for graph to allow garbage collection.
+ */
+export class InMemoryContentState implements IContentStateManager {
+  private state = new Map<string, ContentState>();
+
+  /** Cleanup interval for orphaned entries */
+  private cleanupInterval: NodeJS.Timeout | null = null;
+
+  constructor() {
+    // Cleanup orphaned content state every 5 minutes
+    this.cleanupInterval = setInterval(() => {
+      this.cleanupOrphaned();
+    }, 300000);
+
+    if (this.cleanupInterval.unref) {
+      this.cleanupInterval.unref();
+    }
+  }
+
+  setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
+    const existing = this.state.get(streamId);
+    if (existing) {
+      existing.contentParts = contentParts;
+    } else {
+      this.state.set(streamId, { contentParts, graphRef: null });
+    }
+  }
+
+  getContentParts(streamId: string): Agents.MessageContentComplex[] | null {
+    return this.state.get(streamId)?.contentParts ?? null;
+  }
+
+  setGraph(streamId: string, graph: StandardGraph): void {
+    const existing = this.state.get(streamId);
+    if (existing) {
+      existing.graphRef = new WeakRef(graph);
+    } else {
+      this.state.set(streamId, {
+        contentParts: [],
+        graphRef: new WeakRef(graph),
+      });
+    }
+  }
+
+  getRunSteps(streamId: string): Agents.RunStep[] {
+    const state = this.state.get(streamId);
+    if (!state?.graphRef) {
+      return [];
+    }
+
+    // Dereference WeakRef - may return undefined if GC'd
+    const graph = state.graphRef.deref();
+    return graph?.contentData ?? [];
+  }
+
+  clearContentState(streamId: string): void {
+    this.state.delete(streamId);
+  }
+
+  /**
+   * Cleanup entries where graph has been garbage collected.
+   * These are orphaned states that are no longer useful.
+   */
+  private cleanupOrphaned(): void {
+    const toDelete: string[] = [];
+
+    for (const [streamId, state] of this.state) {
+      // If graphRef exists but has been GC'd, this state is orphaned
+      if (state.graphRef && !state.graphRef.deref()) {
+        toDelete.push(streamId);
+      }
+    }
+
+    for (const id of toDelete) {
+      this.state.delete(id);
+    }
+  }
+
+  /** Get count of tracked streams (for monitoring) */
+  getStateCount(): number {
+    return this.state.size;
+  }
+
+  destroy(): void {
+    if (this.cleanupInterval) {
+      clearInterval(this.cleanupInterval);
+      this.cleanupInterval = null;
+    }
+    this.state.clear();
+  }
+}
diff --git a/packages/api/src/stream/implementations/InMemoryEventTransport.ts b/packages/api/src/stream/implementations/InMemoryEventTransport.ts
new file mode 100644
index 0000000000..3a781fa4ba
--- /dev/null
+++ b/packages/api/src/stream/implementations/InMemoryEventTransport.ts
@@ -0,0 +1,121 @@
+import { EventEmitter } from 'events';
+import { logger } from '@librechat/data-schemas';
+import type { IEventTransport } from '../interfaces/IJobStore';
+
+interface StreamState {
+  emitter: EventEmitter;
+  allSubscribersLeftCallback?: () => void;
+}
+
+/**
+ * In-memory event transport using Node.js EventEmitter.
+ * For horizontal scaling, replace with RedisEventTransport.
+ */
+export class InMemoryEventTransport implements IEventTransport {
+  private streams = new Map<string, StreamState>();
+
+  private getOrCreateStream(streamId: string): StreamState {
+    let state = this.streams.get(streamId);
+    if (!state) {
+      const emitter = new EventEmitter();
+      emitter.setMaxListeners(100);
+      state = { emitter };
+      this.streams.set(streamId, state);
+    }
+    return state;
+  }
+
+  subscribe(
+    streamId: string,
+    handlers: {
+      onChunk: (event: unknown) => void;
+      onDone?: (event: unknown) => void;
+      onError?: (error: string) => void;
+    },
+  ): { unsubscribe: () => void } {
+    const state = this.getOrCreateStream(streamId);
+
+    const chunkHandler = (event: unknown) => handlers.onChunk(event);
+    const doneHandler = (event: unknown) => handlers.onDone?.(event);
+    const errorHandler = (error: string) => handlers.onError?.(error);
+
+    state.emitter.on('chunk', chunkHandler);
+    state.emitter.on('done', doneHandler);
+    state.emitter.on('error', errorHandler);
+
+    return {
+      unsubscribe: () => {
+        const currentState = this.streams.get(streamId);
+        if (currentState) {
+          currentState.emitter.off('chunk', chunkHandler);
+          currentState.emitter.off('done', doneHandler);
+          currentState.emitter.off('error', errorHandler);
+
+          // Check if all subscribers left
+          if (currentState.emitter.listenerCount('chunk') === 0) {
+            currentState.allSubscribersLeftCallback?.();
+          }
+        }
+      },
+    };
+  }
+
+  emitChunk(streamId: string, event: unknown): void {
+    const state = this.streams.get(streamId);
+    state?.emitter.emit('chunk', event);
+  }
+
+  emitDone(streamId: string, event: unknown): void {
+    const state = this.streams.get(streamId);
+    state?.emitter.emit('done', event);
+  }
+
+  emitError(streamId: string, error: string): void {
+    const state = this.streams.get(streamId);
+    state?.emitter.emit('error', error);
+  }
+
+  getSubscriberCount(streamId: string): number {
+    const state = this.streams.get(streamId);
+    return state?.emitter.listenerCount('chunk') ?? 0;
+  }
+
+  onAllSubscribersLeft(streamId: string, callback: () => void): void {
+    const state = this.getOrCreateStream(streamId);
+    state.allSubscribersLeftCallback = callback;
+  }
+
+  /**
+   * Check if this is the first subscriber (for ready signaling)
+   */
+  isFirstSubscriber(streamId: string): boolean {
+    const state = this.streams.get(streamId);
+    return state?.emitter.listenerCount('chunk') === 1;
+  }
+
+  /**
+   * Cleanup a stream's event emitter
+   */
+  cleanup(streamId: string): void {
+    const state = this.streams.get(streamId);
+    if (state) {
+      state.emitter.removeAllListeners();
+      this.streams.delete(streamId);
+    }
+  }
+
+  /**
+   * Get count of tracked streams (for monitoring)
+   */
+  getStreamCount(): number {
+    return this.streams.size;
+  }
+
+  destroy(): void {
+    for (const state of this.streams.values()) {
+      state.emitter.removeAllListeners();
+    }
+    this.streams.clear();
+    logger.debug('[InMemoryEventTransport] Destroyed');
+  }
+}
diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts
new file mode 100644
index 0000000000..308725e0db
--- /dev/null
+++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts
@@ -0,0 +1,219 @@
+import { logger } from '@librechat/data-schemas';
+import type { IJobStore, SerializableJobData, JobStatus } from '../interfaces/IJobStore';
+
+/**
+ * In-memory implementation of IJobStore.
+ * Suitable for single-instance deployments.
+ * For horizontal scaling, use RedisJobStore.
+ */
+export class InMemoryJobStore implements IJobStore {
+  private jobs = new Map<string, SerializableJobData>();
+  private cleanupInterval: NodeJS.Timeout | null = null;
+
+  /** Time to keep completed jobs before cleanup (5 minutes - reduced from 1 hour) */
+  private ttlAfterComplete = 300000;
+
+  /** Maximum number of concurrent jobs */
+  private maxJobs = 1000;
+
+  constructor(options?: { ttlAfterComplete?: number; maxJobs?: number }) {
+    if (options?.ttlAfterComplete) {
+      this.ttlAfterComplete = options.ttlAfterComplete;
+    }
+    if (options?.maxJobs) {
+      this.maxJobs = options.maxJobs;
+    }
+  }
+
+  initialize(): void {
+    if (this.cleanupInterval) {
+      return;
+    }
+
+    this.cleanupInterval = setInterval(() => {
+      this.cleanup();
+    }, 60000);
+
+    if (this.cleanupInterval.unref) {
+      this.cleanupInterval.unref();
+    }
+
+    logger.debug('[InMemoryJobStore] Initialized with cleanup interval');
+  }
+
+  async createJob(
+    streamId: string,
+    userId: string,
+    conversationId?: string,
+  ): Promise<SerializableJobData> {
+    return this.createJobSync(streamId, userId, conversationId);
+  }
+
+  /** Synchronous version for in-memory use */
+  createJobSync(streamId: string, userId: string, conversationId?: string): SerializableJobData {
+    if (this.jobs.size >= this.maxJobs) {
+      this.evictOldestSync();
+    }
+
+    const job: SerializableJobData = {
+      streamId,
+      userId,
+      status: 'running',
+      createdAt: Date.now(),
+      conversationId,
+      syncSent: false,
+    };
+
+    this.jobs.set(streamId, job);
+    logger.debug(`[InMemoryJobStore] Created job: ${streamId}`);
+
+    return job;
+  }
+
+  async getJob(streamId: string): Promise<SerializableJobData | null> {
+    return this.getJobSync(streamId);
+  }
+
+  /** Synchronous version for in-memory use */
+  getJobSync(streamId: string): SerializableJobData | null {
+    return this.jobs.get(streamId) ?? null;
+  }
+
+  async getJobByConversation(conversationId: string): Promise<SerializableJobData | null> {
+    return this.getJobByConversationSync(conversationId);
+  }
+
+  /** Synchronous version for in-memory use */
+  getJobByConversationSync(conversationId: string): SerializableJobData | null {
+    // Direct match first (streamId === conversationId for existing conversations)
+    const directMatch = this.jobs.get(conversationId);
+    if (directMatch && directMatch.status === 'running') {
+      return directMatch;
+    }
+
+    // Search by conversationId in metadata
+    for (const job of this.jobs.values()) {
+      if (job.conversationId === conversationId && job.status === 'running') {
+        return job;
+      }
+    }
+
+    return null;
+  }
+
+  async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
+    this.updateJobSync(streamId, updates);
+  }
+
+  /** Synchronous version for in-memory use */
+  updateJobSync(streamId: string, updates: Partial<SerializableJobData>): void {
+    const job = this.jobs.get(streamId);
+    if (!job) {
+      return;
+    }
+    Object.assign(job, updates);
+  }
+
+  async deleteJob(streamId: string): Promise<void> {
+    this.deleteJobSync(streamId);
+  }
+
+  /** Synchronous version for in-memory use */
+  deleteJobSync(streamId: string): void {
+    this.jobs.delete(streamId);
+    logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
+  }
+
+  async hasJob(streamId: string): Promise<boolean> {
+    return this.hasJobSync(streamId);
+  }
+
+  /** Synchronous version for in-memory use */
+  hasJobSync(streamId: string): boolean {
+    return this.jobs.has(streamId);
+  }
+
+  async getRunningJobs(): Promise<SerializableJobData[]> {
+    const running: SerializableJobData[] = [];
+    for (const job of this.jobs.values()) {
+      if (job.status === 'running') {
+        running.push(job);
+      }
+    }
+    return running;
+  }
+
+  async cleanup(): Promise<number> {
+    const now = Date.now();
+    const toDelete: string[] = [];
+
+    for (const [streamId, job] of this.jobs) {
+      const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
+      if (isFinished && job.completedAt && now - job.completedAt > this.ttlAfterComplete) {
+        toDelete.push(streamId);
+      }
+    }
+
+    for (const id of toDelete) {
+      await this.deleteJob(id);
+    }
+
+    if (toDelete.length > 0) {
+      logger.debug(`[InMemoryJobStore] Cleaned up ${toDelete.length} expired jobs`);
+    }
+
+    return toDelete.length;
+  }
+
+  private async evictOldest(): Promise<void> {
+    this.evictOldestSync();
+  }
+
+  /** Synchronous version for in-memory use */
+  private evictOldestSync(): void {
+    let oldestId: string | null = null;
+    let oldestTime = Infinity;
+
+    for (const [streamId, job] of this.jobs) {
+      if (job.createdAt < oldestTime) {
+        oldestTime = job.createdAt;
+        oldestId = streamId;
+      }
+    }
+
+    if (oldestId) {
+      logger.warn(`[InMemoryJobStore] Evicting oldest job: ${oldestId}`);
+      this.deleteJobSync(oldestId);
+    }
+  }
+
+  /** Get job count (for monitoring) */
+  getJobCount(): number {
+    return this.jobs.size;
+  }
+
+  /** Get job count by status (for monitoring) */
+  getJobCountByStatus(): Record<JobStatus, number> {
+    const counts: Record<JobStatus, number> = {
+      running: 0,
+      complete: 0,
+      error: 0,
+      aborted: 0,
+    };
+
+    for (const job of this.jobs.values()) {
+      counts[job.status]++;
+    }
+
+    return counts;
+  }
+
+  destroy(): void {
+    if (this.cleanupInterval) {
+      clearInterval(this.cleanupInterval);
+      this.cleanupInterval = null;
+    }
+    this.jobs.clear();
+    logger.debug('[InMemoryJobStore] Destroyed');
+  }
+}
diff --git a/packages/api/src/stream/implementations/index.ts b/packages/api/src/stream/implementations/index.ts
new file mode 100644
index 0000000000..4060943e69
--- /dev/null
+++ b/packages/api/src/stream/implementations/index.ts
@@ -0,0 +1,3 @@
+export * from './InMemoryJobStore';
+export * from './InMemoryContentState';
+export * from './InMemoryEventTransport';
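Reviewer note: the in-memory implementations all point at Redis as the scaling path. A rough, hypothetical sketch of what a `RedisJobStore` covering the core `IJobStore` methods might look like, assuming an ioredis-style client and JSON-per-key storage (conversation indexing, `getRunningJobs`, and `cleanup` are deliberately elided):

```ts
// Hypothetical Redis-backed job store (not part of this PR).
import type { Redis } from 'ioredis';
import type { SerializableJobData } from '../interfaces/IJobStore';

export class RedisJobStore {
  constructor(
    private redis: Redis,
    private ttlAfterComplete = 300, // seconds; Redis expires finished jobs
  ) {}

  private key(streamId: string): string {
    return `genjob:${streamId}`;
  }

  async createJob(streamId: string, userId: string, conversationId?: string): Promise<SerializableJobData> {
    const job: SerializableJobData = {
      streamId, userId, status: 'running', createdAt: Date.now(), conversationId, syncSent: false,
    };
    await this.redis.set(this.key(streamId), JSON.stringify(job));
    return job;
  }

  async getJob(streamId: string): Promise<SerializableJobData | null> {
    const raw = await this.redis.get(this.key(streamId));
    return raw ? (JSON.parse(raw) as SerializableJobData) : null;
  }

  async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
    const job = await this.getJob(streamId);
    if (!job) return;
    const next = { ...job, ...updates };
    await this.redis.set(this.key(streamId), JSON.stringify(next));
    if (next.status !== 'running') {
      // Let Redis TTL replace the in-memory cleanup loop
      await this.redis.expire(this.key(streamId), this.ttlAfterComplete);
    }
  }

  async deleteJob(streamId: string): Promise<void> {
    await this.redis.del(this.key(streamId));
  }

  async hasJob(streamId: string): Promise<boolean> {
    return (await this.redis.exists(this.key(streamId))) === 1;
  }
}
```

This is only viable because `SerializableJobData` holds no object references; the `AbortController`/promise state stays per-instance in `runtimeState`, which is exactly the split the interfaces below formalize.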
diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts
new file mode 100644
index 0000000000..6aa30659a8
--- /dev/null
+++ b/packages/api/src/stream/interfaces/IJobStore.ts
@@ -0,0 +1,139 @@
+import type { Agents } from 'librechat-data-provider';
+import type { StandardGraph } from '@librechat/agents';
+
+/**
+ * Job status enum
+ */
+export type JobStatus = 'running' | 'complete' | 'error' | 'aborted';
+
+/**
+ * Serializable job data - no object references, suitable for Redis/external storage
+ */
+export interface SerializableJobData {
+  streamId: string;
+  userId: string;
+  status: JobStatus;
+  createdAt: number;
+  completedAt?: number;
+  conversationId?: string;
+  error?: string;
+
+  /** User message metadata */
+  userMessage?: {
+    messageId: string;
+    parentMessageId?: string;
+    conversationId?: string;
+    text?: string;
+  };
+
+  /** Response message ID for reconnection */
+  responseMessageId?: string;
+
+  /** Sender name for UI display */
+  sender?: string;
+
+  /** Whether sync has been sent to a client */
+  syncSent: boolean;
+
+  /** Serialized final event for replay */
+  finalEvent?: string;
+}
+
+/**
+ * Resume state for reconnecting clients
+ */
+export interface ResumeState {
+  runSteps: Agents.RunStep[];
+  aggregatedContent: Agents.MessageContentComplex[];
+  userMessage?: SerializableJobData['userMessage'];
+  responseMessageId?: string;
+  conversationId?: string;
+  sender?: string;
+}
+
+/**
+ * Interface for job storage backend.
+ * Implementations can use in-memory Map, Redis, KV store, etc.
+ */
+export interface IJobStore {
+  /** Create a new job */
+  createJob(
+    streamId: string,
+    userId: string,
+    conversationId?: string,
+  ): Promise<SerializableJobData>;
+
+  /** Get a job by streamId */
+  getJob(streamId: string): Promise<SerializableJobData | null>;
+
+  /** Find active job by conversationId */
+  getJobByConversation(conversationId: string): Promise<SerializableJobData | null>;
+
+  /** Update job data */
+  updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void>;
+
+  /** Delete a job */
+  deleteJob(streamId: string): Promise<void>;
+
+  /** Check if job exists */
+  hasJob(streamId: string): Promise<boolean>;
+
+  /** Get all running jobs (for cleanup) */
+  getRunningJobs(): Promise<SerializableJobData[]>;
+
+  /** Cleanup expired jobs */
+  cleanup(): Promise<number>;
+}
+
+/**
+ * Interface for pub/sub event transport.
+ * Implementations can use EventEmitter, Redis Pub/Sub, etc.
+ */
+export interface IEventTransport {
+  /** Subscribe to events for a stream */
+  subscribe(
+    streamId: string,
+    handlers: {
+      onChunk: (event: unknown) => void;
+      onDone?: (event: unknown) => void;
+      onError?: (error: string) => void;
+    },
+  ): { unsubscribe: () => void };
+
+  /** Publish a chunk event */
+  emitChunk(streamId: string, event: unknown): void;
+
+  /** Publish a done event */
+  emitDone(streamId: string, event: unknown): void;
+
+  /** Publish an error event */
+  emitError(streamId: string, error: string): void;
+
+  /** Get subscriber count for a stream */
+  getSubscriberCount(streamId: string): number;
+
+  /** Listen for all subscribers leaving */
+  onAllSubscribersLeft(streamId: string, callback: () => void): void;
+}
+
+/**
+ * Interface for content state management.
+ * Separates volatile content state from persistent job data.
+ * In-memory only - not persisted to external storage.
+ */
+export interface IContentStateManager {
+  /** Set content parts reference (in-memory only) */
+  setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void;
+
+  /** Get content parts */
+  getContentParts(streamId: string): Agents.MessageContentComplex[] | null;
+
+  /** Set graph reference for run steps */
+  setGraph(streamId: string, graph: StandardGraph): void;
+
+  /** Get run steps from graph */
+  getRunSteps(streamId: string): Agents.RunStep[];
+
+  /** Clear content state for a job */
+  clearContentState(streamId: string): void;
+}
diff --git a/packages/api/src/stream/interfaces/index.ts b/packages/api/src/stream/interfaces/index.ts
new file mode 100644
index 0000000000..5e31fb6fa3
--- /dev/null
+++ b/packages/api/src/stream/interfaces/index.ts
@@ -0,0 +1 @@
+export * from './IJobStore';
diff --git a/packages/api/src/types/stream.ts b/packages/api/src/types/stream.ts
index 592ec40081..d4df950210 100644
--- a/packages/api/src/types/stream.ts
+++ b/packages/api/src/types/stream.ts
@@ -1,6 +1,5 @@
 import type { EventEmitter } from 'events';
 import type { Agents } from 'librechat-data-provider';
-import type { StandardGraph } from '@librechat/agents';
 import type { ServerSentEvent } from '~/types';
 
 export interface GenerationJobMetadata {
@@ -27,14 +26,8 @@ export interface GenerationJob {
   metadata: GenerationJobMetadata;
   readyPromise: Promise<void>;
   resolveReady: () => void;
-  /** Buffered chunks for replay on reconnect */
-  chunks: ServerSentEvent[];
   /** Final event when job completes */
   finalEvent?: ServerSentEvent;
-  /** Reference to graph's contentParts - the authoritative content source */
-  contentPartsRef?: Agents.MessageContentComplex[];
-  /** Reference to the graph instance for accessing run steps (contentData) */
-  graphRef?: StandardGraph;
   /** Flag to indicate if a sync event was already sent (prevent duplicate replays) */
   syncSent?: boolean;
 }
diff --git a/packages/api/tsconfig.json b/packages/api/tsconfig.json
index ccdf3ebb2e..55e7e90567 100644
--- a/packages/api/tsconfig.json
+++ b/packages/api/tsconfig.json
@@ -8,7 +8,7 @@
     "target": "es2015",
     "moduleResolution": "node",
     "allowSyntheticDefaultImports": true,
-    "lib": ["es2017", "dom", "ES2021.String"],
+    "lib": ["es2017", "dom", "ES2021.String", "ES2021.WeakRef"],
     "allowJs": true,
     "skipLibCheck": true,
     "esModuleInterop": true,
diff --git a/packages/data-provider/src/types/agents.ts b/packages/data-provider/src/types/agents.ts
index 43ba6cfeb1..4842b76d74 100644
--- a/packages/data-provider/src/types/agents.ts
+++ b/packages/data-provider/src/types/agents.ts
@@ -195,6 +195,7 @@ export namespace Agents {
     userMessage?: UserMessageMeta;
     responseMessageId?: string;
     conversationId?: string;
+    sender?: string;
   }
   /**
    * Represents a run step delta i.e. any changed fields on a run step during
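Reviewer note: for anyone evaluating the new composition end to end, a minimal lifecycle sketch under the APIs in this diff (the import path, singleton export, and event payloads are assumptions):

```ts
// Minimal lifecycle sketch of the composed manager (sketch only).
import type { ServerSentEvent } from '~/types';
import { GenerationJobManager } from '~/stream/GenerationJobManager'; // assumed export

async function generateResumable(streamId: string, userId: string, conversationId: string) {
  GenerationJobManager.initialize();
  const job = GenerationJobManager.createJob(streamId, userId, conversationId);

  // The agent run would register live content references here:
  // GenerationJobManager.setContentParts(streamId, graph.contentParts);
  // GenerationJobManager.setGraph(streamId, graph);

  // Block token emission until the first SSE subscriber attaches.
  await job.readyPromise;
  GenerationJobManager.emitChunk(streamId, { text: 'partial token' } as unknown as ServerSentEvent);

  // A reconnecting client hits GET /chat/status/:conversationId, whose response
  // now carries getResumeState(): aggregatedContent, runSteps, userMessage,
  // responseMessageId, and the new sender field consumed by useResumeOnLoad.
  const resumeState = GenerationJobManager.getResumeState(streamId);
  void resumeState;

  GenerationJobManager.emitDone(streamId, { final: true } as unknown as ServerSentEvent);
  await GenerationJobManager.completeJob(streamId); // now async, per this diff
}
```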