mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-19 01:40:15 +01:00
fix: improve syncing when switching conversations
This commit is contained in:
parent
c8975aa925
commit
aa25e300a6
14 changed files with 314 additions and 176 deletions
|
|
@ -1,18 +1,8 @@
|
|||
import { EventEmitter } from 'events';
|
||||
import { logger } from '@librechat/data-schemas';
|
||||
import type { Agents } from 'librechat-data-provider';
|
||||
import type { ServerSentEvent } from '~/types';
|
||||
import type {
|
||||
GenerationJob,
|
||||
GenerationJobStatus,
|
||||
ChunkHandler,
|
||||
DoneHandler,
|
||||
ErrorHandler,
|
||||
UnsubscribeFn,
|
||||
ContentPart,
|
||||
ResumeState,
|
||||
GenerationJobMetadata,
|
||||
} from './types';
|
||||
import type { StandardGraph } from '@librechat/agents';
|
||||
import type * as t from '~/types';
|
||||
|
||||
/**
|
||||
* Manages generation jobs for resumable LLM streams.
|
||||
|
|
@ -20,7 +10,7 @@ import type {
|
|||
* Clients can subscribe/unsubscribe to job events without affecting generation.
|
||||
*/
|
||||
class GenerationJobManagerClass {
|
||||
private jobs = new Map<string, GenerationJob>();
|
||||
private jobs = new Map<string, t.GenerationJob>();
|
||||
private cleanupInterval: NodeJS.Timeout | null = null;
|
||||
/** Time to keep completed jobs before cleanup (1 hour) */
|
||||
private ttlAfterComplete = 3600000;
|
||||
|
|
@ -53,7 +43,7 @@ class GenerationJobManagerClass {
|
|||
* @param conversationId - Optional conversation ID
|
||||
* @returns The created job
|
||||
*/
|
||||
createJob(streamId: string, userId: string, conversationId?: string): GenerationJob {
|
||||
createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
|
||||
if (this.jobs.size >= this.maxJobs) {
|
||||
this.evictOldest();
|
||||
}
|
||||
|
|
@ -63,7 +53,7 @@ class GenerationJobManagerClass {
|
|||
resolveReady = resolve;
|
||||
});
|
||||
|
||||
const job: GenerationJob = {
|
||||
const job: t.GenerationJob = {
|
||||
streamId,
|
||||
emitter: new EventEmitter(),
|
||||
status: 'running',
|
||||
|
|
@ -73,8 +63,6 @@ class GenerationJobManagerClass {
|
|||
readyPromise,
|
||||
resolveReady: resolveReady!,
|
||||
chunks: [],
|
||||
aggregatedContent: [],
|
||||
runSteps: new Map(),
|
||||
};
|
||||
|
||||
job.emitter.setMaxListeners(100);
|
||||
|
|
@ -90,7 +78,7 @@ class GenerationJobManagerClass {
|
|||
* @param streamId - The stream identifier
|
||||
* @returns The job if found, undefined otherwise
|
||||
*/
|
||||
getJob(streamId: string): GenerationJob | undefined {
|
||||
getJob(streamId: string): t.GenerationJob | undefined {
|
||||
return this.jobs.get(streamId);
|
||||
}
|
||||
|
||||
|
|
@ -101,7 +89,7 @@ class GenerationJobManagerClass {
|
|||
* @param conversationId - The conversation identifier
|
||||
* @returns The job if found, undefined otherwise
|
||||
*/
|
||||
getJobByConversation(conversationId: string): GenerationJob | undefined {
|
||||
getJobByConversation(conversationId: string): t.GenerationJob | undefined {
|
||||
const directMatch = this.jobs.get(conversationId);
|
||||
if (directMatch && directMatch.status === 'running') {
|
||||
return directMatch;
|
||||
|
|
@ -130,7 +118,7 @@ class GenerationJobManagerClass {
|
|||
* @param streamId - The stream identifier
|
||||
* @returns The job status or undefined if not found
|
||||
*/
|
||||
getJobStatus(streamId: string): GenerationJobStatus | undefined {
|
||||
getJobStatus(streamId: string): t.GenerationJobStatus | undefined {
|
||||
return this.jobs.get(streamId)?.status;
|
||||
}
|
||||
|
||||
|
|
@ -197,7 +185,7 @@ class GenerationJobManagerClass {
|
|||
messageId: job.metadata.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
|
||||
parentMessageId: userMessageId, // Link response to user message
|
||||
conversationId: job.metadata.conversationId,
|
||||
content: job.aggregatedContent ?? [],
|
||||
content: job.contentPartsRef ?? [],
|
||||
sender: job.metadata.sender ?? 'AI',
|
||||
unfinished: true,
|
||||
/** Not an error - the job was intentionally aborted */
|
||||
|
|
@ -205,7 +193,7 @@ class GenerationJobManagerClass {
|
|||
isCreatedByUser: false,
|
||||
},
|
||||
aborted: true,
|
||||
} as unknown as ServerSentEvent;
|
||||
} as unknown as t.ServerSentEvent;
|
||||
|
||||
job.finalEvent = abortFinalEvent;
|
||||
job.emitter.emit('done', abortFinalEvent);
|
||||
|
|
@ -227,42 +215,25 @@ class GenerationJobManagerClass {
|
|||
*/
|
||||
subscribe(
|
||||
streamId: string,
|
||||
onChunk: ChunkHandler,
|
||||
onDone?: DoneHandler,
|
||||
onError?: ErrorHandler,
|
||||
): { unsubscribe: UnsubscribeFn } | null {
|
||||
onChunk: t.ChunkHandler,
|
||||
onDone?: t.DoneHandler,
|
||||
onError?: t.ErrorHandler,
|
||||
): { unsubscribe: t.UnsubscribeFn } | null {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Replay buffered chunks (only chunks missed during disconnect)
|
||||
const chunksToReplay = [...job.chunks];
|
||||
const replayCount = chunksToReplay.length;
|
||||
|
||||
if (replayCount > 0) {
|
||||
logger.debug(
|
||||
`[GenerationJobManager] Replaying ${replayCount} buffered chunks for ${streamId}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Clear buffer after capturing for replay - subscriber is now connected
|
||||
job.chunks = [];
|
||||
|
||||
// Use setImmediate to allow the caller to set up their connection first
|
||||
setImmediate(() => {
|
||||
for (const chunk of chunksToReplay) {
|
||||
onChunk(chunk);
|
||||
}
|
||||
|
||||
// If job is already complete, send the final event
|
||||
if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) {
|
||||
onDone?.(job.finalEvent);
|
||||
}
|
||||
});
|
||||
|
||||
const chunkHandler = (event: ServerSentEvent) => onChunk(event);
|
||||
const doneHandler = (event: ServerSentEvent) => onDone?.(event);
|
||||
const chunkHandler = (event: t.ServerSentEvent) => onChunk(event);
|
||||
const doneHandler = (event: t.ServerSentEvent) => onDone?.(event);
|
||||
const errorHandler = (error: string) => onError?.(error);
|
||||
|
||||
job.emitter.on('chunk', chunkHandler);
|
||||
|
|
@ -282,10 +253,13 @@ class GenerationJobManagerClass {
|
|||
currentJob.emitter.off('done', doneHandler);
|
||||
currentJob.emitter.off('error', errorHandler);
|
||||
|
||||
// Emit event when last subscriber leaves (for saving partial response)
|
||||
// When last subscriber leaves
|
||||
if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') {
|
||||
currentJob.emitter.emit('allSubscribersLeft', currentJob.aggregatedContent);
|
||||
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}`);
|
||||
// Reset syncSent so reconnecting clients get sync event again
|
||||
currentJob.syncSent = false;
|
||||
// Emit event for saving partial response - use graph's contentParts directly
|
||||
currentJob.emitter.emit('allSubscribersLeft', currentJob.contentPartsRef ?? []);
|
||||
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -300,53 +274,31 @@ class GenerationJobManagerClass {
|
|||
* @param streamId - The stream identifier
|
||||
* @param event - The event data to emit
|
||||
*/
|
||||
emitChunk(streamId: string, event: ServerSentEvent): void {
|
||||
emitChunk(streamId: string, event: t.ServerSentEvent): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job || job.status !== 'running') {
|
||||
return;
|
||||
}
|
||||
|
||||
// Only buffer if no one is listening (for reconnect replay)
|
||||
const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
|
||||
if (!hasSubscribers) {
|
||||
job.chunks.push(event);
|
||||
}
|
||||
|
||||
// Track run steps for reconnection
|
||||
this.trackRunStep(job, event);
|
||||
// // Only buffer if no one is listening (for reconnect replay)
|
||||
// const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
|
||||
// if (!hasSubscribers) {
|
||||
// job.chunks.push(event);
|
||||
// }
|
||||
|
||||
// Track user message from created event
|
||||
this.trackUserMessage(job, event);
|
||||
|
||||
// Always aggregate content (for partial response saving)
|
||||
this.aggregateContent(job, event);
|
||||
// Run steps and content are tracked via graphRef and contentPartsRef
|
||||
// No need to aggregate separately - these reference the graph's data directly
|
||||
|
||||
job.emitter.emit('chunk', event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Track run step events for reconnection state.
|
||||
* This allows reconnecting clients to rebuild their stepMap.
|
||||
*/
|
||||
private trackRunStep(job: GenerationJob, event: ServerSentEvent): void {
|
||||
const data = event as Record<string, unknown>;
|
||||
if (data.event !== 'on_run_step') {
|
||||
return;
|
||||
}
|
||||
|
||||
const runStep = data.data as Agents.RunStep;
|
||||
if (!runStep?.id) {
|
||||
return;
|
||||
}
|
||||
|
||||
job.runSteps.set(runStep.id, runStep);
|
||||
logger.debug(`[GenerationJobManager] Tracked run step: ${runStep.id} for ${job.streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Track user message from created event for reconnection.
|
||||
*/
|
||||
private trackUserMessage(job: GenerationJob, event: ServerSentEvent): void {
|
||||
private trackUserMessage(job: t.GenerationJob, event: t.ServerSentEvent): void {
|
||||
const data = event as Record<string, unknown>;
|
||||
if (!data.created || !data.message) {
|
||||
return;
|
||||
|
|
@ -374,7 +326,7 @@ class GenerationJobManagerClass {
|
|||
* @param streamId - The stream identifier
|
||||
* @param metadata - Partial metadata to merge
|
||||
*/
|
||||
updateMetadata(streamId: string, metadata: Partial<GenerationJobMetadata>): void {
|
||||
updateMetadata(streamId: string, metadata: Partial<t.GenerationJobMetadata>): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
|
|
@ -383,21 +335,69 @@ class GenerationJobManagerClass {
|
|||
logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set reference to the graph's contentParts array.
|
||||
* This is the authoritative content source - no need to aggregate separately.
|
||||
* @param streamId - The stream identifier
|
||||
* @param contentParts - Reference to graph's contentParts array
|
||||
*/
|
||||
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
}
|
||||
job.contentPartsRef = contentParts;
|
||||
logger.debug(`[GenerationJobManager] Set contentParts reference for ${streamId}`, {
|
||||
initialLength: contentParts?.length ?? 0,
|
||||
isArray: Array.isArray(contentParts),
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set reference to the graph instance.
|
||||
* This provides access to run steps (contentData) - no need to track separately.
|
||||
* @param streamId - The stream identifier
|
||||
* @param graph - Reference to the graph instance (must have contentData property)
|
||||
*/
|
||||
setGraph(streamId: string, graph: StandardGraph): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
}
|
||||
job.graphRef = graph;
|
||||
logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get resume state for reconnecting clients.
|
||||
* Includes run steps, aggregated content, and user message data.
|
||||
* @param streamId - The stream identifier
|
||||
* @returns Resume state or null if job not found
|
||||
*/
|
||||
getResumeState(streamId: string): ResumeState | null {
|
||||
getResumeState(streamId: string): t.ResumeState | null {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Use graph's contentParts directly - it's always current and complete
|
||||
// No conversion needed - send as-is
|
||||
const aggregatedContent = job.contentPartsRef ?? [];
|
||||
|
||||
// Use graph's contentData for run steps - it's the authoritative source
|
||||
const runSteps = job.graphRef?.contentData ?? [];
|
||||
|
||||
logger.debug(`[GenerationJobManager] getResumeState:`, {
|
||||
streamId,
|
||||
aggregatedContentLength: aggregatedContent.length,
|
||||
runStepsLength: runSteps.length,
|
||||
hasGraphRef: !!job.graphRef,
|
||||
hasContentPartsRef: !!job.contentPartsRef,
|
||||
});
|
||||
|
||||
return {
|
||||
runSteps: Array.from(job.runSteps.values()),
|
||||
aggregatedContent: job.aggregatedContent,
|
||||
runSteps,
|
||||
aggregatedContent,
|
||||
userMessage: job.metadata.userMessage,
|
||||
responseMessageId: job.metadata.responseMessageId,
|
||||
conversationId: job.metadata.conversationId,
|
||||
|
|
@ -423,41 +423,13 @@ class GenerationJobManagerClass {
|
|||
return this.jobs.get(streamId)?.syncSent ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate content parts from message delta events.
|
||||
* Used to save partial response when subscribers disconnect.
|
||||
* Uses flat format: { type: 'text', text: 'content' }
|
||||
*/
|
||||
private aggregateContent(job: GenerationJob, event: ServerSentEvent): void {
|
||||
// Check for on_message_delta events which contain content
|
||||
const data = event as Record<string, unknown>;
|
||||
if (data.event === 'on_message_delta' && data.data) {
|
||||
const eventData = data.data as Record<string, unknown>;
|
||||
const delta = eventData.delta as Record<string, unknown> | undefined;
|
||||
if (delta?.content && Array.isArray(delta.content)) {
|
||||
for (const part of delta.content) {
|
||||
if (part.type === 'text' && part.text) {
|
||||
// Find or create text content part in flat format
|
||||
let textPart = job.aggregatedContent?.find((p) => p.type === 'text');
|
||||
if (!textPart) {
|
||||
textPart = { type: 'text', text: '' };
|
||||
job.aggregatedContent = job.aggregatedContent || [];
|
||||
job.aggregatedContent.push(textPart);
|
||||
}
|
||||
textPart.text = (textPart.text || '') + part.text;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a done event to all subscribers.
|
||||
* Stores the final event for replay on reconnect.
|
||||
* @param streamId - The stream identifier
|
||||
* @param event - The final event data
|
||||
*/
|
||||
emitDone(streamId: string, event: ServerSentEvent): void {
|
||||
emitDone(streamId: string, event: t.ServerSentEvent): void {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job) {
|
||||
return;
|
||||
|
|
@ -508,7 +480,6 @@ class GenerationJobManagerClass {
|
|||
const job = this.jobs.get(streamId);
|
||||
if (job) {
|
||||
job.emitter.removeAllListeners();
|
||||
job.runSteps.clear();
|
||||
this.jobs.delete(streamId);
|
||||
}
|
||||
}
|
||||
|
|
@ -539,10 +510,10 @@ class GenerationJobManagerClass {
|
|||
*/
|
||||
getStreamInfo(streamId: string): {
|
||||
active: boolean;
|
||||
status: GenerationJobStatus;
|
||||
status: t.GenerationJobStatus;
|
||||
chunkCount: number;
|
||||
runStepCount: number;
|
||||
aggregatedContent?: ContentPart[];
|
||||
aggregatedContent?: Agents.MessageContentComplex[];
|
||||
createdAt: number;
|
||||
} | null {
|
||||
const job = this.jobs.get(streamId);
|
||||
|
|
@ -554,8 +525,8 @@ class GenerationJobManagerClass {
|
|||
active: job.status === 'running',
|
||||
status: job.status,
|
||||
chunkCount: job.chunks.length,
|
||||
runStepCount: job.runSteps.size,
|
||||
aggregatedContent: job.aggregatedContent,
|
||||
runStepCount: job.graphRef?.contentData?.length ?? 0,
|
||||
aggregatedContent: job.contentPartsRef ?? [],
|
||||
createdAt: job.createdAt,
|
||||
};
|
||||
}
|
||||
|
|
@ -570,8 +541,8 @@ class GenerationJobManagerClass {
|
|||
/**
|
||||
* Get count of jobs by status.
|
||||
*/
|
||||
getJobCountByStatus(): Record<GenerationJobStatus, number> {
|
||||
const counts: Record<GenerationJobStatus, number> = {
|
||||
getJobCountByStatus(): Record<t.GenerationJobStatus, number> {
|
||||
const counts: Record<t.GenerationJobStatus, number> = {
|
||||
running: 0,
|
||||
complete: 0,
|
||||
error: 0,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue