WIP: resuming

This commit is contained in:
Danny Avila 2025-12-04 08:57:13 -05:00
parent 8104083581
commit db9b050b86
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
8 changed files with 478 additions and 85 deletions

View file

@ -8,6 +8,7 @@ import type {
DoneHandler,
ErrorHandler,
UnsubscribeFn,
ContentPart,
} from './types';
/**
@ -68,6 +69,8 @@ class GenerationJobManagerClass {
metadata: { userId, conversationId },
readyPromise,
resolveReady: resolveReady!,
chunks: [],
aggregatedContent: [],
};
job.emitter.setMaxListeners(100);
@ -87,6 +90,28 @@ class GenerationJobManagerClass {
return this.jobs.get(streamId);
}
/**
* Find an active job by conversationId.
* Since streamId === conversationId for existing conversations,
* we first check by streamId, then search metadata.
* @param conversationId - The conversation identifier
* @returns The job if found, undefined otherwise
*/
getJobByConversation(conversationId: string): GenerationJob | undefined {
const directMatch = this.jobs.get(conversationId);
if (directMatch && directMatch.status === 'running') {
return directMatch;
}
for (const job of this.jobs.values()) {
if (job.metadata.conversationId === conversationId && job.status === 'running') {
return job;
}
}
return undefined;
}
/**
* Check if a job exists.
* @param streamId - The stream identifier
@ -144,24 +169,51 @@ class GenerationJobManagerClass {
}
/**
* Subscribe to a job's event stream.
* Subscribe to a job's event stream with replay support.
* Replays any chunks buffered during disconnect, then continues with live events.
* Buffer is cleared after replay (only holds chunks missed during disconnect).
* @param streamId - The stream identifier
* @param onChunk - Handler for chunk events
* @param onDone - Optional handler for completion
* @param onError - Optional handler for errors
* @returns Unsubscribe function, or null if job not found
* @returns Object with unsubscribe function, or null if job not found
*/
subscribe(
streamId: string,
onChunk: ChunkHandler,
onDone?: DoneHandler,
onError?: ErrorHandler,
): UnsubscribeFn | null {
): { unsubscribe: UnsubscribeFn } | null {
const job = this.jobs.get(streamId);
if (!job) {
return null;
}
// Replay buffered chunks (only chunks missed during disconnect)
const chunksToReplay = [...job.chunks];
const replayCount = chunksToReplay.length;
if (replayCount > 0) {
logger.debug(
`[GenerationJobManager] Replaying ${replayCount} buffered chunks for ${streamId}`,
);
}
// Clear buffer after capturing for replay - subscriber is now connected
job.chunks = [];
// Use setImmediate to allow the caller to set up their connection first
setImmediate(() => {
for (const chunk of chunksToReplay) {
onChunk(chunk);
}
// If job is already complete, send the final event
if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) {
onDone?.(job.finalEvent);
}
});
const chunkHandler = (event: ServerSentEvent) => onChunk(event);
const doneHandler = (event: ServerSentEvent) => onDone?.(event);
const errorHandler = (error: string) => onError?.(error);
@ -176,18 +228,27 @@ class GenerationJobManagerClass {
logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`);
}
return () => {
const unsubscribe = () => {
const currentJob = this.jobs.get(streamId);
if (currentJob) {
currentJob.emitter.off('chunk', chunkHandler);
currentJob.emitter.off('done', doneHandler);
currentJob.emitter.off('error', errorHandler);
// Emit event when last subscriber leaves (for saving partial response)
if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') {
currentJob.emitter.emit('allSubscribersLeft', currentJob.aggregatedContent);
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}`);
}
}
};
return { unsubscribe };
}
/**
* Emit a chunk event to all subscribers.
* Only buffers chunks when no subscribers are listening (for reconnect replay).
* @param streamId - The stream identifier
* @param event - The event data to emit
*/
@ -196,11 +257,49 @@ class GenerationJobManagerClass {
if (!job || job.status !== 'running') {
return;
}
// Only buffer if no one is listening (for reconnect replay)
const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
if (!hasSubscribers) {
job.chunks.push(event);
}
// Always aggregate content (for partial response saving)
this.aggregateContent(job, event);
job.emitter.emit('chunk', event);
}
/**
* Aggregate content parts from message delta events.
* Used to save partial response when subscribers disconnect.
*/
private aggregateContent(job: GenerationJob, event: ServerSentEvent): void {
// Check for on_message_delta events which contain content
const data = event as Record<string, unknown>;
if (data.event === 'on_message_delta' && data.data) {
const eventData = data.data as Record<string, unknown>;
const delta = eventData.delta as Record<string, unknown> | undefined;
if (delta?.content && Array.isArray(delta.content)) {
for (const part of delta.content) {
if (part.type === 'text' && part.text) {
// Find or create text content part
let textPart = job.aggregatedContent?.find((p) => p.type === 'text');
if (!textPart) {
textPart = { type: 'text', text: '' };
job.aggregatedContent = job.aggregatedContent || [];
job.aggregatedContent.push(textPart);
}
textPart.text = (textPart.text || '') + part.text;
}
}
}
}
}
/**
* Emit a done event to all subscribers.
* Stores the final event for replay on reconnect.
* @param streamId - The stream identifier
* @param event - The final event data
*/
@ -209,6 +308,7 @@ class GenerationJobManagerClass {
if (!job) {
return;
}
job.finalEvent = event;
job.emitter.emit('done', event);
}
@ -278,6 +378,31 @@ class GenerationJobManagerClass {
}
}
/**
* Get stream info for status endpoint.
* Returns chunk count, status, and aggregated content.
*/
getStreamInfo(streamId: string): {
active: boolean;
status: GenerationJobStatus;
chunkCount: number;
aggregatedContent?: ContentPart[];
createdAt: number;
} | null {
const job = this.jobs.get(streamId);
if (!job) {
return null;
}
return {
active: job.status === 'running',
status: job.status,
chunkCount: job.chunks.length,
aggregatedContent: job.aggregatedContent,
createdAt: job.createdAt,
};
}
/**
* Get total number of active jobs.
*/