mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-18 17:30:16 +01:00
refactor: Convert GenerationJobManager methods to async for improved performance
- Updated methods in GenerationJobManager and InMemoryJobStore to be asynchronous, enhancing the handling of job creation, retrieval, and management.
- Adjusted the ResumableAgentController and related routes to await job operations, ensuring proper flow and error handling.
- Increased timeout duration in ResumableAgentController's startGeneration function to 3500ms for better subscriber readiness management.
This commit is contained in:
parent
04cc190c71
commit
7fbc05a165
5 changed files with 104 additions and 107 deletions
|
|
@ -1,7 +1,12 @@
|
|||
import { logger } from '@librechat/data-schemas';
|
||||
import type { Agents } from 'librechat-data-provider';
|
||||
import type { StandardGraph } from '@librechat/agents';
|
||||
import type { SerializableJobData } from './interfaces/IJobStore';
|
||||
import type {
|
||||
IContentStateManager,
|
||||
SerializableJobData,
|
||||
IEventTransport,
|
||||
IJobStore,
|
||||
} from './interfaces/IJobStore';
|
||||
import type * as t from '~/types';
|
||||
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
|
||||
import { InMemoryContentState } from './implementations/InMemoryContentState';
|
||||
|
|
@ -34,15 +39,14 @@ interface RuntimeJobState {
|
|||
/**
|
||||
* Manages generation jobs for resumable LLM streams.
|
||||
*
|
||||
* Architecture: Composes three pluggable services for clean separation:
|
||||
* Architecture: Composes three pluggable services via dependency injection:
|
||||
* - jobStore: Serializable job metadata (InMemory → Redis/KV for horizontal scaling)
|
||||
* - eventTransport: Pub/sub events (InMemory → Redis Pub/Sub for horizontal scaling)
|
||||
* - contentState: Volatile content refs with WeakRef (always in-memory, not shared)
|
||||
*
|
||||
* Current implementation uses sync methods for performance. When adding Redis support,
|
||||
* the manager methods will need to become async, or use a sync-capable Redis client.
|
||||
* All storage methods are async to support both in-memory and external stores (Redis, etc.).
|
||||
*
|
||||
* @example Future Redis injection (requires async refactor):
|
||||
* @example Redis injection:
|
||||
* ```ts
|
||||
* const manager = new GenerationJobManagerClass({
|
||||
* jobStore: new RedisJobStore(redisClient),
|
||||
|
|
@ -53,11 +57,11 @@ interface RuntimeJobState {
|
|||
*/
|
||||
class GenerationJobManagerClass {
|
||||
/** Job metadata storage - swappable for Redis, KV store, etc. */
|
||||
private jobStore: InMemoryJobStore;
|
||||
private jobStore: IJobStore;
|
||||
/** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */
|
||||
private eventTransport: InMemoryEventTransport;
|
||||
private eventTransport: IEventTransport;
|
||||
/** Volatile content state with WeakRef - always in-memory per instance */
|
||||
private contentState: InMemoryContentState;
|
||||
private contentState: IContentStateManager;
|
||||
|
||||
/** Runtime state - always in-memory, not serializable */
|
||||
private runtimeState = new Map<string, RuntimeJobState>();
|
||||
|
|
@ -106,11 +110,14 @@ class GenerationJobManagerClass {
|
|||
* @param streamId - Unique identifier for this stream
|
||||
* @param userId - User who initiated the request
|
||||
* @param conversationId - Optional conversation ID for lookup
|
||||
* @returns A facade object compatible with the old GenerationJob interface
|
||||
* @returns A facade object for the GenerationJob
|
||||
*/
|
||||
createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
|
||||
// Create serializable job data (sync for in-memory)
|
||||
const jobData = this.jobStore.createJobSync(streamId, userId, conversationId);
|
||||
async createJob(
|
||||
streamId: string,
|
||||
userId: string,
|
||||
conversationId?: string,
|
||||
): Promise<t.GenerationJob> {
|
||||
const jobData = await this.jobStore.createJob(streamId, userId, conversationId);
|
||||
|
||||
/**
|
||||
* Create runtime state with readyPromise.
|
||||
|
|
@ -243,8 +250,8 @@ class GenerationJobManagerClass {
|
|||
/**
|
||||
* Get a job by streamId.
|
||||
*/
|
||||
getJob(streamId: string): t.GenerationJob | undefined {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
async getJob(streamId: string): Promise<t.GenerationJob | undefined> {
|
||||
const jobData = await this.jobStore.getJob(streamId);
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
if (!jobData || !runtime) {
|
||||
return undefined;
|
||||
|
|
@ -255,8 +262,8 @@ class GenerationJobManagerClass {
|
|||
/**
|
||||
* Find an active job by conversationId.
|
||||
*/
|
||||
getJobByConversation(conversationId: string): t.GenerationJob | undefined {
|
||||
const jobData = this.jobStore.getJobByConversationSync(conversationId);
|
||||
async getJobByConversation(conversationId: string): Promise<t.GenerationJob | undefined> {
|
||||
const jobData = await this.jobStore.getJobByConversation(conversationId);
|
||||
if (!jobData) {
|
||||
return undefined;
|
||||
}
|
||||
|
|
@ -270,15 +277,15 @@ class GenerationJobManagerClass {
|
|||
/**
|
||||
* Check if a job exists.
|
||||
*/
|
||||
hasJob(streamId: string): boolean {
|
||||
return this.jobStore.hasJobSync(streamId);
|
||||
async hasJob(streamId: string): Promise<boolean> {
|
||||
return this.jobStore.hasJob(streamId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get job status.
|
||||
*/
|
||||
getJobStatus(streamId: string): t.GenerationJobStatus | undefined {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
async getJobStatus(streamId: string): Promise<t.GenerationJobStatus | undefined> {
|
||||
const jobData = await this.jobStore.getJob(streamId);
|
||||
return jobData?.status as t.GenerationJobStatus | undefined;
|
||||
}
|
||||
|
||||
|
|
@ -302,7 +309,7 @@ class GenerationJobManagerClass {
|
|||
* Abort a job (user-initiated).
|
||||
*/
|
||||
async abortJob(streamId: string): Promise<void> {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
const jobData = await this.jobStore.getJob(streamId);
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
|
||||
if (!jobData) {
|
||||
|
|
@ -376,18 +383,18 @@ class GenerationJobManagerClass {
|
|||
* @param onError - Handler for error events
|
||||
* @returns Subscription object with unsubscribe function, or null if job not found
|
||||
*/
|
||||
subscribe(
|
||||
async subscribe(
|
||||
streamId: string,
|
||||
onChunk: t.ChunkHandler,
|
||||
onDone?: t.DoneHandler,
|
||||
onError?: t.ErrorHandler,
|
||||
): { unsubscribe: t.UnsubscribeFn } | null {
|
||||
): Promise<{ unsubscribe: t.UnsubscribeFn } | null> {
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
if (!runtime) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
const jobData = await this.jobStore.getJob(streamId);
|
||||
|
||||
// If job already complete, send final event
|
||||
setImmediate(() => {
|
||||
|
|
@ -429,10 +436,11 @@ class GenerationJobManagerClass {
|
|||
|
||||
/**
|
||||
* Emit a chunk event to all subscribers.
|
||||
* Uses runtime state check for performance (avoids async job store lookup per token).
|
||||
*/
|
||||
emitChunk(streamId: string, event: t.ServerSentEvent): void {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
if (!jobData || jobData.status !== 'running') {
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
if (!runtime || runtime.abortController.signal.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -494,7 +502,8 @@ class GenerationJobManagerClass {
|
|||
* Set reference to the graph's contentParts array.
|
||||
*/
|
||||
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
|
||||
if (!this.jobStore.hasJobSync(streamId)) {
|
||||
// Use runtime state check for performance (sync check)
|
||||
if (!this.runtimeState.has(streamId)) {
|
||||
return;
|
||||
}
|
||||
this.contentState.setContentParts(streamId, contentParts);
|
||||
|
|
@ -505,7 +514,8 @@ class GenerationJobManagerClass {
|
|||
* Set reference to the graph instance.
|
||||
*/
|
||||
setGraph(streamId: string, graph: StandardGraph): void {
|
||||
if (!this.jobStore.hasJobSync(streamId)) {
|
||||
// Use runtime state check for performance (sync check)
|
||||
if (!this.runtimeState.has(streamId)) {
|
||||
return;
|
||||
}
|
||||
this.contentState.setGraph(streamId, graph);
|
||||
|
|
@ -515,8 +525,8 @@ class GenerationJobManagerClass {
|
|||
/**
|
||||
* Get resume state for reconnecting clients.
|
||||
*/
|
||||
getResumeState(streamId: string): t.ResumeState | null {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
async getResumeState(streamId: string): Promise<t.ResumeState | null> {
|
||||
const jobData = await this.jobStore.getJob(streamId);
|
||||
if (!jobData) {
|
||||
return null;
|
||||
}
|
||||
|
|
@ -583,7 +593,7 @@ class GenerationJobManagerClass {
|
|||
|
||||
// Cleanup runtime state for deleted jobs
|
||||
for (const streamId of this.runtimeState.keys()) {
|
||||
if (!this.jobStore.hasJobSync(streamId)) {
|
||||
if (!(await this.jobStore.hasJob(streamId))) {
|
||||
this.runtimeState.delete(streamId);
|
||||
this.contentState.clearContentState(streamId);
|
||||
this.eventTransport.cleanup(streamId);
|
||||
|
|
@ -598,13 +608,13 @@ class GenerationJobManagerClass {
|
|||
/**
|
||||
* Get stream info for status endpoint.
|
||||
*/
|
||||
getStreamInfo(streamId: string): {
|
||||
async getStreamInfo(streamId: string): Promise<{
|
||||
active: boolean;
|
||||
status: t.GenerationJobStatus;
|
||||
aggregatedContent?: Agents.MessageContentComplex[];
|
||||
createdAt: number;
|
||||
} | null {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
} | null> {
|
||||
const jobData = await this.jobStore.getJob(streamId);
|
||||
if (!jobData) {
|
||||
return null;
|
||||
}
|
||||
|
|
@ -620,27 +630,33 @@ class GenerationJobManagerClass {
|
|||
/**
|
||||
* Get total job count.
|
||||
*/
|
||||
getJobCount(): number {
|
||||
async getJobCount(): Promise<number> {
|
||||
return this.jobStore.getJobCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get job count by status.
|
||||
*/
|
||||
getJobCountByStatus(): Record<t.GenerationJobStatus, number> {
|
||||
return this.jobStore.getJobCountByStatus() as Record<t.GenerationJobStatus, number>;
|
||||
async getJobCountByStatus(): Promise<Record<t.GenerationJobStatus, number>> {
|
||||
const [running, complete, error, aborted] = await Promise.all([
|
||||
this.jobStore.getJobCountByStatus('running'),
|
||||
this.jobStore.getJobCountByStatus('complete'),
|
||||
this.jobStore.getJobCountByStatus('error'),
|
||||
this.jobStore.getJobCountByStatus('aborted'),
|
||||
]);
|
||||
return { running, complete, error, aborted };
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroy the manager.
|
||||
*/
|
||||
destroy(): void {
|
||||
async destroy(): Promise<void> {
|
||||
if (this.cleanupInterval) {
|
||||
clearInterval(this.cleanupInterval);
|
||||
this.cleanupInterval = null;
|
||||
}
|
||||
|
||||
this.jobStore.destroy();
|
||||
await this.jobStore.destroy();
|
||||
this.eventTransport.destroy();
|
||||
this.contentState.destroy();
|
||||
this.runtimeState.clear();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue