refactor: Streamline abort handling and integrate GenerationJobManager for improved job management

- Removed the abortControllers middleware and integrated abort handling directly into GenerationJobManager.
- Updated abortMessage function to utilize GenerationJobManager for aborting jobs by conversation ID, enhancing clarity and efficiency.
- Simplified cleanup processes and improved error handling during abort operations.
- Enhanced metadata management for jobs, including endpoint and model information, to facilitate better tracking and resource management.
This commit is contained in:
Danny Avila 2025-12-13 17:04:42 -05:00
parent 89f3cb5ad3
commit 9a1e329f57
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
7 changed files with 236 additions and 314 deletions

View file

@ -5,6 +5,7 @@ import type {
IContentStateManager,
SerializableJobData,
IEventTransport,
AbortResult,
IJobStore,
} from './interfaces/IJobStore';
import type * as t from '~/types';
@ -307,14 +308,15 @@ class GenerationJobManagerClass {
/**
* Abort a job (user-initiated).
* Returns all data needed for token spending and message saving.
*/
async abortJob(streamId: string): Promise<void> {
async abortJob(streamId: string): Promise<AbortResult> {
const jobData = await this.jobStore.getJob(streamId);
const runtime = this.runtimeState.get(streamId);
if (!jobData) {
logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`);
return;
return { success: false, jobData: null, content: [], text: '', finalEvent: null };
}
if (runtime) {
@ -326,9 +328,12 @@ class GenerationJobManagerClass {
completedAt: Date.now(),
});
// Get content and extract text
const content = this.contentState.getContentParts(streamId) ?? [];
const text = this.extractTextFromContent(content);
// Create final event for abort
const userMessageId = jobData.userMessage?.messageId;
const content = this.contentState.getContentParts(streamId) ?? [];
const abortFinalEvent: t.ServerSentEvent = {
final: true,
@ -348,6 +353,7 @@ class GenerationJobManagerClass {
parentMessageId: userMessageId,
conversationId: jobData.conversationId,
content,
text,
sender: jobData.sender ?? 'AI',
unfinished: true,
error: false,
@ -364,6 +370,44 @@ class GenerationJobManagerClass {
this.contentState.clearContentState(streamId);
logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
return {
success: true,
jobData,
content,
text,
finalEvent: abortFinalEvent,
};
}
/**
* Extract plain text from content parts array.
*/
private extractTextFromContent(content: Agents.MessageContentComplex[]): string {
return content
.map((part) => {
if ('text' in part && typeof part.text === 'string') {
return part.text;
}
return '';
})
.join('')
.trim();
}
/**
* Abort a job by conversationId (for abort middleware).
* Returns abort result with all data needed for token spending and message saving.
*/
async abortByConversation(conversationId: string): Promise<AbortResult> {
const jobData = await this.jobStore.getJobByConversation(conversationId);
if (!jobData) {
logger.debug(
`[GenerationJobManager] No active job found for conversation: ${conversationId}`,
);
return { success: false, jobData: null, content: [], text: '', finalEvent: null };
}
return this.abortJob(jobData.streamId);
}
/**
@ -494,6 +538,18 @@ class GenerationJobManagerClass {
if (metadata.userMessage) {
updates.userMessage = metadata.userMessage;
}
if (metadata.endpoint) {
updates.endpoint = metadata.endpoint;
}
if (metadata.iconURL) {
updates.iconURL = metadata.iconURL;
}
if (metadata.model) {
updates.model = metadata.model;
}
if (metadata.promptTokens !== undefined) {
updates.promptTokens = metadata.promptTokens;
}
this.jobStore.updateJob(streamId, updates);
logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
}