mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-22 11:20:15 +01:00
refactor: Enhance GenerationJobManager with In-Memory Implementations
- Introduced InMemoryJobStore, InMemoryEventTransport, and InMemoryContentState for improved job management and event handling.
- Updated GenerationJobManager to use these new implementations, allowing better separation of concerns and easier maintenance.
- Enhanced job metadata handling to support user messages and response IDs for resumable functionality.
- Improved cleanup and state management to prevent memory leaks and ensure efficient resource usage.
This commit is contained in:
parent
9eec76ee0c
commit
3a0c1476da
14 changed files with 892 additions and 321 deletions
107
packages/api/src/stream/implementations/InMemoryContentState.ts
Normal file
107
packages/api/src/stream/implementations/InMemoryContentState.ts
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
import type { Agents } from 'librechat-data-provider';
|
||||
import type { StandardGraph } from '@librechat/agents';
|
||||
import type { IContentStateManager } from '../interfaces/IJobStore';
|
||||
|
||||
/**
 * Content state entry - volatile, in-memory only.
 * Uses WeakRef to allow garbage collection of graph when no longer needed.
 */
interface ContentState {
  /** Aggregated content parts accumulated for the stream so far. */
  contentParts: Agents.MessageContentComplex[];
  /** Weakly-held reference to the live graph; null until a graph is attached via setGraph. */
  graphRef: WeakRef<StandardGraph> | null;
}
|
||||
|
||||
/**
|
||||
* In-memory content state manager.
|
||||
* Manages volatile references to graph content that should NOT be persisted.
|
||||
* Uses WeakRef for graph to allow garbage collection.
|
||||
*/
|
||||
export class InMemoryContentState implements IContentStateManager {
|
||||
private state = new Map<string, ContentState>();
|
||||
|
||||
/** Cleanup interval for orphaned entries */
|
||||
private cleanupInterval: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor() {
|
||||
// Cleanup orphaned content state every 5 minutes
|
||||
this.cleanupInterval = setInterval(() => {
|
||||
this.cleanupOrphaned();
|
||||
}, 300000);
|
||||
|
||||
if (this.cleanupInterval.unref) {
|
||||
this.cleanupInterval.unref();
|
||||
}
|
||||
}
|
||||
|
||||
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
|
||||
const existing = this.state.get(streamId);
|
||||
if (existing) {
|
||||
existing.contentParts = contentParts;
|
||||
} else {
|
||||
this.state.set(streamId, { contentParts, graphRef: null });
|
||||
}
|
||||
}
|
||||
|
||||
getContentParts(streamId: string): Agents.MessageContentComplex[] | null {
|
||||
return this.state.get(streamId)?.contentParts ?? null;
|
||||
}
|
||||
|
||||
setGraph(streamId: string, graph: StandardGraph): void {
|
||||
const existing = this.state.get(streamId);
|
||||
if (existing) {
|
||||
existing.graphRef = new WeakRef(graph);
|
||||
} else {
|
||||
this.state.set(streamId, {
|
||||
contentParts: [],
|
||||
graphRef: new WeakRef(graph),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
getRunSteps(streamId: string): Agents.RunStep[] {
|
||||
const state = this.state.get(streamId);
|
||||
if (!state?.graphRef) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Dereference WeakRef - may return undefined if GC'd
|
||||
const graph = state.graphRef.deref();
|
||||
return graph?.contentData ?? [];
|
||||
}
|
||||
|
||||
clearContentState(streamId: string): void {
|
||||
this.state.delete(streamId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup entries where graph has been garbage collected.
|
||||
* These are orphaned states that are no longer useful.
|
||||
*/
|
||||
private cleanupOrphaned(): void {
|
||||
const toDelete: string[] = [];
|
||||
|
||||
for (const [streamId, state] of this.state) {
|
||||
// If graphRef exists but has been GC'd, this state is orphaned
|
||||
if (state.graphRef && !state.graphRef.deref()) {
|
||||
toDelete.push(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
for (const id of toDelete) {
|
||||
this.state.delete(id);
|
||||
}
|
||||
}
|
||||
|
||||
/** Get count of tracked streams (for monitoring) */
|
||||
getStateCount(): number {
|
||||
return this.state.size;
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
if (this.cleanupInterval) {
|
||||
clearInterval(this.cleanupInterval);
|
||||
this.cleanupInterval = null;
|
||||
}
|
||||
this.state.clear();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
import { EventEmitter } from 'events';
|
||||
import { logger } from '@librechat/data-schemas';
|
||||
import type { IEventTransport } from '../interfaces/IJobStore';
|
||||
|
||||
/** Per-stream emitter state plus an optional last-unsubscriber hook. */
interface StreamState {
  /** Fan-out emitter carrying 'chunk' / 'done' / 'error' events. */
  emitter: EventEmitter;
  /** Fired when the last 'chunk' listener unsubscribes (registered via onAllSubscribersLeft). */
  allSubscribersLeftCallback?: () => void;
}
|
||||
|
||||
/**
|
||||
* In-memory event transport using Node.js EventEmitter.
|
||||
* For horizontal scaling, replace with RedisEventTransport.
|
||||
*/
|
||||
export class InMemoryEventTransport implements IEventTransport {
|
||||
private streams = new Map<string, StreamState>();
|
||||
|
||||
private getOrCreateStream(streamId: string): StreamState {
|
||||
let state = this.streams.get(streamId);
|
||||
if (!state) {
|
||||
const emitter = new EventEmitter();
|
||||
emitter.setMaxListeners(100);
|
||||
state = { emitter };
|
||||
this.streams.set(streamId, state);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
subscribe(
|
||||
streamId: string,
|
||||
handlers: {
|
||||
onChunk: (event: unknown) => void;
|
||||
onDone?: (event: unknown) => void;
|
||||
onError?: (error: string) => void;
|
||||
},
|
||||
): { unsubscribe: () => void } {
|
||||
const state = this.getOrCreateStream(streamId);
|
||||
|
||||
const chunkHandler = (event: unknown) => handlers.onChunk(event);
|
||||
const doneHandler = (event: unknown) => handlers.onDone?.(event);
|
||||
const errorHandler = (error: string) => handlers.onError?.(error);
|
||||
|
||||
state.emitter.on('chunk', chunkHandler);
|
||||
state.emitter.on('done', doneHandler);
|
||||
state.emitter.on('error', errorHandler);
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
const currentState = this.streams.get(streamId);
|
||||
if (currentState) {
|
||||
currentState.emitter.off('chunk', chunkHandler);
|
||||
currentState.emitter.off('done', doneHandler);
|
||||
currentState.emitter.off('error', errorHandler);
|
||||
|
||||
// Check if all subscribers left
|
||||
if (currentState.emitter.listenerCount('chunk') === 0) {
|
||||
currentState.allSubscribersLeftCallback?.();
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
emitChunk(streamId: string, event: unknown): void {
|
||||
const state = this.streams.get(streamId);
|
||||
state?.emitter.emit('chunk', event);
|
||||
}
|
||||
|
||||
emitDone(streamId: string, event: unknown): void {
|
||||
const state = this.streams.get(streamId);
|
||||
state?.emitter.emit('done', event);
|
||||
}
|
||||
|
||||
emitError(streamId: string, error: string): void {
|
||||
const state = this.streams.get(streamId);
|
||||
state?.emitter.emit('error', error);
|
||||
}
|
||||
|
||||
getSubscriberCount(streamId: string): number {
|
||||
const state = this.streams.get(streamId);
|
||||
return state?.emitter.listenerCount('chunk') ?? 0;
|
||||
}
|
||||
|
||||
onAllSubscribersLeft(streamId: string, callback: () => void): void {
|
||||
const state = this.getOrCreateStream(streamId);
|
||||
state.allSubscribersLeftCallback = callback;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this is the first subscriber (for ready signaling)
|
||||
*/
|
||||
isFirstSubscriber(streamId: string): boolean {
|
||||
const state = this.streams.get(streamId);
|
||||
return state?.emitter.listenerCount('chunk') === 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup a stream's event emitter
|
||||
*/
|
||||
cleanup(streamId: string): void {
|
||||
const state = this.streams.get(streamId);
|
||||
if (state) {
|
||||
state.emitter.removeAllListeners();
|
||||
this.streams.delete(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of tracked streams (for monitoring)
|
||||
*/
|
||||
getStreamCount(): number {
|
||||
return this.streams.size;
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
for (const state of this.streams.values()) {
|
||||
state.emitter.removeAllListeners();
|
||||
}
|
||||
this.streams.clear();
|
||||
logger.debug('[InMemoryEventTransport] Destroyed');
|
||||
}
|
||||
}
|
||||
219
packages/api/src/stream/implementations/InMemoryJobStore.ts
Normal file
219
packages/api/src/stream/implementations/InMemoryJobStore.ts
Normal file
|
|
@ -0,0 +1,219 @@
|
|||
import { logger } from '@librechat/data-schemas';
|
||||
import type { IJobStore, SerializableJobData, JobStatus } from '../interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* In-memory implementation of IJobStore.
|
||||
* Suitable for single-instance deployments.
|
||||
* For horizontal scaling, use RedisJobStore.
|
||||
*/
|
||||
export class InMemoryJobStore implements IJobStore {
|
||||
private jobs = new Map<string, SerializableJobData>();
|
||||
private cleanupInterval: NodeJS.Timeout | null = null;
|
||||
|
||||
/** Time to keep completed jobs before cleanup (5 minutes - reduced from 1 hour) */
|
||||
private ttlAfterComplete = 300000;
|
||||
|
||||
/** Maximum number of concurrent jobs */
|
||||
private maxJobs = 1000;
|
||||
|
||||
constructor(options?: { ttlAfterComplete?: number; maxJobs?: number }) {
|
||||
if (options?.ttlAfterComplete) {
|
||||
this.ttlAfterComplete = options.ttlAfterComplete;
|
||||
}
|
||||
if (options?.maxJobs) {
|
||||
this.maxJobs = options.maxJobs;
|
||||
}
|
||||
}
|
||||
|
||||
initialize(): void {
|
||||
if (this.cleanupInterval) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.cleanupInterval = setInterval(() => {
|
||||
this.cleanup();
|
||||
}, 60000);
|
||||
|
||||
if (this.cleanupInterval.unref) {
|
||||
this.cleanupInterval.unref();
|
||||
}
|
||||
|
||||
logger.debug('[InMemoryJobStore] Initialized with cleanup interval');
|
||||
}
|
||||
|
||||
async createJob(
|
||||
streamId: string,
|
||||
userId: string,
|
||||
conversationId?: string,
|
||||
): Promise<SerializableJobData> {
|
||||
return this.createJobSync(streamId, userId, conversationId);
|
||||
}
|
||||
|
||||
/** Synchronous version for in-memory use */
|
||||
createJobSync(streamId: string, userId: string, conversationId?: string): SerializableJobData {
|
||||
if (this.jobs.size >= this.maxJobs) {
|
||||
this.evictOldestSync();
|
||||
}
|
||||
|
||||
const job: SerializableJobData = {
|
||||
streamId,
|
||||
userId,
|
||||
status: 'running',
|
||||
createdAt: Date.now(),
|
||||
conversationId,
|
||||
syncSent: false,
|
||||
};
|
||||
|
||||
this.jobs.set(streamId, job);
|
||||
logger.debug(`[InMemoryJobStore] Created job: ${streamId}`);
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
async getJob(streamId: string): Promise<SerializableJobData | null> {
|
||||
return this.getJobSync(streamId);
|
||||
}
|
||||
|
||||
/** Synchronous version for in-memory use */
|
||||
getJobSync(streamId: string): SerializableJobData | null {
|
||||
return this.jobs.get(streamId) ?? null;
|
||||
}
|
||||
|
||||
async getJobByConversation(conversationId: string): Promise<SerializableJobData | null> {
|
||||
return this.getJobByConversationSync(conversationId);
|
||||
}
|
||||
|
||||
/** Synchronous version for in-memory use */
|
||||
getJobByConversationSync(conversationId: string): SerializableJobData | null {
|
||||
// Direct match first (streamId === conversationId for existing conversations)
|
||||
const directMatch = this.jobs.get(conversationId);
|
||||
if (directMatch && directMatch.status === 'running') {
|
||||
return directMatch;
|
||||
}
|
||||
|
||||
// Search by conversationId in metadata
|
||||
for (const job of this.jobs.values()) {
|
||||
if (job.conversationId === conversationId && job.status === 'running') {
|
||||
return job;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
|
||||
this.updateJobSync(streamId, updates);
|
||||
}
|
||||
|
||||
/** Synchronous version for in-memory use */
|
||||
updateJobSync(streamId: string, updates: Partial<SerializableJobData>): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
}
|
||||
Object.assign(job, updates);
|
||||
}
|
||||
|
||||
async deleteJob(streamId: string): Promise<void> {
|
||||
this.deleteJobSync(streamId);
|
||||
}
|
||||
|
||||
/** Synchronous version for in-memory use */
|
||||
deleteJobSync(streamId: string): void {
|
||||
this.jobs.delete(streamId);
|
||||
logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
|
||||
}
|
||||
|
||||
async hasJob(streamId: string): Promise<boolean> {
|
||||
return this.hasJobSync(streamId);
|
||||
}
|
||||
|
||||
/** Synchronous version for in-memory use */
|
||||
hasJobSync(streamId: string): boolean {
|
||||
return this.jobs.has(streamId);
|
||||
}
|
||||
|
||||
async getRunningJobs(): Promise<SerializableJobData[]> {
|
||||
const running: SerializableJobData[] = [];
|
||||
for (const job of this.jobs.values()) {
|
||||
if (job.status === 'running') {
|
||||
running.push(job);
|
||||
}
|
||||
}
|
||||
return running;
|
||||
}
|
||||
|
||||
async cleanup(): Promise<number> {
|
||||
const now = Date.now();
|
||||
const toDelete: string[] = [];
|
||||
|
||||
for (const [streamId, job] of this.jobs) {
|
||||
const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
|
||||
if (isFinished && job.completedAt && now - job.completedAt > this.ttlAfterComplete) {
|
||||
toDelete.push(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
for (const id of toDelete) {
|
||||
await this.deleteJob(id);
|
||||
}
|
||||
|
||||
if (toDelete.length > 0) {
|
||||
logger.debug(`[InMemoryJobStore] Cleaned up ${toDelete.length} expired jobs`);
|
||||
}
|
||||
|
||||
return toDelete.length;
|
||||
}
|
||||
|
||||
private async evictOldest(): Promise<void> {
|
||||
this.evictOldestSync();
|
||||
}
|
||||
|
||||
/** Synchronous version for in-memory use */
|
||||
private evictOldestSync(): void {
|
||||
let oldestId: string | null = null;
|
||||
let oldestTime = Infinity;
|
||||
|
||||
for (const [streamId, job] of this.jobs) {
|
||||
if (job.createdAt < oldestTime) {
|
||||
oldestTime = job.createdAt;
|
||||
oldestId = streamId;
|
||||
}
|
||||
}
|
||||
|
||||
if (oldestId) {
|
||||
logger.warn(`[InMemoryJobStore] Evicting oldest job: ${oldestId}`);
|
||||
this.deleteJobSync(oldestId);
|
||||
}
|
||||
}
|
||||
|
||||
/** Get job count (for monitoring) */
|
||||
getJobCount(): number {
|
||||
return this.jobs.size;
|
||||
}
|
||||
|
||||
/** Get job count by status (for monitoring) */
|
||||
getJobCountByStatus(): Record<JobStatus, number> {
|
||||
const counts: Record<JobStatus, number> = {
|
||||
running: 0,
|
||||
complete: 0,
|
||||
error: 0,
|
||||
aborted: 0,
|
||||
};
|
||||
|
||||
for (const job of this.jobs.values()) {
|
||||
counts[job.status]++;
|
||||
}
|
||||
|
||||
return counts;
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
if (this.cleanupInterval) {
|
||||
clearInterval(this.cleanupInterval);
|
||||
this.cleanupInterval = null;
|
||||
}
|
||||
this.jobs.clear();
|
||||
logger.debug('[InMemoryJobStore] Destroyed');
|
||||
}
|
||||
}
|
||||
3
packages/api/src/stream/implementations/index.ts
Normal file
3
packages/api/src/stream/implementations/index.ts
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
/** Barrel exports for the in-memory stream implementations. */
export * from './InMemoryJobStore';
export * from './InMemoryContentState';
export * from './InMemoryEventTransport';
|
||||
Loading…
Add table
Add a link
Reference in a new issue