mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-18 17:30:16 +01:00
refactor: Enhance GenerationJobManager with In-Memory Implementations
- Introduced InMemoryJobStore, InMemoryEventTransport, and InMemoryContentState for improved job management and event handling. - Updated GenerationJobManager to utilize these new implementations, allowing for better separation of concerns and easier maintenance. - Enhanced job metadata handling to support user messages and response IDs for resumable functionality. - Improved cleanup and state management processes to prevent memory leaks and ensure efficient resource usage.
This commit is contained in:
parent
371cd8a557
commit
c7034f6d4a
14 changed files with 892 additions and 321 deletions
|
|
@ -1,21 +1,46 @@
|
|||
import { EventEmitter } from 'events';
|
||||
import { logger } from '@librechat/data-schemas';
|
||||
import type { Agents } from 'librechat-data-provider';
|
||||
import type { StandardGraph } from '@librechat/agents';
|
||||
import type { SerializableJobData } from './interfaces/IJobStore';
|
||||
import type * as t from '~/types';
|
||||
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
|
||||
import { InMemoryContentState } from './implementations/InMemoryContentState';
|
||||
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
|
||||
|
||||
/**
|
||||
* Runtime state for active jobs - not serializable, kept in-memory per instance.
|
||||
* Contains AbortController, ready promise, and other non-serializable state.
|
||||
*/
|
||||
interface RuntimeJobState {
|
||||
abortController: AbortController;
|
||||
readyPromise: Promise<void>;
|
||||
resolveReady: () => void;
|
||||
finalEvent?: t.ServerSentEvent;
|
||||
syncSent: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Manages generation jobs for resumable LLM streams.
|
||||
* Generation runs independently of HTTP connections via EventEmitter.
|
||||
* Clients can subscribe/unsubscribe to job events without affecting generation.
|
||||
* Composes three implementations for clean separation of concerns:
|
||||
* - InMemoryJobStore: Serializable job metadata (swappable for Redis)
|
||||
* - InMemoryEventTransport: Pub/sub events (swappable for Redis Pub/Sub)
|
||||
* - InMemoryContentState: Volatile content refs with WeakRef (always in-memory)
|
||||
*/
|
||||
class GenerationJobManagerClass {
|
||||
private jobs = new Map<string, t.GenerationJob>();
|
||||
private jobStore: InMemoryJobStore;
|
||||
private eventTransport: InMemoryEventTransport;
|
||||
private contentState: InMemoryContentState;
|
||||
|
||||
/** Runtime state - always in-memory, not serializable */
|
||||
private runtimeState = new Map<string, RuntimeJobState>();
|
||||
|
||||
private cleanupInterval: NodeJS.Timeout | null = null;
|
||||
/** Time to keep completed jobs before cleanup (1 hour) */
|
||||
private ttlAfterComplete = 3600000;
|
||||
/** Maximum number of concurrent jobs */
|
||||
private maxJobs = 1000;
|
||||
|
||||
constructor() {
|
||||
this.jobStore = new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
|
||||
this.eventTransport = new InMemoryEventTransport();
|
||||
this.contentState = new InMemoryContentState();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the job manager with periodic cleanup.
|
||||
|
|
@ -25,6 +50,8 @@ class GenerationJobManagerClass {
|
|||
return;
|
||||
}
|
||||
|
||||
this.jobStore.initialize();
|
||||
|
||||
this.cleanupInterval = setInterval(() => {
|
||||
this.cleanup();
|
||||
}, 60000);
|
||||
|
|
@ -33,185 +60,231 @@ class GenerationJobManagerClass {
|
|||
this.cleanupInterval.unref();
|
||||
}
|
||||
|
||||
logger.debug('[GenerationJobManager] Initialized with cleanup interval');
|
||||
logger.debug('[GenerationJobManager] Initialized');
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new generation job.
|
||||
* @param streamId - Unique identifier for the stream
|
||||
* @param userId - User ID who initiated the generation
|
||||
* @param conversationId - Optional conversation ID
|
||||
* @returns The created job
|
||||
* @returns A facade object compatible with the old GenerationJob interface
|
||||
*/
|
||||
createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
|
||||
if (this.jobs.size >= this.maxJobs) {
|
||||
this.evictOldest();
|
||||
}
|
||||
// Create serializable job data (sync for in-memory)
|
||||
const jobData = this.jobStore.createJobSync(streamId, userId, conversationId);
|
||||
|
||||
// Create runtime state
|
||||
let resolveReady: () => void;
|
||||
const readyPromise = new Promise<void>((resolve) => {
|
||||
resolveReady = resolve;
|
||||
});
|
||||
|
||||
const job: t.GenerationJob = {
|
||||
streamId,
|
||||
emitter: new EventEmitter(),
|
||||
status: 'running',
|
||||
createdAt: Date.now(),
|
||||
const runtime: RuntimeJobState = {
|
||||
abortController: new AbortController(),
|
||||
metadata: { userId, conversationId },
|
||||
readyPromise,
|
||||
resolveReady: resolveReady!,
|
||||
chunks: [],
|
||||
syncSent: false,
|
||||
};
|
||||
this.runtimeState.set(streamId, runtime);
|
||||
|
||||
job.emitter.setMaxListeners(100);
|
||||
// Set up all-subscribers-left callback
|
||||
this.eventTransport.onAllSubscribersLeft(streamId, () => {
|
||||
const currentRuntime = this.runtimeState.get(streamId);
|
||||
if (currentRuntime) {
|
||||
currentRuntime.syncSent = false;
|
||||
}
|
||||
const content = this.contentState.getContentParts(streamId) ?? [];
|
||||
this.eventTransport.emitChunk(streamId, {
|
||||
_internal: 'allSubscribersLeft',
|
||||
content,
|
||||
});
|
||||
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
|
||||
});
|
||||
|
||||
this.jobs.set(streamId, job);
|
||||
logger.debug(`[GenerationJobManager] Created job: ${streamId}`);
|
||||
|
||||
return job;
|
||||
// Return facade for backwards compatibility
|
||||
return this.buildJobFacade(streamId, jobData, runtime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a GenerationJob facade from job data and runtime state.
|
||||
* This maintains backwards compatibility with existing code.
|
||||
*/
|
||||
private buildJobFacade(
|
||||
streamId: string,
|
||||
jobData: SerializableJobData,
|
||||
runtime: RuntimeJobState,
|
||||
): t.GenerationJob {
|
||||
// Create a proxy emitter that delegates to eventTransport
|
||||
const emitterProxy = {
|
||||
on: (event: string, handler: (...args: unknown[]) => void) => {
|
||||
if (event === 'allSubscribersLeft') {
|
||||
// Subscribe to internal event
|
||||
this.eventTransport.subscribe(streamId, {
|
||||
onChunk: (e) => {
|
||||
const evt = e as Record<string, unknown>;
|
||||
if (evt._internal === 'allSubscribersLeft') {
|
||||
handler(evt.content);
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
},
|
||||
emit: () => {
|
||||
/* handled via eventTransport */
|
||||
},
|
||||
listenerCount: () => this.eventTransport.getSubscriberCount(streamId),
|
||||
setMaxListeners: () => {
|
||||
/* no-op for proxy */
|
||||
},
|
||||
removeAllListeners: () => this.eventTransport.cleanup(streamId),
|
||||
off: () => {
|
||||
/* handled via unsubscribe */
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
streamId,
|
||||
emitter: emitterProxy as unknown as t.GenerationJob['emitter'],
|
||||
status: jobData.status as t.GenerationJobStatus,
|
||||
createdAt: jobData.createdAt,
|
||||
completedAt: jobData.completedAt,
|
||||
abortController: runtime.abortController,
|
||||
error: jobData.error,
|
||||
metadata: {
|
||||
userId: jobData.userId,
|
||||
conversationId: jobData.conversationId,
|
||||
userMessage: jobData.userMessage,
|
||||
responseMessageId: jobData.responseMessageId,
|
||||
sender: jobData.sender,
|
||||
},
|
||||
readyPromise: runtime.readyPromise,
|
||||
resolveReady: runtime.resolveReady,
|
||||
finalEvent: runtime.finalEvent,
|
||||
syncSent: runtime.syncSent,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a job by streamId.
|
||||
* @param streamId - The stream identifier
|
||||
* @returns The job if found, undefined otherwise
|
||||
*/
|
||||
getJob(streamId: string): t.GenerationJob | undefined {
|
||||
return this.jobs.get(streamId);
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
if (!jobData || !runtime) {
|
||||
return undefined;
|
||||
}
|
||||
return this.buildJobFacade(streamId, jobData, runtime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find an active job by conversationId.
|
||||
* Since streamId === conversationId for existing conversations,
|
||||
* we first check by streamId, then search metadata.
|
||||
* @param conversationId - The conversation identifier
|
||||
* @returns The job if found, undefined otherwise
|
||||
*/
|
||||
getJobByConversation(conversationId: string): t.GenerationJob | undefined {
|
||||
const directMatch = this.jobs.get(conversationId);
|
||||
if (directMatch && directMatch.status === 'running') {
|
||||
return directMatch;
|
||||
const jobData = this.jobStore.getJobByConversationSync(conversationId);
|
||||
if (!jobData) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
for (const job of this.jobs.values()) {
|
||||
if (job.metadata.conversationId === conversationId && job.status === 'running') {
|
||||
return job;
|
||||
}
|
||||
const runtime = this.runtimeState.get(jobData.streamId);
|
||||
if (!runtime) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
return this.buildJobFacade(jobData.streamId, jobData, runtime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a job exists.
|
||||
* @param streamId - The stream identifier
|
||||
* @returns True if job exists
|
||||
*/
|
||||
hasJob(streamId: string): boolean {
|
||||
return this.jobs.has(streamId);
|
||||
return this.jobStore.hasJobSync(streamId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get job status.
|
||||
* @param streamId - The stream identifier
|
||||
* @returns The job status or undefined if not found
|
||||
*/
|
||||
getJobStatus(streamId: string): t.GenerationJobStatus | undefined {
|
||||
return this.jobs.get(streamId)?.status;
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
return jobData?.status as t.GenerationJobStatus | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark job as complete.
|
||||
* @param streamId - The stream identifier
|
||||
* @param error - Optional error message if job failed
|
||||
*/
|
||||
completeJob(streamId: string, error?: string): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
}
|
||||
async completeJob(streamId: string, error?: string): Promise<void> {
|
||||
await this.jobStore.updateJob(streamId, {
|
||||
status: error ? 'error' : 'complete',
|
||||
completedAt: Date.now(),
|
||||
error,
|
||||
});
|
||||
|
||||
job.status = error ? 'error' : 'complete';
|
||||
job.completedAt = Date.now();
|
||||
if (error) {
|
||||
job.error = error;
|
||||
}
|
||||
// Clear content state
|
||||
this.contentState.clearContentState(streamId);
|
||||
|
||||
logger.debug(`[GenerationJobManager] Job completed: ${streamId}, status: ${job.status}`);
|
||||
logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort a job (user-initiated).
|
||||
* Emits both error event and a final done event with aborted flag.
|
||||
* @param streamId - The stream identifier
|
||||
*/
|
||||
abortJob(streamId: string): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
async abortJob(streamId: string): Promise<void> {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
|
||||
if (!jobData) {
|
||||
logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
`[GenerationJobManager] Aborting job ${streamId}, signal already aborted: ${job.abortController.signal.aborted}`,
|
||||
);
|
||||
job.abortController.abort();
|
||||
job.status = 'aborted';
|
||||
job.completedAt = Date.now();
|
||||
logger.debug(
|
||||
`[GenerationJobManager] AbortController.abort() called for ${streamId}, signal.aborted: ${job.abortController.signal.aborted}`,
|
||||
);
|
||||
if (runtime) {
|
||||
runtime.abortController.abort();
|
||||
}
|
||||
|
||||
// Create a final event for abort so clients can properly handle UI cleanup
|
||||
const userMessageId = job.metadata.userMessage?.messageId;
|
||||
const abortFinalEvent = {
|
||||
await this.jobStore.updateJob(streamId, {
|
||||
status: 'aborted',
|
||||
completedAt: Date.now(),
|
||||
});
|
||||
|
||||
// Create final event for abort
|
||||
const userMessageId = jobData.userMessage?.messageId;
|
||||
const content = this.contentState.getContentParts(streamId) ?? [];
|
||||
|
||||
const abortFinalEvent: t.ServerSentEvent = {
|
||||
final: true,
|
||||
conversation: {
|
||||
conversationId: job.metadata.conversationId,
|
||||
},
|
||||
conversation: { conversationId: jobData.conversationId },
|
||||
title: 'New Chat',
|
||||
requestMessage: job.metadata.userMessage
|
||||
requestMessage: jobData.userMessage
|
||||
? {
|
||||
messageId: userMessageId,
|
||||
parentMessageId: job.metadata.userMessage.parentMessageId,
|
||||
conversationId: job.metadata.conversationId,
|
||||
text: job.metadata.userMessage.text ?? '',
|
||||
parentMessageId: jobData.userMessage.parentMessageId,
|
||||
conversationId: jobData.conversationId,
|
||||
text: jobData.userMessage.text ?? '',
|
||||
isCreatedByUser: true,
|
||||
}
|
||||
: null,
|
||||
responseMessage: {
|
||||
messageId: job.metadata.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
|
||||
parentMessageId: userMessageId, // Link response to user message
|
||||
conversationId: job.metadata.conversationId,
|
||||
content: job.contentPartsRef ?? [],
|
||||
sender: job.metadata.sender ?? 'AI',
|
||||
messageId: jobData.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
|
||||
parentMessageId: userMessageId,
|
||||
conversationId: jobData.conversationId,
|
||||
content,
|
||||
sender: jobData.sender ?? 'AI',
|
||||
unfinished: true,
|
||||
/** Not an error - the job was intentionally aborted */
|
||||
error: false,
|
||||
isCreatedByUser: false,
|
||||
},
|
||||
aborted: true,
|
||||
} as unknown as t.ServerSentEvent;
|
||||
|
||||
job.finalEvent = abortFinalEvent;
|
||||
job.emitter.emit('done', abortFinalEvent);
|
||||
// Don't emit error event - it causes unhandled error warnings
|
||||
// The done event with error:true and aborted:true is sufficient
|
||||
if (runtime) {
|
||||
runtime.finalEvent = abortFinalEvent;
|
||||
}
|
||||
|
||||
this.eventTransport.emitDone(streamId, abortFinalEvent);
|
||||
this.contentState.clearContentState(streamId);
|
||||
|
||||
logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to a job's event stream with replay support.
|
||||
* Replays any chunks buffered during disconnect, then continues with live events.
|
||||
* Buffer is cleared after replay (only holds chunks missed during disconnect).
|
||||
* @param streamId - The stream identifier
|
||||
* @param onChunk - Handler for chunk events
|
||||
* @param onDone - Optional handler for completion
|
||||
* @param onError - Optional handler for errors
|
||||
* @returns Object with unsubscribe function, or null if job not found
|
||||
* Subscribe to a job's event stream.
|
||||
*/
|
||||
subscribe(
|
||||
streamId: string,
|
||||
|
|
@ -219,352 +292,263 @@ class GenerationJobManagerClass {
|
|||
onDone?: t.DoneHandler,
|
||||
onError?: t.ErrorHandler,
|
||||
): { unsubscribe: t.UnsubscribeFn } | null {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
if (!runtime) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Use setImmediate to allow the caller to set up their connection first
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
|
||||
// If job already complete, send final event
|
||||
setImmediate(() => {
|
||||
// If job is already complete, send the final event
|
||||
if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) {
|
||||
onDone?.(job.finalEvent);
|
||||
if (
|
||||
runtime.finalEvent &&
|
||||
jobData &&
|
||||
['complete', 'error', 'aborted'].includes(jobData.status)
|
||||
) {
|
||||
onDone?.(runtime.finalEvent);
|
||||
}
|
||||
});
|
||||
|
||||
const chunkHandler = (event: t.ServerSentEvent) => onChunk(event);
|
||||
const doneHandler = (event: t.ServerSentEvent) => onDone?.(event);
|
||||
const errorHandler = (error: string) => onError?.(error);
|
||||
const subscription = this.eventTransport.subscribe(streamId, {
|
||||
onChunk: (event) => {
|
||||
const e = event as t.ServerSentEvent;
|
||||
// Filter out internal events
|
||||
if (!(e as Record<string, unknown>)._internal) {
|
||||
onChunk(e);
|
||||
}
|
||||
},
|
||||
onDone: (event) => onDone?.(event as t.ServerSentEvent),
|
||||
onError,
|
||||
});
|
||||
|
||||
job.emitter.on('chunk', chunkHandler);
|
||||
job.emitter.on('done', doneHandler);
|
||||
job.emitter.on('error', errorHandler);
|
||||
|
||||
// Signal that we're ready to receive events (first subscriber)
|
||||
if (job.emitter.listenerCount('chunk') === 1) {
|
||||
job.resolveReady();
|
||||
// Signal ready on first subscriber
|
||||
if (this.eventTransport.isFirstSubscriber(streamId)) {
|
||||
runtime.resolveReady();
|
||||
logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`);
|
||||
}
|
||||
|
||||
const unsubscribe = () => {
|
||||
const currentJob = this.jobs.get(streamId);
|
||||
if (currentJob) {
|
||||
currentJob.emitter.off('chunk', chunkHandler);
|
||||
currentJob.emitter.off('done', doneHandler);
|
||||
currentJob.emitter.off('error', errorHandler);
|
||||
|
||||
// When last subscriber leaves
|
||||
if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') {
|
||||
// Reset syncSent so reconnecting clients get sync event again
|
||||
currentJob.syncSent = false;
|
||||
// Emit event for saving partial response - use graph's contentParts directly
|
||||
currentJob.emitter.emit('allSubscribersLeft', currentJob.contentPartsRef ?? []);
|
||||
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return { unsubscribe };
|
||||
return subscription;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a chunk event to all subscribers.
|
||||
* Only buffers chunks when no subscribers are listening (for reconnect replay).
|
||||
* Also tracks run steps and user message for reconnection state.
|
||||
* @param streamId - The stream identifier
|
||||
* @param event - The event data to emit
|
||||
*/
|
||||
emitChunk(streamId: string, event: t.ServerSentEvent): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job || job.status !== 'running') {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
if (!jobData || jobData.status !== 'running') {
|
||||
return;
|
||||
}
|
||||
|
||||
// // Only buffer if no one is listening (for reconnect replay)
|
||||
// const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
|
||||
// if (!hasSubscribers) {
|
||||
// job.chunks.push(event);
|
||||
// }
|
||||
|
||||
// Track user message from created event
|
||||
this.trackUserMessage(job, event);
|
||||
this.trackUserMessage(streamId, event);
|
||||
|
||||
// Run steps and content are tracked via graphRef and contentPartsRef
|
||||
// No need to aggregate separately - these reference the graph's data directly
|
||||
|
||||
job.emitter.emit('chunk', event);
|
||||
this.eventTransport.emitChunk(streamId, event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Track user message from created event for reconnection.
|
||||
* Track user message from created event.
|
||||
*/
|
||||
private trackUserMessage(job: t.GenerationJob, event: t.ServerSentEvent): void {
|
||||
private trackUserMessage(streamId: string, event: t.ServerSentEvent): void {
|
||||
const data = event as Record<string, unknown>;
|
||||
if (!data.created || !data.message) {
|
||||
return;
|
||||
}
|
||||
|
||||
const message = data.message as Record<string, unknown>;
|
||||
job.metadata.userMessage = {
|
||||
messageId: message.messageId as string,
|
||||
parentMessageId: message.parentMessageId as string | undefined,
|
||||
conversationId: message.conversationId as string | undefined,
|
||||
text: message.text as string | undefined,
|
||||
const updates: Partial<SerializableJobData> = {
|
||||
userMessage: {
|
||||
messageId: message.messageId as string,
|
||||
parentMessageId: message.parentMessageId as string | undefined,
|
||||
conversationId: message.conversationId as string | undefined,
|
||||
text: message.text as string | undefined,
|
||||
},
|
||||
};
|
||||
|
||||
// Update conversationId in metadata if not set
|
||||
if (!job.metadata.conversationId && message.conversationId) {
|
||||
job.metadata.conversationId = message.conversationId as string;
|
||||
if (message.conversationId) {
|
||||
updates.conversationId = message.conversationId as string;
|
||||
}
|
||||
|
||||
logger.debug(`[GenerationJobManager] Tracked user message for ${job.streamId}`);
|
||||
this.jobStore.updateJob(streamId, updates);
|
||||
logger.debug(`[GenerationJobManager] Tracked user message for ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update job metadata with additional information.
|
||||
* Called when more information becomes available during generation.
|
||||
* @param streamId - The stream identifier
|
||||
* @param metadata - Partial metadata to merge
|
||||
* Update job metadata.
|
||||
*/
|
||||
updateMetadata(streamId: string, metadata: Partial<t.GenerationJobMetadata>): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
const updates: Partial<SerializableJobData> = {};
|
||||
if (metadata.responseMessageId) {
|
||||
updates.responseMessageId = metadata.responseMessageId;
|
||||
}
|
||||
job.metadata = { ...job.metadata, ...metadata };
|
||||
if (metadata.sender) {
|
||||
updates.sender = metadata.sender;
|
||||
}
|
||||
if (metadata.conversationId) {
|
||||
updates.conversationId = metadata.conversationId;
|
||||
}
|
||||
if (metadata.userMessage) {
|
||||
updates.userMessage = metadata.userMessage;
|
||||
}
|
||||
this.jobStore.updateJob(streamId, updates);
|
||||
logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set reference to the graph's contentParts array.
|
||||
* This is the authoritative content source - no need to aggregate separately.
|
||||
* @param streamId - The stream identifier
|
||||
* @param contentParts - Reference to graph's contentParts array
|
||||
*/
|
||||
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
if (!this.jobStore.hasJobSync(streamId)) {
|
||||
return;
|
||||
}
|
||||
job.contentPartsRef = contentParts;
|
||||
logger.debug(`[GenerationJobManager] Set contentParts reference for ${streamId}`, {
|
||||
initialLength: contentParts?.length ?? 0,
|
||||
isArray: Array.isArray(contentParts),
|
||||
});
|
||||
this.contentState.setContentParts(streamId, contentParts);
|
||||
logger.debug(`[GenerationJobManager] Set contentParts for ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set reference to the graph instance.
|
||||
* This provides access to run steps (contentData) - no need to track separately.
|
||||
* @param streamId - The stream identifier
|
||||
* @param graph - Reference to the graph instance (must have contentData property)
|
||||
*/
|
||||
setGraph(streamId: string, graph: StandardGraph): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
if (!this.jobStore.hasJobSync(streamId)) {
|
||||
return;
|
||||
}
|
||||
job.graphRef = graph;
|
||||
this.contentState.setGraph(streamId, graph);
|
||||
logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get resume state for reconnecting clients.
|
||||
* Includes run steps, aggregated content, and user message data.
|
||||
* @param streamId - The stream identifier
|
||||
* @returns Resume state or null if job not found
|
||||
*/
|
||||
getResumeState(streamId: string): t.ResumeState | null {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
if (!jobData) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Use graph's contentParts directly - it's always current and complete
|
||||
// No conversion needed - send as-is
|
||||
const aggregatedContent = job.contentPartsRef ?? [];
|
||||
|
||||
// Use graph's contentData for run steps - it's the authoritative source
|
||||
const runSteps = job.graphRef?.contentData ?? [];
|
||||
const aggregatedContent = this.contentState.getContentParts(streamId) ?? [];
|
||||
const runSteps = this.contentState.getRunSteps(streamId);
|
||||
|
||||
logger.debug(`[GenerationJobManager] getResumeState:`, {
|
||||
streamId,
|
||||
aggregatedContentLength: aggregatedContent.length,
|
||||
runStepsLength: runSteps.length,
|
||||
hasGraphRef: !!job.graphRef,
|
||||
hasContentPartsRef: !!job.contentPartsRef,
|
||||
});
|
||||
|
||||
return {
|
||||
runSteps,
|
||||
aggregatedContent,
|
||||
userMessage: job.metadata.userMessage,
|
||||
responseMessageId: job.metadata.responseMessageId,
|
||||
conversationId: job.metadata.conversationId,
|
||||
userMessage: jobData.userMessage,
|
||||
responseMessageId: jobData.responseMessageId,
|
||||
conversationId: jobData.conversationId,
|
||||
sender: jobData.sender,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark that sync has been sent for this job to prevent duplicate replays.
|
||||
* @param streamId - The stream identifier
|
||||
* Mark that sync has been sent.
|
||||
*/
|
||||
markSyncSent(streamId: string): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (job) {
|
||||
job.syncSent = true;
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
if (runtime) {
|
||||
runtime.syncSent = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if sync has been sent for this job.
|
||||
* @param streamId - The stream identifier
|
||||
* Check if sync has been sent.
|
||||
*/
|
||||
wasSyncSent(streamId: string): boolean {
|
||||
return this.jobs.get(streamId)?.syncSent ?? false;
|
||||
return this.runtimeState.get(streamId)?.syncSent ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a done event to all subscribers.
|
||||
* Stores the final event for replay on reconnect.
|
||||
* @param streamId - The stream identifier
|
||||
* @param event - The final event data
|
||||
* Emit a done event.
|
||||
*/
|
||||
emitDone(streamId: string, event: t.ServerSentEvent): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
const runtime = this.runtimeState.get(streamId);
|
||||
if (runtime) {
|
||||
runtime.finalEvent = event;
|
||||
}
|
||||
job.finalEvent = event;
|
||||
job.emitter.emit('done', event);
|
||||
this.eventTransport.emitDone(streamId, event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit an error event to all subscribers.
|
||||
* @param streamId - The stream identifier
|
||||
* @param error - The error message
|
||||
* Emit an error event.
|
||||
*/
|
||||
emitError(streamId: string, error: string): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
}
|
||||
job.emitter.emit('error', error);
|
||||
this.eventTransport.emitError(streamId, error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup completed jobs after TTL.
|
||||
* Cleanup expired jobs.
|
||||
*/
|
||||
private cleanup(): void {
|
||||
const now = Date.now();
|
||||
const toDelete: string[] = [];
|
||||
private async cleanup(): Promise<void> {
|
||||
const count = await this.jobStore.cleanup();
|
||||
|
||||
for (const [streamId, job] of this.jobs) {
|
||||
const isFinished = ['complete', 'error', 'aborted'].includes(job.status);
|
||||
if (isFinished && job.completedAt && now - job.completedAt > this.ttlAfterComplete) {
|
||||
toDelete.push(streamId);
|
||||
// Cleanup runtime state for deleted jobs
|
||||
for (const streamId of this.runtimeState.keys()) {
|
||||
if (!this.jobStore.hasJobSync(streamId)) {
|
||||
this.runtimeState.delete(streamId);
|
||||
this.contentState.clearContentState(streamId);
|
||||
this.eventTransport.cleanup(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
toDelete.forEach((id) => this.deleteJob(id));
|
||||
|
||||
if (toDelete.length > 0) {
|
||||
logger.debug(`[GenerationJobManager] Cleaned up ${toDelete.length} expired jobs`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a job and cleanup listeners.
|
||||
* @param streamId - The stream identifier
|
||||
*/
|
||||
private deleteJob(streamId: string): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (job) {
|
||||
job.emitter.removeAllListeners();
|
||||
this.jobs.delete(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evict oldest job (LRU).
|
||||
*/
|
||||
private evictOldest(): void {
|
||||
let oldestId: string | null = null;
|
||||
let oldestTime = Infinity;
|
||||
|
||||
for (const [streamId, job] of this.jobs) {
|
||||
if (job.createdAt < oldestTime) {
|
||||
oldestTime = job.createdAt;
|
||||
oldestId = streamId;
|
||||
}
|
||||
}
|
||||
|
||||
if (oldestId) {
|
||||
logger.warn(`[GenerationJobManager] Evicting oldest job: ${oldestId}`);
|
||||
this.deleteJob(oldestId);
|
||||
if (count > 0) {
|
||||
logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get stream info for status endpoint.
|
||||
* Returns chunk count, status, aggregated content, and run step count.
|
||||
*/
|
||||
getStreamInfo(streamId: string): {
|
||||
active: boolean;
|
||||
status: t.GenerationJobStatus;
|
||||
chunkCount: number;
|
||||
runStepCount: number;
|
||||
aggregatedContent?: Agents.MessageContentComplex[];
|
||||
createdAt: number;
|
||||
} | null {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
const jobData = this.jobStore.getJobSync(streamId);
|
||||
if (!jobData) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
active: job.status === 'running',
|
||||
status: job.status,
|
||||
chunkCount: job.chunks.length,
|
||||
runStepCount: job.graphRef?.contentData?.length ?? 0,
|
||||
aggregatedContent: job.contentPartsRef ?? [],
|
||||
createdAt: job.createdAt,
|
||||
active: jobData.status === 'running',
|
||||
status: jobData.status as t.GenerationJobStatus,
|
||||
aggregatedContent: this.contentState.getContentParts(streamId) ?? [],
|
||||
createdAt: jobData.createdAt,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total number of active jobs.
|
||||
* Get total job count.
|
||||
*/
|
||||
getJobCount(): number {
|
||||
return this.jobs.size;
|
||||
return this.jobStore.getJobCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of jobs by status.
|
||||
* Get job count by status.
|
||||
*/
|
||||
getJobCountByStatus(): Record<t.GenerationJobStatus, number> {
|
||||
const counts: Record<t.GenerationJobStatus, number> = {
|
||||
running: 0,
|
||||
complete: 0,
|
||||
error: 0,
|
||||
aborted: 0,
|
||||
};
|
||||
|
||||
for (const job of this.jobs.values()) {
|
||||
counts[job.status]++;
|
||||
}
|
||||
|
||||
return counts;
|
||||
return this.jobStore.getJobCountByStatus() as Record<t.GenerationJobStatus, number>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroy the manager and cleanup all jobs.
|
||||
* Destroy the manager.
|
||||
*/
|
||||
destroy(): void {
|
||||
if (this.cleanupInterval) {
|
||||
clearInterval(this.cleanupInterval);
|
||||
this.cleanupInterval = null;
|
||||
}
|
||||
this.jobs.forEach((_, streamId) => this.deleteJob(streamId));
|
||||
|
||||
this.jobStore.destroy();
|
||||
this.eventTransport.destroy();
|
||||
this.contentState.destroy();
|
||||
this.runtimeState.clear();
|
||||
|
||||
logger.debug('[GenerationJobManager] Destroyed');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue