refactor: Consolidate content state management into IJobStore for improved job handling

- Removed InMemoryContentState and integrated its functionality into InMemoryJobStore, streamlining content state management.
- Updated GenerationJobManager to utilize jobStore for content state operations, enhancing clarity and reducing redundancy.
- Introduced RedisJobStore for horizontal scaling, allowing for efficient job management and content reconstruction from chunks.
- Updated IJobStore interface to reflect changes in content state handling, ensuring consistency across implementations.
This commit is contained in:
Danny Avila 2025-12-14 20:59:41 -05:00
parent bfaed6228b
commit e51c8870e6
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
6 changed files with 632 additions and 156 deletions

View file

@ -1,8 +1,7 @@
import { logger } from '@librechat/data-schemas';
import type { Agents } from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
import type { Agents } from 'librechat-data-provider';
import type {
IContentStateManager,
SerializableJobData,
IEventTransport,
AbortResult,
@ -10,7 +9,6 @@ import type {
} from './interfaces/IJobStore';
import type * as t from '~/types';
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
import { InMemoryContentState } from './implementations/InMemoryContentState';
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
/**
@ -40,10 +38,13 @@ interface RuntimeJobState {
/**
* Manages generation jobs for resumable LLM streams.
*
* Architecture: Composes three pluggable services via dependency injection:
* - jobStore: Serializable job metadata (InMemory Redis/KV for horizontal scaling)
* Architecture: Composes two pluggable services via dependency injection:
* - jobStore: Job metadata + content state (InMemory Redis for horizontal scaling)
* - eventTransport: Pub/sub events (InMemory Redis Pub/Sub for horizontal scaling)
* - contentState: Volatile content refs with WeakRef (always in-memory, not shared)
*
* Content state is tied to jobs:
* - In-memory: jobStore holds WeakRef to graph for live content/run steps access
* - Redis: jobStore persists chunks, reconstructs content on demand
*
* All storage methods are async to support both in-memory and external stores (Redis, etc.).
*
@ -52,17 +53,14 @@ interface RuntimeJobState {
* const manager = new GenerationJobManagerClass({
* jobStore: new RedisJobStore(redisClient),
* eventTransport: new RedisPubSubTransport(redisClient),
* contentState: new InMemoryContentState(), // Always local
* });
* ```
*/
class GenerationJobManagerClass {
/** Job metadata storage - swappable for Redis, KV store, etc. */
/** Job metadata + content state storage - swappable for Redis, etc. */
private jobStore: IJobStore;
/** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */
private eventTransport: IEventTransport;
/** Volatile content state with WeakRef - always in-memory per instance */
private contentState: IContentStateManager;
/** Runtime state - always in-memory, not serializable */
private runtimeState = new Map<string, RuntimeJobState>();
@ -72,7 +70,6 @@ class GenerationJobManagerClass {
constructor() {
this.jobStore = new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
this.eventTransport = new InMemoryEventTransport();
this.contentState = new InMemoryContentState();
}
/**
@ -149,7 +146,7 @@ class GenerationJobManagerClass {
if (currentRuntime) {
currentRuntime.syncSent = false;
// Call registered handlers (from job.emitter.on('allSubscribersLeft', ...))
const content = this.contentState.getContentParts(streamId) ?? [];
const content = this.jobStore.getContentParts(streamId) ?? [];
if (currentRuntime.allSubscribersLeftHandlers) {
for (const handler of currentRuntime.allSubscribersLeftHandlers) {
try {
@ -286,7 +283,7 @@ class GenerationJobManagerClass {
});
// Clear content state
this.contentState.clearContentState(streamId);
this.jobStore.clearContentState(streamId);
logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
}
@ -314,7 +311,7 @@ class GenerationJobManagerClass {
});
// Get content and extract text
const content = this.contentState.getContentParts(streamId) ?? [];
const content = this.jobStore.getContentParts(streamId) ?? [];
const text = this.extractTextFromContent(content);
// Create final event for abort
@ -352,7 +349,7 @@ class GenerationJobManagerClass {
}
this.eventTransport.emitDone(streamId, abortFinalEvent);
this.contentState.clearContentState(streamId);
this.jobStore.clearContentState(streamId);
logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
@ -532,7 +529,7 @@ class GenerationJobManagerClass {
if (!this.runtimeState.has(streamId)) {
return;
}
this.contentState.setContentParts(streamId, contentParts);
this.jobStore.setContentParts(streamId, contentParts);
logger.debug(`[GenerationJobManager] Set contentParts for ${streamId}`);
}
@ -544,7 +541,7 @@ class GenerationJobManagerClass {
if (!this.runtimeState.has(streamId)) {
return;
}
this.contentState.setGraph(streamId, graph);
this.jobStore.setGraph(streamId, graph);
logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`);
}
@ -557,8 +554,8 @@ class GenerationJobManagerClass {
return null;
}
const aggregatedContent = this.contentState.getContentParts(streamId) ?? [];
const runSteps = this.contentState.getRunSteps(streamId);
const aggregatedContent = this.jobStore.getContentParts(streamId) ?? [];
const runSteps = this.jobStore.getRunSteps(streamId);
logger.debug(`[GenerationJobManager] getResumeState:`, {
streamId,
@ -621,7 +618,7 @@ class GenerationJobManagerClass {
for (const streamId of this.runtimeState.keys()) {
if (!(await this.jobStore.hasJob(streamId))) {
this.runtimeState.delete(streamId);
this.contentState.clearContentState(streamId);
this.jobStore.clearContentState(streamId);
this.eventTransport.cleanup(streamId);
}
}
@ -648,7 +645,7 @@ class GenerationJobManagerClass {
return {
active: jobData.status === 'running',
status: jobData.status as t.GenerationJobStatus,
aggregatedContent: this.contentState.getContentParts(streamId) ?? [],
aggregatedContent: this.jobStore.getContentParts(streamId) ?? [],
createdAt: jobData.createdAt,
};
}
@ -684,7 +681,6 @@ class GenerationJobManagerClass {
await this.jobStore.destroy();
this.eventTransport.destroy();
this.contentState.destroy();
this.runtimeState.clear();
logger.debug('[GenerationJobManager] Destroyed');