refactor: Enhance GenerationJobManager with In-Memory Implementations

- Introduced InMemoryJobStore, InMemoryEventTransport, and InMemoryContentState to separate serializable job metadata, pub/sub event delivery, and volatile content state.
- Updated GenerationJobManager to compose these implementations for a cleaner separation of concerns and easier maintenance (a usage sketch follows).
- Enhanced job metadata handling to store the user message and response message ID upfront, enabling resumable streams.
- Improved cleanup and state management to prevent memory leaks and keep resource usage bounded.
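
A minimal sketch of the resulting API from a hypothetical caller (method names match the diff below; the import path, sendToClient, and logError are assumptions):

import { GenerationJobManager } from './GenerationJobManager'; // path assumed

const job = GenerationJobManager.createJob('stream-1', 'user-1', 'convo-1');

// subscribe() returns { unsubscribe } or null if the job does not exist.
const sub = GenerationJobManager.subscribe(
  'stream-1',
  (chunk) => sendToClient(chunk), // onChunk
  (finalEvent) => sendToClient(finalEvent), // onDone
  (error) => logError(error), // onError
);

GenerationJobManager.emitChunk('stream-1', { created: true });
GenerationJobManager.emitDone('stream-1', { final: true });
void GenerationJobManager.completeJob('stream-1'); // async after this refactor
sub?.unsubscribe();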
Danny Avila 2025-12-12 02:16:24 -05:00
parent 5ff66f2d77
commit ff86f96416
14 changed files with 892 additions and 321 deletions

View file

@@ -177,10 +177,16 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
const onStart = (userMsg, respMsgId, _isNewConvo) => {
userMessage = userMsg;
// Store the response messageId upfront so partial saves use the same ID
if (respMsgId) {
GenerationJobManager.updateMetadata(streamId, { responseMessageId: respMsgId });
}
// Store userMessage and responseMessageId upfront for resume capability
GenerationJobManager.updateMetadata(streamId, {
responseMessageId: respMsgId,
userMessage: {
messageId: userMsg.messageId,
parentMessageId: userMsg.parentMessageId,
conversationId: userMsg.conversationId,
text: userMsg.text,
},
});
GenerationJobManager.emitChunk(streamId, {
created: true,

View file

@@ -118,7 +118,7 @@ router.get('/chat/stream/:streamId', (req, res) => {
* @route GET /chat/status/:conversationId
* @desc Check if there's an active generation job for a conversation
* @access Private
* @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt, resumeState }
* @returns { active, streamId, status, aggregatedContent, createdAt, resumeState }
*/
router.get('/chat/status/:conversationId', (req, res) => {
const { conversationId } = req.params;
@@ -140,8 +140,6 @@ router.get('/chat/status/:conversationId', (req, res) => {
active: info?.active ?? false,
streamId: job.streamId,
status: info?.status ?? job.status,
chunkCount: info?.chunkCount ?? 0,
runStepCount: info?.runStepCount ?? 0,
aggregatedContent: info?.aggregatedContent,
createdAt: info?.createdAt ?? job.createdAt,
resumeState,
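
For illustration, a status payload after this change could look like the following sketch (hypothetical values; chunkCount and runStepCount are gone, and resumeState follows the ResumeState shape extended later in this diff):

const example: StreamStatusResponse = {
  active: true,
  streamId: 'convo-123',
  status: 'running',
  aggregatedContent: [{ type: 'text', text: 'partial answer' }],
  createdAt: 1765523784000,
  resumeState: {
    runSteps: [],
    aggregatedContent: [],
    responseMessageId: 'resp-1',
    conversationId: 'convo-123',
    sender: 'AI',
  },
};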

View file

@@ -6,7 +6,6 @@ export interface StreamStatusResponse {
active: boolean;
streamId?: string;
status?: 'running' | 'complete' | 'error' | 'aborted';
chunkCount?: number;
aggregatedContent?: Array<{ type: string; text?: string }>;
createdAt?: number;
resumeState?: Agents.ResumeState;

View file

@@ -62,7 +62,7 @@ function buildSubmissionFromResumeState(
content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
isCreatedByUser: false,
role: 'assistant',
sender: existingResponseMessage?.sender,
sender: existingResponseMessage?.sender ?? resumeState.sender,
model: existingResponseMessage?.model,
} as TMessage;

View file

@@ -1,21 +1,46 @@
import { EventEmitter } from 'events';
import { logger } from '@librechat/data-schemas';
import type { Agents } from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
import type { SerializableJobData } from './interfaces/IJobStore';
import type * as t from '~/types';
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
import { InMemoryContentState } from './implementations/InMemoryContentState';
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
/**
* Runtime state for active jobs - not serializable, kept in-memory per instance.
* Contains AbortController, ready promise, and other non-serializable state.
*/
interface RuntimeJobState {
abortController: AbortController;
readyPromise: Promise<void>;
resolveReady: () => void;
finalEvent?: t.ServerSentEvent;
syncSent: boolean;
}
/**
* Manages generation jobs for resumable LLM streams.
* Generation runs independently of HTTP connections via EventEmitter.
* Clients can subscribe/unsubscribe to job events without affecting generation.
* Composes three implementations for clean separation of concerns:
* - InMemoryJobStore: Serializable job metadata (swappable for Redis)
* - InMemoryEventTransport: Pub/sub events (swappable for Redis Pub/Sub)
* - InMemoryContentState: Volatile content refs with WeakRef (always in-memory)
*/
class GenerationJobManagerClass {
private jobs = new Map<string, t.GenerationJob>();
private jobStore: InMemoryJobStore;
private eventTransport: InMemoryEventTransport;
private contentState: InMemoryContentState;
/** Runtime state - always in-memory, not serializable */
private runtimeState = new Map<string, RuntimeJobState>();
private cleanupInterval: NodeJS.Timeout | null = null;
/** Time to keep completed jobs before cleanup (1 hour) */
private ttlAfterComplete = 3600000;
/** Maximum number of concurrent jobs */
private maxJobs = 1000;
constructor() {
this.jobStore = new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
this.eventTransport = new InMemoryEventTransport();
this.contentState = new InMemoryContentState();
}
/**
* Initialize the job manager with periodic cleanup.
@@ -25,6 +50,8 @@ class GenerationJobManagerClass {
return;
}
this.jobStore.initialize();
this.cleanupInterval = setInterval(() => {
this.cleanup();
}, 60000);
@@ -33,185 +60,231 @@ class GenerationJobManagerClass {
this.cleanupInterval.unref();
}
logger.debug('[GenerationJobManager] Initialized with cleanup interval');
logger.debug('[GenerationJobManager] Initialized');
}
/**
* Create a new generation job.
* @param streamId - Unique identifier for the stream
* @param userId - User ID who initiated the generation
* @param conversationId - Optional conversation ID
* @returns The created job
* @returns A facade object compatible with the old GenerationJob interface
*/
createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
if (this.jobs.size >= this.maxJobs) {
this.evictOldest();
}
// Create serializable job data (sync for in-memory)
const jobData = this.jobStore.createJobSync(streamId, userId, conversationId);
// Create runtime state
let resolveReady: () => void;
const readyPromise = new Promise<void>((resolve) => {
resolveReady = resolve;
});
const job: t.GenerationJob = {
streamId,
emitter: new EventEmitter(),
status: 'running',
createdAt: Date.now(),
const runtime: RuntimeJobState = {
abortController: new AbortController(),
metadata: { userId, conversationId },
readyPromise,
resolveReady: resolveReady!,
chunks: [],
syncSent: false,
};
this.runtimeState.set(streamId, runtime);
job.emitter.setMaxListeners(100);
// Set up all-subscribers-left callback
this.eventTransport.onAllSubscribersLeft(streamId, () => {
const currentRuntime = this.runtimeState.get(streamId);
if (currentRuntime) {
currentRuntime.syncSent = false;
}
const content = this.contentState.getContentParts(streamId) ?? [];
this.eventTransport.emitChunk(streamId, {
_internal: 'allSubscribersLeft',
content,
});
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
});
this.jobs.set(streamId, job);
logger.debug(`[GenerationJobManager] Created job: ${streamId}`);
return job;
// Return facade for backwards compatibility
return this.buildJobFacade(streamId, jobData, runtime);
}
/**
* Build a GenerationJob facade from job data and runtime state.
* This maintains backwards compatibility with existing code.
*/
private buildJobFacade(
streamId: string,
jobData: SerializableJobData,
runtime: RuntimeJobState,
): t.GenerationJob {
// Create a proxy emitter that delegates to eventTransport
const emitterProxy = {
on: (event: string, handler: (...args: unknown[]) => void) => {
if (event === 'allSubscribersLeft') {
// Subscribe to internal event
this.eventTransport.subscribe(streamId, {
onChunk: (e) => {
const evt = e as Record<string, unknown>;
if (evt._internal === 'allSubscribersLeft') {
handler(evt.content);
}
},
});
}
},
emit: () => {
/* handled via eventTransport */
},
listenerCount: () => this.eventTransport.getSubscriberCount(streamId),
setMaxListeners: () => {
/* no-op for proxy */
},
removeAllListeners: () => this.eventTransport.cleanup(streamId),
off: () => {
/* handled via unsubscribe */
},
};
return {
streamId,
emitter: emitterProxy as unknown as t.GenerationJob['emitter'],
status: jobData.status as t.GenerationJobStatus,
createdAt: jobData.createdAt,
completedAt: jobData.completedAt,
abortController: runtime.abortController,
error: jobData.error,
metadata: {
userId: jobData.userId,
conversationId: jobData.conversationId,
userMessage: jobData.userMessage,
responseMessageId: jobData.responseMessageId,
sender: jobData.sender,
},
readyPromise: runtime.readyPromise,
resolveReady: runtime.resolveReady,
finalEvent: runtime.finalEvent,
syncSent: runtime.syncSent,
};
}
/**
* Get a job by streamId.
* @param streamId - The stream identifier
* @returns The job if found, undefined otherwise
*/
getJob(streamId: string): t.GenerationJob | undefined {
return this.jobs.get(streamId);
const jobData = this.jobStore.getJobSync(streamId);
const runtime = this.runtimeState.get(streamId);
if (!jobData || !runtime) {
return undefined;
}
return this.buildJobFacade(streamId, jobData, runtime);
}
/**
* Find an active job by conversationId.
* Since streamId === conversationId for existing conversations,
* we first check by streamId, then search metadata.
* @param conversationId - The conversation identifier
* @returns The job if found, undefined otherwise
*/
getJobByConversation(conversationId: string): t.GenerationJob | undefined {
const directMatch = this.jobs.get(conversationId);
if (directMatch && directMatch.status === 'running') {
return directMatch;
const jobData = this.jobStore.getJobByConversationSync(conversationId);
if (!jobData) {
return undefined;
}
for (const job of this.jobs.values()) {
if (job.metadata.conversationId === conversationId && job.status === 'running') {
return job;
}
const runtime = this.runtimeState.get(jobData.streamId);
if (!runtime) {
return undefined;
}
return undefined;
return this.buildJobFacade(jobData.streamId, jobData, runtime);
}
/**
* Check if a job exists.
* @param streamId - The stream identifier
* @returns True if job exists
*/
hasJob(streamId: string): boolean {
return this.jobs.has(streamId);
return this.jobStore.hasJobSync(streamId);
}
/**
* Get job status.
* @param streamId - The stream identifier
* @returns The job status or undefined if not found
*/
getJobStatus(streamId: string): t.GenerationJobStatus | undefined {
return this.jobs.get(streamId)?.status;
const jobData = this.jobStore.getJobSync(streamId);
return jobData?.status as t.GenerationJobStatus | undefined;
}
/**
* Mark job as complete.
* @param streamId - The stream identifier
* @param error - Optional error message if job failed
*/
completeJob(streamId: string, error?: string): void {
const job = this.jobs.get(streamId);
if (!job) {
return;
}
async completeJob(streamId: string, error?: string): Promise<void> {
await this.jobStore.updateJob(streamId, {
status: error ? 'error' : 'complete',
completedAt: Date.now(),
error,
});
job.status = error ? 'error' : 'complete';
job.completedAt = Date.now();
if (error) {
job.error = error;
}
// Clear content state
this.contentState.clearContentState(streamId);
logger.debug(`[GenerationJobManager] Job completed: ${streamId}, status: ${job.status}`);
logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
}
/**
* Abort a job (user-initiated).
* Emits both error event and a final done event with aborted flag.
* @param streamId - The stream identifier
*/
abortJob(streamId: string): void {
const job = this.jobs.get(streamId);
if (!job) {
async abortJob(streamId: string): Promise<void> {
const jobData = this.jobStore.getJobSync(streamId);
const runtime = this.runtimeState.get(streamId);
if (!jobData) {
logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`);
return;
}
logger.debug(
`[GenerationJobManager] Aborting job ${streamId}, signal already aborted: ${job.abortController.signal.aborted}`,
);
job.abortController.abort();
job.status = 'aborted';
job.completedAt = Date.now();
logger.debug(
`[GenerationJobManager] AbortController.abort() called for ${streamId}, signal.aborted: ${job.abortController.signal.aborted}`,
);
if (runtime) {
runtime.abortController.abort();
}
// Create a final event for abort so clients can properly handle UI cleanup
const userMessageId = job.metadata.userMessage?.messageId;
const abortFinalEvent = {
await this.jobStore.updateJob(streamId, {
status: 'aborted',
completedAt: Date.now(),
});
// Create final event for abort
const userMessageId = jobData.userMessage?.messageId;
const content = this.contentState.getContentParts(streamId) ?? [];
const abortFinalEvent: t.ServerSentEvent = {
final: true,
conversation: {
conversationId: job.metadata.conversationId,
},
conversation: { conversationId: jobData.conversationId },
title: 'New Chat',
requestMessage: job.metadata.userMessage
requestMessage: jobData.userMessage
? {
messageId: userMessageId,
parentMessageId: job.metadata.userMessage.parentMessageId,
conversationId: job.metadata.conversationId,
text: job.metadata.userMessage.text ?? '',
parentMessageId: jobData.userMessage.parentMessageId,
conversationId: jobData.conversationId,
text: jobData.userMessage.text ?? '',
isCreatedByUser: true,
}
: null,
responseMessage: {
messageId: job.metadata.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
parentMessageId: userMessageId, // Link response to user message
conversationId: job.metadata.conversationId,
content: job.contentPartsRef ?? [],
sender: job.metadata.sender ?? 'AI',
messageId: jobData.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
parentMessageId: userMessageId,
conversationId: jobData.conversationId,
content,
sender: jobData.sender ?? 'AI',
unfinished: true,
/** Not an error - the job was intentionally aborted */
error: false,
isCreatedByUser: false,
},
aborted: true,
} as unknown as t.ServerSentEvent;
job.finalEvent = abortFinalEvent;
job.emitter.emit('done', abortFinalEvent);
// Don't emit a separate error event - it causes unhandled error warnings
// The done event with error: false and aborted: true is sufficient
if (runtime) {
runtime.finalEvent = abortFinalEvent;
}
this.eventTransport.emitDone(streamId, abortFinalEvent);
this.contentState.clearContentState(streamId);
logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
}
/**
* Subscribe to a job's event stream with replay support.
* Replays any chunks buffered during disconnect, then continues with live events.
* Buffer is cleared after replay (only holds chunks missed during disconnect).
* @param streamId - The stream identifier
* @param onChunk - Handler for chunk events
* @param onDone - Optional handler for completion
* @param onError - Optional handler for errors
* @returns Object with unsubscribe function, or null if job not found
* Subscribe to a job's event stream.
*/
subscribe(
streamId: string,
@@ -219,352 +292,263 @@ class GenerationJobManagerClass {
onDone?: t.DoneHandler,
onError?: t.ErrorHandler,
): { unsubscribe: t.UnsubscribeFn } | null {
const job = this.jobs.get(streamId);
if (!job) {
const runtime = this.runtimeState.get(streamId);
if (!runtime) {
return null;
}
// Use setImmediate to allow the caller to set up their connection first
const jobData = this.jobStore.getJobSync(streamId);
// If job already complete, send final event
setImmediate(() => {
// If job is already complete, send the final event
if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) {
onDone?.(job.finalEvent);
if (
runtime.finalEvent &&
jobData &&
['complete', 'error', 'aborted'].includes(jobData.status)
) {
onDone?.(runtime.finalEvent);
}
});
const chunkHandler = (event: t.ServerSentEvent) => onChunk(event);
const doneHandler = (event: t.ServerSentEvent) => onDone?.(event);
const errorHandler = (error: string) => onError?.(error);
const subscription = this.eventTransport.subscribe(streamId, {
onChunk: (event) => {
const e = event as t.ServerSentEvent;
// Filter out internal events
if (!(e as Record<string, unknown>)._internal) {
onChunk(e);
}
},
onDone: (event) => onDone?.(event as t.ServerSentEvent),
onError,
});
job.emitter.on('chunk', chunkHandler);
job.emitter.on('done', doneHandler);
job.emitter.on('error', errorHandler);
// Signal that we're ready to receive events (first subscriber)
if (job.emitter.listenerCount('chunk') === 1) {
job.resolveReady();
// Signal ready on first subscriber
if (this.eventTransport.isFirstSubscriber(streamId)) {
runtime.resolveReady();
logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`);
}
const unsubscribe = () => {
const currentJob = this.jobs.get(streamId);
if (currentJob) {
currentJob.emitter.off('chunk', chunkHandler);
currentJob.emitter.off('done', doneHandler);
currentJob.emitter.off('error', errorHandler);
// When last subscriber leaves
if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') {
// Reset syncSent so reconnecting clients get sync event again
currentJob.syncSent = false;
// Emit event for saving partial response - use graph's contentParts directly
currentJob.emitter.emit('allSubscribersLeft', currentJob.contentPartsRef ?? []);
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
}
}
};
return { unsubscribe };
return subscription;
}
/**
* Emit a chunk event to all subscribers.
* Only buffers chunks when no subscribers are listening (for reconnect replay).
* Also tracks run steps and user message for reconnection state.
* @param streamId - The stream identifier
* @param event - The event data to emit
*/
emitChunk(streamId: string, event: t.ServerSentEvent): void {
const job = this.jobs.get(streamId);
if (!job || job.status !== 'running') {
const jobData = this.jobStore.getJobSync(streamId);
if (!jobData || jobData.status !== 'running') {
return;
}
// // Only buffer if no one is listening (for reconnect replay)
// const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
// if (!hasSubscribers) {
// job.chunks.push(event);
// }
// Track user message from created event
this.trackUserMessage(job, event);
this.trackUserMessage(streamId, event);
// Run steps and content are tracked via graphRef and contentPartsRef
// No need to aggregate separately - these reference the graph's data directly
job.emitter.emit('chunk', event);
this.eventTransport.emitChunk(streamId, event);
}
/**
* Track user message from created event for reconnection.
* Track user message from created event.
*/
private trackUserMessage(job: t.GenerationJob, event: t.ServerSentEvent): void {
private trackUserMessage(streamId: string, event: t.ServerSentEvent): void {
const data = event as Record<string, unknown>;
if (!data.created || !data.message) {
return;
}
const message = data.message as Record<string, unknown>;
job.metadata.userMessage = {
messageId: message.messageId as string,
parentMessageId: message.parentMessageId as string | undefined,
conversationId: message.conversationId as string | undefined,
text: message.text as string | undefined,
const updates: Partial<SerializableJobData> = {
userMessage: {
messageId: message.messageId as string,
parentMessageId: message.parentMessageId as string | undefined,
conversationId: message.conversationId as string | undefined,
text: message.text as string | undefined,
},
};
// Update conversationId in metadata if not set
if (!job.metadata.conversationId && message.conversationId) {
job.metadata.conversationId = message.conversationId as string;
if (message.conversationId) {
updates.conversationId = message.conversationId as string;
}
logger.debug(`[GenerationJobManager] Tracked user message for ${job.streamId}`);
this.jobStore.updateJob(streamId, updates);
logger.debug(`[GenerationJobManager] Tracked user message for ${streamId}`);
}
/**
* Update job metadata with additional information.
* Called when more information becomes available during generation.
* @param streamId - The stream identifier
* @param metadata - Partial metadata to merge
* Update job metadata.
*/
updateMetadata(streamId: string, metadata: Partial<t.GenerationJobMetadata>): void {
const job = this.jobs.get(streamId);
if (!job) {
return;
const updates: Partial<SerializableJobData> = {};
if (metadata.responseMessageId) {
updates.responseMessageId = metadata.responseMessageId;
}
job.metadata = { ...job.metadata, ...metadata };
if (metadata.sender) {
updates.sender = metadata.sender;
}
if (metadata.conversationId) {
updates.conversationId = metadata.conversationId;
}
if (metadata.userMessage) {
updates.userMessage = metadata.userMessage;
}
this.jobStore.updateJob(streamId, updates);
logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
}
/**
* Set reference to the graph's contentParts array.
* This is the authoritative content source - no need to aggregate separately.
* @param streamId - The stream identifier
* @param contentParts - Reference to graph's contentParts array
*/
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
const job = this.jobs.get(streamId);
if (!job) {
if (!this.jobStore.hasJobSync(streamId)) {
return;
}
job.contentPartsRef = contentParts;
logger.debug(`[GenerationJobManager] Set contentParts reference for ${streamId}`, {
initialLength: contentParts?.length ?? 0,
isArray: Array.isArray(contentParts),
});
this.contentState.setContentParts(streamId, contentParts);
logger.debug(`[GenerationJobManager] Set contentParts for ${streamId}`);
}
/**
* Set reference to the graph instance.
* This provides access to run steps (contentData) - no need to track separately.
* @param streamId - The stream identifier
* @param graph - Reference to the graph instance (must have contentData property)
*/
setGraph(streamId: string, graph: StandardGraph): void {
const job = this.jobs.get(streamId);
if (!job) {
if (!this.jobStore.hasJobSync(streamId)) {
return;
}
job.graphRef = graph;
this.contentState.setGraph(streamId, graph);
logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`);
}
/**
* Get resume state for reconnecting clients.
* Includes run steps, aggregated content, and user message data.
* @param streamId - The stream identifier
* @returns Resume state or null if job not found
*/
getResumeState(streamId: string): t.ResumeState | null {
const job = this.jobs.get(streamId);
if (!job) {
const jobData = this.jobStore.getJobSync(streamId);
if (!jobData) {
return null;
}
// Use graph's contentParts directly - it's always current and complete
// No conversion needed - send as-is
const aggregatedContent = job.contentPartsRef ?? [];
// Use graph's contentData for run steps - it's the authoritative source
const runSteps = job.graphRef?.contentData ?? [];
const aggregatedContent = this.contentState.getContentParts(streamId) ?? [];
const runSteps = this.contentState.getRunSteps(streamId);
logger.debug(`[GenerationJobManager] getResumeState:`, {
streamId,
aggregatedContentLength: aggregatedContent.length,
runStepsLength: runSteps.length,
hasGraphRef: !!job.graphRef,
hasContentPartsRef: !!job.contentPartsRef,
});
return {
runSteps,
aggregatedContent,
userMessage: job.metadata.userMessage,
responseMessageId: job.metadata.responseMessageId,
conversationId: job.metadata.conversationId,
userMessage: jobData.userMessage,
responseMessageId: jobData.responseMessageId,
conversationId: jobData.conversationId,
sender: jobData.sender,
};
}
/**
* Mark that sync has been sent for this job to prevent duplicate replays.
* @param streamId - The stream identifier
* Mark that sync has been sent.
*/
markSyncSent(streamId: string): void {
const job = this.jobs.get(streamId);
if (job) {
job.syncSent = true;
const runtime = this.runtimeState.get(streamId);
if (runtime) {
runtime.syncSent = true;
}
}
/**
* Check if sync has been sent for this job.
* @param streamId - The stream identifier
* Check if sync has been sent.
*/
wasSyncSent(streamId: string): boolean {
return this.jobs.get(streamId)?.syncSent ?? false;
return this.runtimeState.get(streamId)?.syncSent ?? false;
}
/**
* Emit a done event to all subscribers.
* Stores the final event for replay on reconnect.
* @param streamId - The stream identifier
* @param event - The final event data
* Emit a done event.
*/
emitDone(streamId: string, event: t.ServerSentEvent): void {
const job = this.jobs.get(streamId);
if (!job) {
return;
const runtime = this.runtimeState.get(streamId);
if (runtime) {
runtime.finalEvent = event;
}
job.finalEvent = event;
job.emitter.emit('done', event);
this.eventTransport.emitDone(streamId, event);
}
/**
* Emit an error event to all subscribers.
* @param streamId - The stream identifier
* @param error - The error message
* Emit an error event.
*/
emitError(streamId: string, error: string): void {
const job = this.jobs.get(streamId);
if (!job) {
return;
}
job.emitter.emit('error', error);
this.eventTransport.emitError(streamId, error);
}
/**
* Cleanup completed jobs after TTL.
* Cleanup expired jobs.
*/
private cleanup(): void {
const now = Date.now();
const toDelete: string[] = [];
private async cleanup(): Promise<void> {
const count = await this.jobStore.cleanup();
for (const [streamId, job] of this.jobs) {
const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
if (isFinished && job.completedAt && now - job.completedAt > this.ttlAfterComplete) {
toDelete.push(streamId);
// Cleanup runtime state for deleted jobs
for (const streamId of this.runtimeState.keys()) {
if (!this.jobStore.hasJobSync(streamId)) {
this.runtimeState.delete(streamId);
this.contentState.clearContentState(streamId);
this.eventTransport.cleanup(streamId);
}
}
toDelete.forEach((id) => this.deleteJob(id));
if (toDelete.length > 0) {
logger.debug(`[GenerationJobManager] Cleaned up ${toDelete.length} expired jobs`);
}
}
/**
* Delete a job and cleanup listeners.
* @param streamId - The stream identifier
*/
private deleteJob(streamId: string): void {
const job = this.jobs.get(streamId);
if (job) {
job.emitter.removeAllListeners();
this.jobs.delete(streamId);
}
}
/**
* Evict oldest job (LRU).
*/
private evictOldest(): void {
let oldestId: string | null = null;
let oldestTime = Infinity;
for (const [streamId, job] of this.jobs) {
if (job.createdAt < oldestTime) {
oldestTime = job.createdAt;
oldestId = streamId;
}
}
if (oldestId) {
logger.warn(`[GenerationJobManager] Evicting oldest job: ${oldestId}`);
this.deleteJob(oldestId);
if (count > 0) {
logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`);
}
}
/**
* Get stream info for status endpoint.
* Returns chunk count, status, aggregated content, and run step count.
*/
getStreamInfo(streamId: string): {
active: boolean;
status: t.GenerationJobStatus;
chunkCount: number;
runStepCount: number;
aggregatedContent?: Agents.MessageContentComplex[];
createdAt: number;
} | null {
const job = this.jobs.get(streamId);
if (!job) {
const jobData = this.jobStore.getJobSync(streamId);
if (!jobData) {
return null;
}
return {
active: job.status === 'running',
status: job.status,
chunkCount: job.chunks.length,
runStepCount: job.graphRef?.contentData?.length ?? 0,
aggregatedContent: job.contentPartsRef ?? [],
createdAt: job.createdAt,
active: jobData.status === 'running',
status: jobData.status as t.GenerationJobStatus,
aggregatedContent: this.contentState.getContentParts(streamId) ?? [],
createdAt: jobData.createdAt,
};
}
/**
* Get total number of active jobs.
* Get total job count.
*/
getJobCount(): number {
return this.jobs.size;
return this.jobStore.getJobCount();
}
/**
* Get count of jobs by status.
* Get job count by status.
*/
getJobCountByStatus(): Record<t.GenerationJobStatus, number> {
const counts: Record<t.GenerationJobStatus, number> = {
running: 0,
complete: 0,
error: 0,
aborted: 0,
};
for (const job of this.jobs.values()) {
counts[job.status]++;
}
return counts;
return this.jobStore.getJobCountByStatus() as Record<t.GenerationJobStatus, number>;
}
/**
* Destroy the manager and cleanup all jobs.
* Destroy the manager.
*/
destroy(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
}
this.jobs.forEach((_, streamId) => this.deleteJob(streamId));
this.jobStore.destroy();
this.eventTransport.destroy();
this.contentState.destroy();
this.runtimeState.clear();
logger.debug('[GenerationJobManager] Destroyed');
}
}
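
The facade keeps existing call sites working. For example, a consumer that persists partial output when the last client disconnects might look like this (savePartialResponse is a hypothetical helper):

const job = GenerationJobManager.createJob(streamId, userId, conversationId);

// The proxy emitter routes this subscription through InMemoryEventTransport.
job.emitter.on('allSubscribersLeft', (content) => {
  void savePartialResponse(streamId, content); // hypothetical persistence call
});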

View file

@@ -0,0 +1,107 @@
import type { Agents } from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
import type { IContentStateManager } from '../interfaces/IJobStore';
/**
* Content state entry - volatile, in-memory only.
* Uses WeakRef to allow garbage collection of graph when no longer needed.
*/
interface ContentState {
contentParts: Agents.MessageContentComplex[];
graphRef: WeakRef<StandardGraph> | null;
}
/**
* In-memory content state manager.
* Manages volatile references to graph content that should NOT be persisted.
* Uses WeakRef for graph to allow garbage collection.
*/
export class InMemoryContentState implements IContentStateManager {
private state = new Map<string, ContentState>();
/** Cleanup interval for orphaned entries */
private cleanupInterval: NodeJS.Timeout | null = null;
constructor() {
// Cleanup orphaned content state every 5 minutes
this.cleanupInterval = setInterval(() => {
this.cleanupOrphaned();
}, 300000);
if (this.cleanupInterval.unref) {
this.cleanupInterval.unref();
}
}
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
const existing = this.state.get(streamId);
if (existing) {
existing.contentParts = contentParts;
} else {
this.state.set(streamId, { contentParts, graphRef: null });
}
}
getContentParts(streamId: string): Agents.MessageContentComplex[] | null {
return this.state.get(streamId)?.contentParts ?? null;
}
setGraph(streamId: string, graph: StandardGraph): void {
const existing = this.state.get(streamId);
if (existing) {
existing.graphRef = new WeakRef(graph);
} else {
this.state.set(streamId, {
contentParts: [],
graphRef: new WeakRef(graph),
});
}
}
getRunSteps(streamId: string): Agents.RunStep[] {
const state = this.state.get(streamId);
if (!state?.graphRef) {
return [];
}
// Dereference WeakRef - may return undefined if GC'd
const graph = state.graphRef.deref();
return graph?.contentData ?? [];
}
clearContentState(streamId: string): void {
this.state.delete(streamId);
}
/**
* Cleanup entries where graph has been garbage collected.
* These are orphaned states that are no longer useful.
*/
private cleanupOrphaned(): void {
const toDelete: string[] = [];
for (const [streamId, state] of this.state) {
// If graphRef exists but has been GC'd, this state is orphaned
if (state.graphRef && !state.graphRef.deref()) {
toDelete.push(streamId);
}
}
for (const id of toDelete) {
this.state.delete(id);
}
}
/** Get count of tracked streams (for monitoring) */
getStateCount(): number {
return this.state.size;
}
destroy(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
}
this.state.clear();
}
}
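
A short sketch of the WeakRef semantics this class relies on (assume graph is a live StandardGraph owned by the running generation):

const contentState = new InMemoryContentState();
contentState.setGraph('stream-1', graph); // stores only a WeakRef

// While the generation still holds the graph, run steps resolve from it:
contentState.getRunSteps('stream-1'); // -> graph.contentData

// Once nothing else references the graph, GC may collect it; deref()
// then returns undefined and getRunSteps falls back to an empty array:
contentState.getRunSteps('stream-1'); // -> possibly []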

View file

@@ -0,0 +1,121 @@
import { EventEmitter } from 'events';
import { logger } from '@librechat/data-schemas';
import type { IEventTransport } from '../interfaces/IJobStore';
interface StreamState {
emitter: EventEmitter;
allSubscribersLeftCallback?: () => void;
}
/**
* In-memory event transport using Node.js EventEmitter.
* For horizontal scaling, replace with RedisEventTransport.
*/
export class InMemoryEventTransport implements IEventTransport {
private streams = new Map<string, StreamState>();
private getOrCreateStream(streamId: string): StreamState {
let state = this.streams.get(streamId);
if (!state) {
const emitter = new EventEmitter();
emitter.setMaxListeners(100);
state = { emitter };
this.streams.set(streamId, state);
}
return state;
}
subscribe(
streamId: string,
handlers: {
onChunk: (event: unknown) => void;
onDone?: (event: unknown) => void;
onError?: (error: string) => void;
},
): { unsubscribe: () => void } {
const state = this.getOrCreateStream(streamId);
const chunkHandler = (event: unknown) => handlers.onChunk(event);
const doneHandler = (event: unknown) => handlers.onDone?.(event);
const errorHandler = (error: string) => handlers.onError?.(error);
state.emitter.on('chunk', chunkHandler);
state.emitter.on('done', doneHandler);
state.emitter.on('error', errorHandler);
return {
unsubscribe: () => {
const currentState = this.streams.get(streamId);
if (currentState) {
currentState.emitter.off('chunk', chunkHandler);
currentState.emitter.off('done', doneHandler);
currentState.emitter.off('error', errorHandler);
// Check if all subscribers left
if (currentState.emitter.listenerCount('chunk') === 0) {
currentState.allSubscribersLeftCallback?.();
}
}
},
};
}
emitChunk(streamId: string, event: unknown): void {
const state = this.streams.get(streamId);
state?.emitter.emit('chunk', event);
}
emitDone(streamId: string, event: unknown): void {
const state = this.streams.get(streamId);
state?.emitter.emit('done', event);
}
emitError(streamId: string, error: string): void {
const state = this.streams.get(streamId);
state?.emitter.emit('error', error);
}
getSubscriberCount(streamId: string): number {
const state = this.streams.get(streamId);
return state?.emitter.listenerCount('chunk') ?? 0;
}
onAllSubscribersLeft(streamId: string, callback: () => void): void {
const state = this.getOrCreateStream(streamId);
state.allSubscribersLeftCallback = callback;
}
/**
* Check if this is the first subscriber (for ready signaling)
*/
isFirstSubscriber(streamId: string): boolean {
const state = this.streams.get(streamId);
return state?.emitter.listenerCount('chunk') === 1;
}
/**
* Cleanup a stream's event emitter
*/
cleanup(streamId: string): void {
const state = this.streams.get(streamId);
if (state) {
state.emitter.removeAllListeners();
this.streams.delete(streamId);
}
}
/**
* Get count of tracked streams (for monitoring)
*/
getStreamCount(): number {
return this.streams.size;
}
destroy(): void {
for (const state of this.streams.values()) {
state.emitter.removeAllListeners();
}
this.streams.clear();
logger.debug('[InMemoryEventTransport] Destroyed');
}
}
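
A standalone round trip through the transport:

const transport = new InMemoryEventTransport();

transport.onAllSubscribersLeft('stream-1', () => {
  console.log('last subscriber left'); // fires on the final unsubscribe below
});

const sub = transport.subscribe('stream-1', {
  onChunk: (event) => console.log('chunk', event),
  onDone: (event) => console.log('done', event),
});

transport.emitChunk('stream-1', { text: 'hello' }); // -> chunk { text: 'hello' }
sub.unsubscribe(); // 'chunk' listener count hits 0, the callback above runs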

View file

@@ -0,0 +1,219 @@
import { logger } from '@librechat/data-schemas';
import type { IJobStore, SerializableJobData, JobStatus } from '../interfaces/IJobStore';
/**
* In-memory implementation of IJobStore.
* Suitable for single-instance deployments.
* For horizontal scaling, use RedisJobStore.
*/
export class InMemoryJobStore implements IJobStore {
private jobs = new Map<string, SerializableJobData>();
private cleanupInterval: NodeJS.Timeout | null = null;
/** Time to keep completed jobs before cleanup (5 minutes - reduced from 1 hour) */
private ttlAfterComplete = 300000;
/** Maximum number of concurrent jobs */
private maxJobs = 1000;
constructor(options?: { ttlAfterComplete?: number; maxJobs?: number }) {
if (options?.ttlAfterComplete) {
this.ttlAfterComplete = options.ttlAfterComplete;
}
if (options?.maxJobs) {
this.maxJobs = options.maxJobs;
}
}
initialize(): void {
if (this.cleanupInterval) {
return;
}
this.cleanupInterval = setInterval(() => {
this.cleanup();
}, 60000);
if (this.cleanupInterval.unref) {
this.cleanupInterval.unref();
}
logger.debug('[InMemoryJobStore] Initialized with cleanup interval');
}
async createJob(
streamId: string,
userId: string,
conversationId?: string,
): Promise<SerializableJobData> {
return this.createJobSync(streamId, userId, conversationId);
}
/** Synchronous version for in-memory use */
createJobSync(streamId: string, userId: string, conversationId?: string): SerializableJobData {
if (this.jobs.size >= this.maxJobs) {
this.evictOldestSync();
}
const job: SerializableJobData = {
streamId,
userId,
status: 'running',
createdAt: Date.now(),
conversationId,
syncSent: false,
};
this.jobs.set(streamId, job);
logger.debug(`[InMemoryJobStore] Created job: ${streamId}`);
return job;
}
async getJob(streamId: string): Promise<SerializableJobData | null> {
return this.getJobSync(streamId);
}
/** Synchronous version for in-memory use */
getJobSync(streamId: string): SerializableJobData | null {
return this.jobs.get(streamId) ?? null;
}
async getJobByConversation(conversationId: string): Promise<SerializableJobData | null> {
return this.getJobByConversationSync(conversationId);
}
/** Synchronous version for in-memory use */
getJobByConversationSync(conversationId: string): SerializableJobData | null {
// Direct match first (streamId === conversationId for existing conversations)
const directMatch = this.jobs.get(conversationId);
if (directMatch && directMatch.status === 'running') {
return directMatch;
}
// Search by conversationId in metadata
for (const job of this.jobs.values()) {
if (job.conversationId === conversationId && job.status === 'running') {
return job;
}
}
return null;
}
async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
this.updateJobSync(streamId, updates);
}
/** Synchronous version for in-memory use */
updateJobSync(streamId: string, updates: Partial<SerializableJobData>): void {
const job = this.jobs.get(streamId);
if (!job) {
return;
}
Object.assign(job, updates);
}
async deleteJob(streamId: string): Promise<void> {
this.deleteJobSync(streamId);
}
/** Synchronous version for in-memory use */
deleteJobSync(streamId: string): void {
this.jobs.delete(streamId);
logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
}
async hasJob(streamId: string): Promise<boolean> {
return this.hasJobSync(streamId);
}
/** Synchronous version for in-memory use */
hasJobSync(streamId: string): boolean {
return this.jobs.has(streamId);
}
async getRunningJobs(): Promise<SerializableJobData[]> {
const running: SerializableJobData[] = [];
for (const job of this.jobs.values()) {
if (job.status === 'running') {
running.push(job);
}
}
return running;
}
async cleanup(): Promise<number> {
const now = Date.now();
const toDelete: string[] = [];
for (const [streamId, job] of this.jobs) {
const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
if (isFinished && job.completedAt && now - job.completedAt > this.ttlAfterComplete) {
toDelete.push(streamId);
}
}
for (const id of toDelete) {
await this.deleteJob(id);
}
if (toDelete.length > 0) {
logger.debug(`[InMemoryJobStore] Cleaned up ${toDelete.length} expired jobs`);
}
return toDelete.length;
}
private async evictOldest(): Promise<void> {
this.evictOldestSync();
}
/** Synchronous version for in-memory use */
private evictOldestSync(): void {
let oldestId: string | null = null;
let oldestTime = Infinity;
for (const [streamId, job] of this.jobs) {
if (job.createdAt < oldestTime) {
oldestTime = job.createdAt;
oldestId = streamId;
}
}
if (oldestId) {
logger.warn(`[InMemoryJobStore] Evicting oldest job: ${oldestId}`);
this.deleteJobSync(oldestId);
}
}
/** Get job count (for monitoring) */
getJobCount(): number {
return this.jobs.size;
}
/** Get job count by status (for monitoring) */
getJobCountByStatus(): Record<JobStatus, number> {
const counts: Record<JobStatus, number> = {
running: 0,
complete: 0,
error: 0,
aborted: 0,
};
for (const job of this.jobs.values()) {
counts[job.status]++;
}
return counts;
}
destroy(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
}
this.jobs.clear();
logger.debug('[InMemoryJobStore] Destroyed');
}
}
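
A sketch of the TTL and eviction behavior (timings illustrative):

const store = new InMemoryJobStore({ ttlAfterComplete: 60_000, maxJobs: 100 });
store.initialize(); // starts the 60-second cleanup interval

store.createJobSync('stream-1', 'user-1', 'convo-1');
store.updateJobSync('stream-1', { status: 'complete', completedAt: Date.now() });

// Any cleanup pass more than 60s after completion deletes the job;
// creating a new job while at maxJobs evicts the oldest entry instead.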

View file

@@ -0,0 +1,3 @@
export * from './InMemoryJobStore';
export * from './InMemoryContentState';
export * from './InMemoryEventTransport';

View file

@@ -0,0 +1,139 @@
import type { Agents } from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
/**
* Job status enum
*/
export type JobStatus = 'running' | 'complete' | 'error' | 'aborted';
/**
* Serializable job data - no object references, suitable for Redis/external storage
*/
export interface SerializableJobData {
streamId: string;
userId: string;
status: JobStatus;
createdAt: number;
completedAt?: number;
conversationId?: string;
error?: string;
/** User message metadata */
userMessage?: {
messageId: string;
parentMessageId?: string;
conversationId?: string;
text?: string;
};
/** Response message ID for reconnection */
responseMessageId?: string;
/** Sender name for UI display */
sender?: string;
/** Whether sync has been sent to a client */
syncSent: boolean;
/** Serialized final event for replay */
finalEvent?: string;
}
/**
* Resume state for reconnecting clients
*/
export interface ResumeState {
runSteps: Agents.RunStep[];
aggregatedContent: Agents.MessageContentComplex[];
userMessage?: SerializableJobData['userMessage'];
responseMessageId?: string;
conversationId?: string;
sender?: string;
}
/**
* Interface for job storage backend.
* Implementations can use in-memory Map, Redis, KV store, etc.
*/
export interface IJobStore {
/** Create a new job */
createJob(
streamId: string,
userId: string,
conversationId?: string,
): Promise<SerializableJobData>;
/** Get a job by streamId */
getJob(streamId: string): Promise<SerializableJobData | null>;
/** Find active job by conversationId */
getJobByConversation(conversationId: string): Promise<SerializableJobData | null>;
/** Update job data */
updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void>;
/** Delete a job */
deleteJob(streamId: string): Promise<void>;
/** Check if job exists */
hasJob(streamId: string): Promise<boolean>;
/** Get all running jobs (for cleanup) */
getRunningJobs(): Promise<SerializableJobData[]>;
/** Cleanup expired jobs */
cleanup(): Promise<number>;
}
/**
* Interface for pub/sub event transport.
* Implementations can use EventEmitter, Redis Pub/Sub, etc.
*/
export interface IEventTransport {
/** Subscribe to events for a stream */
subscribe(
streamId: string,
handlers: {
onChunk: (event: unknown) => void;
onDone?: (event: unknown) => void;
onError?: (error: string) => void;
},
): { unsubscribe: () => void };
/** Publish a chunk event */
emitChunk(streamId: string, event: unknown): void;
/** Publish a done event */
emitDone(streamId: string, event: unknown): void;
/** Publish an error event */
emitError(streamId: string, error: string): void;
/** Get subscriber count for a stream */
getSubscriberCount(streamId: string): number;
/** Listen for all subscribers leaving */
onAllSubscribersLeft(streamId: string, callback: () => void): void;
}
/**
* Interface for content state management.
* Separates volatile content state from persistent job data.
* In-memory only - not persisted to external storage.
*/
export interface IContentStateManager {
/** Set content parts reference (in-memory only) */
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void;
/** Get content parts */
getContentParts(streamId: string): Agents.MessageContentComplex[] | null;
/** Set graph reference for run steps */
setGraph(streamId: string, graph: StandardGraph): void;
/** Get run steps from graph */
getRunSteps(streamId: string): Agents.RunStep[];
/** Clear content state for a job */
clearContentState(streamId: string): void;
}
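
These interfaces are what make the in-memory pieces swappable. A hypothetical Redis-backed store (not part of this commit) might start like the sketch below; the client shape is a minimal assumption, and the key prefix and TTL are illustrative:

import type { IJobStore, SerializableJobData } from './IJobStore';

/** Minimal ioredis-like surface this sketch needs. */
interface RedisLike {
  get(key: string): Promise<string | null>;
  set(key: string, value: string, mode: 'EX', ttlSeconds: number): Promise<unknown>;
  del(key: string): Promise<number>;
}

class RedisJobStore implements Pick<IJobStore, 'createJob' | 'getJob' | 'deleteJob'> {
  constructor(private redis: RedisLike) {}

  async createJob(
    streamId: string,
    userId: string,
    conversationId?: string,
  ): Promise<SerializableJobData> {
    const job: SerializableJobData = {
      streamId,
      userId,
      conversationId,
      status: 'running',
      createdAt: Date.now(),
      syncSent: false,
    };
    // SerializableJobData holds no object references, so it JSON round-trips cleanly.
    await this.redis.set(`genjob:${streamId}`, JSON.stringify(job), 'EX', 3600);
    return job;
  }

  async getJob(streamId: string): Promise<SerializableJobData | null> {
    const raw = await this.redis.get(`genjob:${streamId}`);
    return raw ? (JSON.parse(raw) as SerializableJobData) : null;
  }

  async deleteJob(streamId: string): Promise<void> {
    await this.redis.del(`genjob:${streamId}`);
  }
}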

View file

@@ -0,0 +1 @@
export * from './IJobStore';

View file

@@ -1,6 +1,5 @@
import type { EventEmitter } from 'events';
import type { Agents } from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
import type { ServerSentEvent } from '~/types';
export interface GenerationJobMetadata {
@@ -27,14 +26,8 @@ export interface GenerationJob {
metadata: GenerationJobMetadata;
readyPromise: Promise<void>;
resolveReady: () => void;
/** Buffered chunks for replay on reconnect */
chunks: ServerSentEvent[];
/** Final event when job completes */
finalEvent?: ServerSentEvent;
/** Reference to graph's contentParts - the authoritative content source */
contentPartsRef?: Agents.MessageContentComplex[];
/** Reference to the graph instance for accessing run steps (contentData) */
graphRef?: StandardGraph;
/** Flag to indicate if a sync event was already sent (prevent duplicate replays) */
syncSent?: boolean;
}

View file

@@ -8,7 +8,7 @@
"target": "es2015",
"moduleResolution": "node",
"allowSyntheticDefaultImports": true,
"lib": ["es2017", "dom", "ES2021.String"],
"lib": ["es2017", "dom", "ES2021.String", "ES2021.WeakRef"],
"allowJs": true,
"skipLibCheck": true,
"esModuleInterop": true,

View file

@@ -195,6 +195,7 @@ export namespace Agents {
userMessage?: UserMessageMeta;
responseMessageId?: string;
conversationId?: string;
sender?: string;
}
/**
* Represents a run step delta i.e. any changed fields on a run step during