diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index 16ae4be601..8957b041ea 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -46,8 +46,10 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit } = req.body; const userId = req.user.id; - const streamId = - reqConversationId || `stream_${Date.now()}_${Math.random().toString(36).slice(2)}`; + + // Generate conversationId upfront if not provided - streamId === conversationId always + const conversationId = reqConversationId || crypto.randomUUID(); + const streamId = conversationId; let client = null; @@ -59,7 +61,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit } }); - const job = await GenerationJobManager.createJob(streamId, userId, reqConversationId); + const job = await GenerationJobManager.createJob(streamId, userId, conversationId); req._resumableStreamId = streamId; // Track if partial response was already saved to avoid duplicates @@ -86,7 +88,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit } partialResponseSaved = true; - const responseConversationId = resumeState.conversationId || reqConversationId; + const responseConversationId = resumeState.conversationId || conversationId; try { const partialMessage = { @@ -145,18 +147,15 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit GenerationJobManager.setContentParts(streamId, client.contentParts); } - res.json({ streamId, status: 'started' }); + res.json({ streamId, conversationId, status: 'started' }); - let conversationId = reqConversationId; let userMessage; const getReqData = (data = {}) => { if (data.userMessage) { userMessage = data.userMessage; } - if (!conversationId && data.conversationId) { - conversationId = data.conversationId; - } + // conversationId is pre-generated, no need to update from callback }; // Start background generation - wait for subscriber with timeout fallback @@ -356,11 +355,11 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { return ResumableAgentController(req, res, next, initializeClient, addTitle); } - let { + const { text, isRegenerate, endpointOption, - conversationId, + conversationId: reqConversationId, isContinued = false, editedContent = null, parentMessageId = null, @@ -368,14 +367,17 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { responseMessageId: editedResponseMessageId = null, } = req.body; + // Generate conversationId upfront if not provided - streamId === conversationId always + const conversationId = reqConversationId || crypto.randomUUID(); + const streamId = conversationId; + let userMessage; let userMessageId; let responseMessageId; let client = null; let cleanupHandlers = []; - let streamId = null; - const newConvo = !conversationId; + const newConvo = !reqConversationId; const userId = req.user.id; // Create handler to avoid capturing the entire parent scope @@ -386,14 +388,13 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { userMessageId = data[key].messageId; } else if (key === 'responseMessageId') { responseMessageId = data[key]; - } else if (key === 'promptTokens' && streamId) { + } else if (key === 'promptTokens') { // Update job metadata with prompt tokens for abort handling GenerationJobManager.updateMetadata(streamId, { promptTokens: data[key] }); - } else if (key === 'sender' && streamId) { + } else if (key === 'sender') { GenerationJobManager.updateMetadata(streamId, { sender: data[key] }); - } else if (!conversationId && key === 'conversationId') { - conversationId = data[key]; } + // conversationId is pre-generated, no need to update from callback } }; @@ -427,10 +428,6 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { client = null; getReqData = null; userMessage = null; - if (endpointOption) { - endpointOption.agent = null; - } - endpointOption = null; cleanupHandlers = null; // Clear request data map @@ -481,9 +478,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { requestDataMap.set(req, { client }); // Create job in GenerationJobManager for abort handling - // Use conversationId as streamId, or generate one for new conversations - streamId = - conversationId || `nonresumable_${Date.now()}_${Math.random().toString(36).slice(2)}`; + // streamId === conversationId (pre-generated above) const job = await GenerationJobManager.createJob(streamId, userId, conversationId); // Store endpoint metadata for abort handling @@ -518,19 +513,13 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { userMessageId = userMsg.messageId; responseMessageId = respMsgId; - // Update conversationId if it was a new conversation - if (!conversationId && userMsg.conversationId) { - conversationId = userMsg.conversationId; - } - - // Store metadata for abort handling + // Store metadata for abort handling (conversationId is pre-generated) GenerationJobManager.updateMetadata(streamId, { responseMessageId: respMsgId, - conversationId: userMsg.conversationId, userMessage: { messageId: userMsg.messageId, parentMessageId: userMsg.parentMessageId, - conversationId: userMsg.conversationId, + conversationId, text: userMsg.text, }, }); diff --git a/api/server/middleware/abortMiddleware.js b/api/server/middleware/abortMiddleware.js index 9832e279a5..b85f1439cc 100644 --- a/api/server/middleware/abortMiddleware.js +++ b/api/server/middleware/abortMiddleware.js @@ -17,6 +17,7 @@ const { abortRun } = require('./abortRun'); /** * Abort an active message generation. * Uses GenerationJobManager for all agent requests. + * Since streamId === conversationId, we can directly abort by conversationId. */ async function abortMessage(req, res) { const { abortKey, endpoint } = req.body; @@ -28,8 +29,8 @@ async function abortMessage(req, res) { const conversationId = abortKey?.split(':')?.[0] ?? req.user.id; const userId = req.user.id; - // Use GenerationJobManager to abort the job - const abortResult = await GenerationJobManager.abortByConversation(conversationId); + // Use GenerationJobManager to abort the job (streamId === conversationId) + const abortResult = await GenerationJobManager.abortJob(conversationId); if (!abortResult.success) { if (!res.headersSent) { diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index 1d55f7231c..3b2d3d5f38 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -122,7 +122,8 @@ router.get('/chat/stream/:streamId', async (req, res) => { router.get('/chat/status/:conversationId', async (req, res) => { const { conversationId } = req.params; - const job = await GenerationJobManager.getJobByConversation(conversationId); + // streamId === conversationId, so we can use getJob directly + const job = await GenerationJobManager.getJob(conversationId); if (!job) { return res.json({ active: false }); @@ -132,12 +133,12 @@ router.get('/chat/status/:conversationId', async (req, res) => { return res.status(403).json({ error: 'Unauthorized' }); } - const info = await GenerationJobManager.getStreamInfo(job.streamId); - const resumeState = await GenerationJobManager.getResumeState(job.streamId); + const info = await GenerationJobManager.getStreamInfo(conversationId); + const resumeState = await GenerationJobManager.getResumeState(conversationId); res.json({ active: info?.active ?? false, - streamId: job.streamId, + streamId: conversationId, status: info?.status ?? job.status, aggregatedContent: info?.aggregatedContent, createdAt: info?.createdAt ?? job.createdAt, @@ -158,21 +159,9 @@ router.post('/chat/abort', async (req, res) => { const { streamId, conversationId, abortKey } = req.body; - // Try to find job by streamId first, then by conversationId, then by abortKey - let jobStreamId = streamId; - let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null; - - if (!job && conversationId) { - job = await GenerationJobManager.getJobByConversation(conversationId); - if (job) { - jobStreamId = job.streamId; - } - } - - if (!job && abortKey) { - jobStreamId = abortKey.split(':')[0]; - job = await GenerationJobManager.getJob(jobStreamId); - } + // streamId === conversationId, so try any of the provided IDs + const jobStreamId = streamId || conversationId || abortKey?.split(':')[0]; + const job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null; logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`); diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index ad861be1bc..a154435928 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -260,21 +260,6 @@ class GenerationJobManagerClass { return this.buildJobFacade(streamId, jobData, runtime); } - /** - * Find an active job by conversationId. - */ - async getJobByConversation(conversationId: string): Promise { - const jobData = await this.jobStore.getJobByConversation(conversationId); - if (!jobData) { - return undefined; - } - const runtime = this.runtimeState.get(jobData.streamId); - if (!runtime) { - return undefined; - } - return this.buildJobFacade(jobData.streamId, jobData, runtime); - } - /** * Check if a job exists. */ @@ -395,21 +380,6 @@ class GenerationJobManagerClass { .trim(); } - /** - * Abort a job by conversationId (for abort middleware). - * Returns abort result with all data needed for token spending and message saving. - */ - async abortByConversation(conversationId: string): Promise { - const jobData = await this.jobStore.getJobByConversation(conversationId); - if (!jobData) { - logger.debug( - `[GenerationJobManager] No active job found for conversation: ${conversationId}`, - ); - return { success: false, jobData: null, content: [], text: '', finalEvent: null }; - } - return this.abortJob(jobData.streamId); - } - /** * Subscribe to a job's event stream. * diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index 8b8f697ebc..10d9e18df2 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -69,23 +69,6 @@ export class InMemoryJobStore implements IJobStore { return this.jobs.get(streamId) ?? null; } - async getJobByConversation(conversationId: string): Promise { - // Direct match first (streamId === conversationId for existing conversations) - const directMatch = this.jobs.get(conversationId); - if (directMatch && directMatch.status === 'running') { - return directMatch; - } - - // Search by conversationId in metadata - for (const job of this.jobs.values()) { - if (job.conversationId === conversationId && job.status === 'running') { - return job; - } - } - - return null; - } - async updateJob(streamId: string, updates: Partial): Promise { const job = this.jobs.get(streamId); if (!job) { diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 1360c974ee..d66db06039 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -89,12 +89,9 @@ export interface IJobStore { conversationId?: string, ): Promise; - /** Get a job by streamId */ + /** Get a job by streamId (streamId === conversationId) */ getJob(streamId: string): Promise; - /** Find active job by conversationId */ - getJobByConversation(conversationId: string): Promise; - /** Update job data */ updateJob(streamId: string, updates: Partial): Promise;