diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index 7f562d0d6d..16ae4be601 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -6,11 +6,7 @@ const { sanitizeFileForTransmit, sanitizeMessageForTransmit, } = require('@librechat/api'); -const { - handleAbortError, - createAbortController, - cleanupAbortController, -} = require('~/server/middleware'); +const { handleAbortError } = require('~/server/middleware'); const { disposeClient, clientRegistry, requestDataMap } = require('~/server/cleanup'); const { saveMessage } = require('~/models'); @@ -350,6 +346,10 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit } }; +/** + * Non-resumable Agent Controller - Uses GenerationJobManager for abort handling. + * Response is streamed directly to client via res, but abort state is managed centrally. + */ const AgentController = async (req, res, next, initializeClient, addTitle) => { const isResumable = req.query.resumable === 'true'; if (isResumable) { @@ -368,16 +368,12 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { responseMessageId: editedResponseMessageId = null, } = req.body; - let sender; - let abortKey; let userMessage; - let promptTokens; let userMessageId; let responseMessageId; - let userMessagePromise; - let getAbortData; let client = null; let cleanupHandlers = []; + let streamId = null; const newConvo = !conversationId; const userId = req.user.id; @@ -388,16 +384,13 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { if (key === 'userMessage') { userMessage = data[key]; userMessageId = data[key].messageId; - } else if (key === 'userMessagePromise') { - userMessagePromise = data[key]; } else if (key === 'responseMessageId') { responseMessageId = data[key]; - } else if (key === 'promptTokens') { - promptTokens = data[key]; - } else if (key === 'sender') { - sender = data[key]; - } else if (key === 'abortKey') { - abortKey = data[key]; + } else if (key === 'promptTokens' && streamId) { + // Update job metadata with prompt tokens for abort handling + GenerationJobManager.updateMetadata(streamId, { promptTokens: data[key] }); + } else if (key === 'sender' && streamId) { + GenerationJobManager.updateMetadata(streamId, { sender: data[key] }); } else if (!conversationId && key === 'conversationId') { conversationId = data[key]; } @@ -405,7 +398,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { }; // Create a function to handle final cleanup - const performCleanup = () => { + const performCleanup = async () => { logger.debug('[AgentController] Performing cleanup'); if (Array.isArray(cleanupHandlers)) { for (const handler of cleanupHandlers) { @@ -419,10 +412,10 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { } } - // Clean up abort controller - if (abortKey) { - logger.debug('[AgentController] Cleaning up abort controller'); - cleanupAbortController(abortKey); + // Complete the job in GenerationJobManager + if (streamId) { + logger.debug('[AgentController] Completing job in GenerationJobManager'); + await GenerationJobManager.completeJob(streamId); } // Dispose client properly @@ -434,11 +427,11 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { client = null; getReqData = null; userMessage = null; - getAbortData = null; - endpointOption.agent = null; + if (endpointOption) { + endpointOption.agent = null; + } endpointOption = null; cleanupHandlers = null; - userMessagePromise = null; // Clear request data map if (requestDataMap.has(req)) { @@ -460,6 +453,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { } }; cleanupHandlers.push(removePrelimHandler); + /** @type {{ client: TAgentClient; userMCPAuthMap?: Record> }} */ const result = await initializeClient({ req, @@ -467,6 +461,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { endpointOption, signal: prelimAbortController.signal, }); + if (prelimAbortController.signal?.aborted) { prelimAbortController = null; throw new Error('Request was aborted before initialization could complete'); @@ -485,28 +480,26 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { // Store request data in WeakMap keyed by req object requestDataMap.set(req, { client }); - // Use WeakRef to allow GC but still access content if it exists - const contentRef = new WeakRef(client.contentParts || []); + // Create job in GenerationJobManager for abort handling + // Use conversationId as streamId, or generate one for new conversations + streamId = + conversationId || `nonresumable_${Date.now()}_${Math.random().toString(36).slice(2)}`; + const job = await GenerationJobManager.createJob(streamId, userId, conversationId); - // Minimize closure scope - only capture small primitives and WeakRef - getAbortData = () => { - // Dereference WeakRef each time - const content = contentRef.deref(); + // Store endpoint metadata for abort handling + GenerationJobManager.updateMetadata(streamId, { + endpoint: endpointOption.endpoint, + iconURL: endpointOption.iconURL, + model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model, + sender: client?.sender, + }); - return { - sender, - content: content || [], - userMessage, - promptTokens, - conversationId, - userMessagePromise, - messageId: responseMessageId, - parentMessageId: overrideParentMessageId ?? userMessageId, - }; - }; + // Store content parts reference for abort + if (client?.contentParts) { + GenerationJobManager.setContentParts(streamId, client.contentParts); + } - const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData); - const closeHandler = createCloseHandler(abortController); + const closeHandler = createCloseHandler(job.abortController); res.on('close', closeHandler); cleanupHandlers.push(() => { try { @@ -516,6 +509,33 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { } }); + /** + * onStart callback - stores user message and response ID for abort handling + */ + const onStart = (userMsg, respMsgId, _isNewConvo) => { + sendEvent(res, { message: userMsg, created: true }); + userMessage = userMsg; + userMessageId = userMsg.messageId; + responseMessageId = respMsgId; + + // Update conversationId if it was a new conversation + if (!conversationId && userMsg.conversationId) { + conversationId = userMsg.conversationId; + } + + // Store metadata for abort handling + GenerationJobManager.updateMetadata(streamId, { + responseMessageId: respMsgId, + conversationId: userMsg.conversationId, + userMessage: { + messageId: userMsg.messageId, + parentMessageId: userMsg.parentMessageId, + conversationId: userMsg.conversationId, + text: userMsg.text, + }, + }); + }; + const messageOptions = { user: userId, onStart, @@ -525,7 +545,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { editedContent, conversationId, parentMessageId, - abortController, + abortController: job.abortController, overrideParentMessageId, isEdited: !!editedContent, userMCPAuthMap: result.userMCPAuthMap, @@ -565,7 +585,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { } // Only send if not aborted - if (!abortController.signal.aborted) { + if (!job.abortController.signal.aborted) { // Create a new response object with minimal copies const finalResponse = { ...response }; @@ -639,7 +659,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => { // Handle error without capturing much scope handleAbortError(res, req, error, { conversationId, - sender, + sender: client?.sender, messageId: responseMessageId, parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId, userMessageId, diff --git a/api/server/middleware/abortControllers.js b/api/server/middleware/abortControllers.js deleted file mode 100644 index 31acbfe389..0000000000 --- a/api/server/middleware/abortControllers.js +++ /dev/null @@ -1,2 +0,0 @@ -// abortControllers.js -module.exports = new Map(); diff --git a/api/server/middleware/abortMiddleware.js b/api/server/middleware/abortMiddleware.js index 1f762ca808..9832e279a5 100644 --- a/api/server/middleware/abortMiddleware.js +++ b/api/server/middleware/abortMiddleware.js @@ -1,124 +1,101 @@ const { logger } = require('@librechat/data-schemas'); -const { countTokens, isEnabled, sendEvent, sanitizeMessageForTransmit } = require('@librechat/api'); -const { isAssistantsEndpoint, ErrorTypes, Constants } = require('librechat-data-provider'); +const { + countTokens, + isEnabled, + sendEvent, + GenerationJobManager, + sanitizeMessageForTransmit, +} = require('@librechat/api'); +const { isAssistantsEndpoint, ErrorTypes } = require('librechat-data-provider'); const { truncateText, smartTruncateText } = require('~/app/clients/prompts'); const clearPendingReq = require('~/cache/clearPendingReq'); const { sendError } = require('~/server/middleware/error'); const { spendTokens } = require('~/models/spendTokens'); -const abortControllers = require('./abortControllers'); const { saveMessage, getConvo } = require('~/models'); const { abortRun } = require('./abortRun'); -const abortDataMap = new WeakMap(); - /** - * @param {string} abortKey - * @returns {boolean} + * Abort an active message generation. + * Uses GenerationJobManager for all agent requests. */ -function cleanupAbortController(abortKey) { - if (!abortControllers.has(abortKey)) { - return false; - } - - const { abortController } = abortControllers.get(abortKey); - - if (!abortController) { - abortControllers.delete(abortKey); - return true; - } - - // 1. Check if this controller has any composed signals and clean them up - try { - // This creates a temporary composed signal to use for cleanup - const composedSignal = AbortSignal.any([abortController.signal]); - - // Get all event types - in practice, AbortSignal typically only uses 'abort' - const eventTypes = ['abort']; - - // First, execute a dummy listener removal to handle potential composed signals - for (const eventType of eventTypes) { - const dummyHandler = () => {}; - composedSignal.addEventListener(eventType, dummyHandler); - composedSignal.removeEventListener(eventType, dummyHandler); - - const listeners = composedSignal.listeners?.(eventType) || []; - for (const listener of listeners) { - composedSignal.removeEventListener(eventType, listener); - } - } - } catch (e) { - logger.debug(`Error cleaning up composed signals: ${e}`); - } - - // 2. Abort the controller if not already aborted - if (!abortController.signal.aborted) { - abortController.abort(); - } - - // 3. Remove from registry - abortControllers.delete(abortKey); - - // 4. Clean up any data stored in the WeakMap - if (abortDataMap.has(abortController)) { - abortDataMap.delete(abortController); - } - - // 5. Clean up function references on the controller - if (abortController.getAbortData) { - abortController.getAbortData = null; - } - - if (abortController.abortCompletion) { - abortController.abortCompletion = null; - } - - return true; -} - -/** - * @param {string} abortKey - * @returns {function(): void} - */ -function createCleanUpHandler(abortKey) { - return function () { - try { - cleanupAbortController(abortKey); - } catch { - // Ignore cleanup errors - } - }; -} - async function abortMessage(req, res) { - let { abortKey, endpoint } = req.body; + const { abortKey, endpoint } = req.body; if (isAssistantsEndpoint(endpoint)) { return await abortRun(req, res); } const conversationId = abortKey?.split(':')?.[0] ?? req.user.id; + const userId = req.user.id; - if (!abortControllers.has(abortKey) && abortControllers.has(conversationId)) { - abortKey = conversationId; + // Use GenerationJobManager to abort the job + const abortResult = await GenerationJobManager.abortByConversation(conversationId); + + if (!abortResult.success) { + if (!res.headersSent) { + return res.status(204).send({ message: 'Request not found' }); + } + return; } - if (!abortControllers.has(abortKey) && !res.headersSent) { - return res.status(204).send({ message: 'Request not found' }); - } + const { jobData, content, text } = abortResult; - const { abortController } = abortControllers.get(abortKey) ?? {}; - if (!abortController) { - return res.status(204).send({ message: 'Request not found' }); - } + // Count tokens and spend them + const completionTokens = await countTokens(text); + const promptTokens = jobData?.promptTokens ?? 0; - const finalEvent = await abortController.abortCompletion?.(); - logger.debug( - `[abortMessage] ID: ${req.user.id} | ${req.user.email} | Aborted request: ` + - JSON.stringify({ abortKey }), + const responseMessage = { + messageId: jobData?.responseMessageId, + parentMessageId: jobData?.userMessage?.messageId, + conversationId: jobData?.conversationId, + content, + text, + sender: jobData?.sender ?? 'AI', + finish_reason: 'incomplete', + endpoint: jobData?.endpoint, + iconURL: jobData?.iconURL, + model: jobData?.model, + unfinished: false, + error: false, + isCreatedByUser: false, + tokenCount: completionTokens, + }; + + await spendTokens( + { ...responseMessage, context: 'incomplete', user: userId }, + { promptTokens, completionTokens }, ); - cleanupAbortController(abortKey); - if (res.headersSent && finalEvent) { + await saveMessage( + req, + { ...responseMessage, user: userId }, + { context: 'api/server/middleware/abortMiddleware.js' }, + ); + + // Get conversation for title + const conversation = await getConvo(userId, conversationId); + + const finalEvent = { + title: conversation && !conversation.title ? null : conversation?.title || 'New Chat', + final: true, + conversation, + requestMessage: jobData?.userMessage + ? sanitizeMessageForTransmit({ + messageId: jobData.userMessage.messageId, + parentMessageId: jobData.userMessage.parentMessageId, + conversationId: jobData.userMessage.conversationId, + text: jobData.userMessage.text, + isCreatedByUser: true, + }) + : null, + responseMessage, + }; + + logger.debug( + `[abortMessage] ID: ${userId} | ${req.user.email} | Aborted request: ${conversationId}`, + ); + + if (res.headersSent) { return sendEvent(res, finalEvent); } @@ -139,171 +116,13 @@ const handleAbort = function () { }; }; -const createAbortController = (req, res, getAbortData, getReqData) => { - const abortController = new AbortController(); - const { endpointOption } = req.body; - - // Store minimal data in WeakMap to avoid circular references - abortDataMap.set(abortController, { - getAbortDataFn: getAbortData, - userId: req.user.id, - endpoint: endpointOption.endpoint, - iconURL: endpointOption.iconURL, - model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model, - }); - - // Replace the direct function reference with a wrapper that uses WeakMap - abortController.getAbortData = function () { - const data = abortDataMap.get(this); - if (!data || typeof data.getAbortDataFn !== 'function') { - return {}; - } - - try { - const result = data.getAbortDataFn(); - - // Create a copy without circular references - const cleanResult = { ...result }; - - // If userMessagePromise exists, break its reference to client - if ( - cleanResult.userMessagePromise && - typeof cleanResult.userMessagePromise.then === 'function' - ) { - // Create a new promise that fulfills with the same result but doesn't reference the original - const originalPromise = cleanResult.userMessagePromise; - cleanResult.userMessagePromise = new Promise((resolve, reject) => { - originalPromise.then( - (result) => resolve({ ...result }), - (error) => reject(error), - ); - }); - } - - return cleanResult; - } catch (err) { - logger.error('[abortController.getAbortData] Error:', err); - return {}; - } - }; - - /** - * @param {TMessage} userMessage - * @param {string} responseMessageId - * @param {boolean} [isNewConvo] - */ - const onStart = (userMessage, responseMessageId, isNewConvo) => { - sendEvent(res, { message: userMessage, created: true }); - - const prelimAbortKey = userMessage?.conversationId ?? req.user.id; - const abortKey = isNewConvo - ? `${prelimAbortKey}${Constants.COMMON_DIVIDER}${Constants.NEW_CONVO}` - : prelimAbortKey; - getReqData({ abortKey }); - const prevRequest = abortControllers.get(abortKey); - const { overrideUserMessageId } = req?.body ?? {}; - - if (overrideUserMessageId != null && prevRequest && prevRequest?.abortController) { - const data = prevRequest.abortController.getAbortData(); - getReqData({ userMessage: data?.userMessage }); - const addedAbortKey = `${abortKey}:${responseMessageId}`; - - // Store minimal options - const minimalOptions = { - endpoint: endpointOption.endpoint, - iconURL: endpointOption.iconURL, - model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model, - }; - - abortControllers.set(addedAbortKey, { abortController, ...minimalOptions }); - const cleanupHandler = createCleanUpHandler(addedAbortKey); - res.on('finish', cleanupHandler); - return; - } - - // Store minimal options - const minimalOptions = { - endpoint: endpointOption.endpoint, - iconURL: endpointOption.iconURL, - model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model, - }; - - abortControllers.set(abortKey, { abortController, ...minimalOptions }); - const cleanupHandler = createCleanUpHandler(abortKey); - res.on('finish', cleanupHandler); - }; - - // Define abortCompletion without capturing the entire parent scope - abortController.abortCompletion = async function () { - this.abort(); - - // Get data from WeakMap - const ctrlData = abortDataMap.get(this); - if (!ctrlData || !ctrlData.getAbortDataFn) { - return { final: true, conversation: {}, title: 'New Chat' }; - } - - // Get abort data using stored function - const { conversationId, userMessage, userMessagePromise, promptTokens, ...responseData } = - ctrlData.getAbortDataFn(); - - const completionTokens = await countTokens(responseData?.text ?? ''); - const user = ctrlData.userId; - - const responseMessage = { - ...responseData, - conversationId, - finish_reason: 'incomplete', - endpoint: ctrlData.endpoint, - iconURL: ctrlData.iconURL, - model: ctrlData.modelOptions?.model ?? ctrlData.model_parameters?.model, - unfinished: false, - error: false, - isCreatedByUser: false, - tokenCount: completionTokens, - }; - - await spendTokens( - { ...responseMessage, context: 'incomplete', user }, - { promptTokens, completionTokens }, - ); - - await saveMessage( - req, - { ...responseMessage, user }, - { context: 'api/server/middleware/abortMiddleware.js' }, - ); - - let conversation; - if (userMessagePromise) { - const resolved = await userMessagePromise; - conversation = resolved?.conversation; - // Break reference to promise - resolved.conversation = null; - } - - if (!conversation) { - conversation = await getConvo(user, conversationId); - } - - return { - title: conversation && !conversation.title ? null : conversation?.title || 'New Chat', - final: true, - conversation, - requestMessage: sanitizeMessageForTransmit(userMessage), - responseMessage: responseMessage, - }; - }; - - return { abortController, onStart }; -}; - /** + * Handle abort errors during generation. * @param {ServerResponse} res * @param {ServerRequest} req * @param {Error | unknown} error * @param {Partial & { partialText?: string }} data - * @returns { Promise } + * @returns {Promise} */ const handleAbortError = async (res, req, error, data) => { if (error?.message?.includes('base64')) { @@ -368,8 +187,7 @@ const handleAbortError = async (res, req, error, data) => { }; } - const callback = createCleanUpHandler(conversationId); - await sendError(req, res, options, callback); + await sendError(req, res, options); }; if (partialText && partialText.length > 5) { @@ -387,6 +205,4 @@ const handleAbortError = async (res, req, error, data) => { module.exports = { handleAbort, handleAbortError, - createAbortController, - cleanupAbortController, }; diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 8e4f539bee..ad861be1bc 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -5,6 +5,7 @@ import type { IContentStateManager, SerializableJobData, IEventTransport, + AbortResult, IJobStore, } from './interfaces/IJobStore'; import type * as t from '~/types'; @@ -307,14 +308,15 @@ class GenerationJobManagerClass { /** * Abort a job (user-initiated). + * Returns all data needed for token spending and message saving. */ - async abortJob(streamId: string): Promise { + async abortJob(streamId: string): Promise { const jobData = await this.jobStore.getJob(streamId); const runtime = this.runtimeState.get(streamId); if (!jobData) { logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`); - return; + return { success: false, jobData: null, content: [], text: '', finalEvent: null }; } if (runtime) { @@ -326,9 +328,12 @@ class GenerationJobManagerClass { completedAt: Date.now(), }); + // Get content and extract text + const content = this.contentState.getContentParts(streamId) ?? []; + const text = this.extractTextFromContent(content); + // Create final event for abort const userMessageId = jobData.userMessage?.messageId; - const content = this.contentState.getContentParts(streamId) ?? []; const abortFinalEvent: t.ServerSentEvent = { final: true, @@ -348,6 +353,7 @@ class GenerationJobManagerClass { parentMessageId: userMessageId, conversationId: jobData.conversationId, content, + text, sender: jobData.sender ?? 'AI', unfinished: true, error: false, @@ -364,6 +370,44 @@ class GenerationJobManagerClass { this.contentState.clearContentState(streamId); logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`); + + return { + success: true, + jobData, + content, + text, + finalEvent: abortFinalEvent, + }; + } + + /** + * Extract plain text from content parts array. + */ + private extractTextFromContent(content: Agents.MessageContentComplex[]): string { + return content + .map((part) => { + if ('text' in part && typeof part.text === 'string') { + return part.text; + } + return ''; + }) + .join('') + .trim(); + } + + /** + * Abort a job by conversationId (for abort middleware). + * Returns abort result with all data needed for token spending and message saving. + */ + async abortByConversation(conversationId: string): Promise { + const jobData = await this.jobStore.getJobByConversation(conversationId); + if (!jobData) { + logger.debug( + `[GenerationJobManager] No active job found for conversation: ${conversationId}`, + ); + return { success: false, jobData: null, content: [], text: '', finalEvent: null }; + } + return this.abortJob(jobData.streamId); } /** @@ -494,6 +538,18 @@ class GenerationJobManagerClass { if (metadata.userMessage) { updates.userMessage = metadata.userMessage; } + if (metadata.endpoint) { + updates.endpoint = metadata.endpoint; + } + if (metadata.iconURL) { + updates.iconURL = metadata.iconURL; + } + if (metadata.model) { + updates.model = metadata.model; + } + if (metadata.promptTokens !== undefined) { + updates.promptTokens = metadata.promptTokens; + } this.jobStore.updateJob(streamId, updates); logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`); } diff --git a/packages/api/src/stream/index.ts b/packages/api/src/stream/index.ts index 42db007151..c7ab2a07db 100644 --- a/packages/api/src/stream/index.ts +++ b/packages/api/src/stream/index.ts @@ -1 +1,2 @@ export { GenerationJobManager, GenerationJobManagerClass } from './GenerationJobManager'; +export type { AbortResult, SerializableJobData, JobStatus } from './interfaces/IJobStore'; diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 7663f7c4b7..1360c974ee 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -37,6 +37,29 @@ export interface SerializableJobData { /** Serialized final event for replay */ finalEvent?: string; + + /** Endpoint metadata for abort handling - avoids storing functions */ + endpoint?: string; + iconURL?: string; + model?: string; + promptTokens?: number; +} + +/** + * Result returned from aborting a job - contains all data needed + * for token spending and message saving without storing callbacks + */ +export interface AbortResult { + /** Whether the abort was successful */ + success: boolean; + /** The job data at time of abort */ + jobData: SerializableJobData | null; + /** Aggregated content from the stream */ + content: Agents.MessageContentComplex[]; + /** Plain text representation of content */ + text: string; + /** Final event to send to client */ + finalEvent: unknown; } /** diff --git a/packages/api/src/types/stream.ts b/packages/api/src/types/stream.ts index d4df950210..79b29d774f 100644 --- a/packages/api/src/types/stream.ts +++ b/packages/api/src/types/stream.ts @@ -11,6 +11,14 @@ export interface GenerationJobMetadata { responseMessageId?: string; /** Sender label for the response (e.g., "GPT-4.1", "Claude") */ sender?: string; + /** Endpoint identifier for abort handling */ + endpoint?: string; + /** Icon URL for UI display */ + iconURL?: string; + /** Model name for token tracking */ + model?: string; + /** Prompt token count for abort token spending */ + promptTokens?: number; } export type GenerationJobStatus = 'running' | 'complete' | 'error' | 'aborted';