refactor: Unify streamId and conversationId handling for improved job management

- Updated ResumableAgentController and AgentController to generate conversationId upfront, ensuring it matches streamId for consistency.
- Simplified job creation and metadata management by removing redundant conversationId updates from callbacks.
- Refactored abortMiddleware and related methods to utilize the unified streamId/conversationId approach, enhancing clarity in job handling.
- Removed deprecated methods from GenerationJobManager and InMemoryJobStore, streamlining the codebase and improving maintainability.
This commit is contained in:
Danny Avila 2025-12-13 17:36:33 -05:00
parent abe5b6cfc7
commit a04b751f69
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
6 changed files with 33 additions and 104 deletions

View file

@ -260,21 +260,6 @@ class GenerationJobManagerClass {
return this.buildJobFacade(streamId, jobData, runtime);
}
/**
* Find an active job by conversationId.
*/
async getJobByConversation(conversationId: string): Promise<t.GenerationJob | undefined> {
const jobData = await this.jobStore.getJobByConversation(conversationId);
if (!jobData) {
return undefined;
}
const runtime = this.runtimeState.get(jobData.streamId);
if (!runtime) {
return undefined;
}
return this.buildJobFacade(jobData.streamId, jobData, runtime);
}
/**
* Check if a job exists.
*/
@ -395,21 +380,6 @@ class GenerationJobManagerClass {
.trim();
}
/**
* Abort a job by conversationId (for abort middleware).
* Returns abort result with all data needed for token spending and message saving.
*/
async abortByConversation(conversationId: string): Promise<AbortResult> {
const jobData = await this.jobStore.getJobByConversation(conversationId);
if (!jobData) {
logger.debug(
`[GenerationJobManager] No active job found for conversation: ${conversationId}`,
);
return { success: false, jobData: null, content: [], text: '', finalEvent: null };
}
return this.abortJob(jobData.streamId);
}
/**
* Subscribe to a job's event stream.
*

View file

@ -69,23 +69,6 @@ export class InMemoryJobStore implements IJobStore {
return this.jobs.get(streamId) ?? null;
}
async getJobByConversation(conversationId: string): Promise<SerializableJobData | null> {
// Direct match first (streamId === conversationId for existing conversations)
const directMatch = this.jobs.get(conversationId);
if (directMatch && directMatch.status === 'running') {
return directMatch;
}
// Search by conversationId in metadata
for (const job of this.jobs.values()) {
if (job.conversationId === conversationId && job.status === 'running') {
return job;
}
}
return null;
}
async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
const job = this.jobs.get(streamId);
if (!job) {

View file

@ -89,12 +89,9 @@ export interface IJobStore {
conversationId?: string,
): Promise<SerializableJobData>;
/** Get a job by streamId */
/** Get a job by streamId (streamId === conversationId) */
getJob(streamId: string): Promise<SerializableJobData | null>;
/** Find active job by conversationId */
getJobByConversation(conversationId: string): Promise<SerializableJobData | null>;
/** Update job data */
updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void>;