import { logger } from '@librechat/data-schemas'; import type { StandardGraph } from '@librechat/agents'; import type { Agents } from 'librechat-data-provider'; import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore'; /** * Content state for a job - volatile, in-memory only. * Uses WeakRef to allow garbage collection of graph when no longer needed. */ interface ContentState { contentParts: Agents.MessageContentComplex[]; graphRef: WeakRef | null; } /** * In-memory implementation of IJobStore. * Suitable for single-instance deployments. * For horizontal scaling, use RedisJobStore. * * Content state is tied to jobs: * - Uses WeakRef to graph for live access to contentParts and contentData (run steps) * - No chunk persistence needed - same instance handles generation and reconnects */ export class InMemoryJobStore implements IJobStore { private jobs = new Map(); private contentState = new Map(); private cleanupInterval: NodeJS.Timeout | null = null; /** Time to keep completed jobs before cleanup (5 minutes) */ private ttlAfterComplete = 300000; /** Maximum number of concurrent jobs */ private maxJobs = 1000; constructor(options?: { ttlAfterComplete?: number; maxJobs?: number }) { if (options?.ttlAfterComplete) { this.ttlAfterComplete = options.ttlAfterComplete; } if (options?.maxJobs) { this.maxJobs = options.maxJobs; } } async initialize(): Promise { if (this.cleanupInterval) { return; } this.cleanupInterval = setInterval(() => { this.cleanup(); }, 60000); if (this.cleanupInterval.unref) { this.cleanupInterval.unref(); } logger.debug('[InMemoryJobStore] Initialized with cleanup interval'); } async createJob( streamId: string, userId: string, conversationId?: string, ): Promise { if (this.jobs.size >= this.maxJobs) { await this.evictOldest(); } const job: SerializableJobData = { streamId, userId, status: 'running', createdAt: Date.now(), conversationId, syncSent: false, }; this.jobs.set(streamId, job); logger.debug(`[InMemoryJobStore] Created job: ${streamId}`); return job; } async getJob(streamId: string): Promise { return this.jobs.get(streamId) ?? null; } async updateJob(streamId: string, updates: Partial): Promise { const job = this.jobs.get(streamId); if (!job) { return; } Object.assign(job, updates); } async deleteJob(streamId: string): Promise { this.jobs.delete(streamId); this.contentState.delete(streamId); logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`); } async hasJob(streamId: string): Promise { return this.jobs.has(streamId); } async getRunningJobs(): Promise { const running: SerializableJobData[] = []; for (const job of this.jobs.values()) { if (job.status === 'running') { running.push(job); } } return running; } async cleanup(): Promise { const now = Date.now(); const toDelete: string[] = []; for (const [streamId, job] of this.jobs) { const isFinished = ['complete', 'error', 'aborted'].includes(job.status); if (isFinished && job.completedAt && now - job.completedAt > this.ttlAfterComplete) { toDelete.push(streamId); } } for (const id of toDelete) { await this.deleteJob(id); } if (toDelete.length > 0) { logger.debug(`[InMemoryJobStore] Cleaned up ${toDelete.length} expired jobs`); } return toDelete.length; } private async evictOldest(): Promise { let oldestId: string | null = null; let oldestTime = Infinity; for (const [streamId, job] of this.jobs) { if (job.createdAt < oldestTime) { oldestTime = job.createdAt; oldestId = streamId; } } if (oldestId) { logger.warn(`[InMemoryJobStore] Evicting oldest job: ${oldestId}`); await this.deleteJob(oldestId); } } /** Get job count (for monitoring) */ async getJobCount(): Promise { return this.jobs.size; } /** Get job count by status (for monitoring) */ async getJobCountByStatus(status: JobStatus): Promise { let count = 0; for (const job of this.jobs.values()) { if (job.status === status) { count++; } } return count; } async destroy(): Promise { if (this.cleanupInterval) { clearInterval(this.cleanupInterval); this.cleanupInterval = null; } this.jobs.clear(); this.contentState.clear(); logger.debug('[InMemoryJobStore] Destroyed'); } // ===== Content State Methods ===== /** * Set the graph reference for a job. * Uses WeakRef to allow garbage collection when graph is no longer needed. */ setGraph(streamId: string, graph: StandardGraph): void { const existing = this.contentState.get(streamId); if (existing) { existing.graphRef = new WeakRef(graph); } else { this.contentState.set(streamId, { contentParts: [], graphRef: new WeakRef(graph), }); } } /** * Set content parts reference for a job. */ setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void { const existing = this.contentState.get(streamId); if (existing) { existing.contentParts = contentParts; } else { this.contentState.set(streamId, { contentParts, graphRef: null }); } } /** * Get content parts for a job. * Returns live content from stored reference. */ getContentParts(streamId: string): Agents.MessageContentComplex[] | null { return this.contentState.get(streamId)?.contentParts ?? null; } /** * Get run steps for a job from graph.contentData. * Uses WeakRef - may return empty if graph has been GC'd. */ getRunSteps(streamId: string): Agents.RunStep[] { const state = this.contentState.get(streamId); if (!state?.graphRef) { return []; } // Dereference WeakRef - may return undefined if GC'd const graph = state.graphRef.deref(); return graph?.contentData ?? []; } /** * No-op for in-memory - content available via graph reference. */ async appendChunk(): Promise { // No-op: content available via graph reference } /** * Clear content state for a job. */ clearContentState(streamId: string): void { this.contentState.delete(streamId); } }