refactor: Streamline abort handling and integrate GenerationJobManager for improved job management

- Removed the abortControllers middleware and integrated abort handling directly into GenerationJobManager.
- Updated abortMessage function to utilize GenerationJobManager for aborting jobs by conversation ID, enhancing clarity and efficiency.
- Simplified cleanup processes and improved error handling during abort operations.
- Enhanced metadata management for jobs, including endpoint and model information, to facilitate better tracking and resource management.
This commit is contained in:
Danny Avila 2025-12-13 17:04:42 -05:00
parent fe1cc4a61d
commit 3a23badf5f
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
7 changed files with 236 additions and 314 deletions

View file

@ -6,11 +6,7 @@ const {
sanitizeFileForTransmit,
sanitizeMessageForTransmit,
} = require('@librechat/api');
const {
handleAbortError,
createAbortController,
cleanupAbortController,
} = require('~/server/middleware');
const { handleAbortError } = require('~/server/middleware');
const { disposeClient, clientRegistry, requestDataMap } = require('~/server/cleanup');
const { saveMessage } = require('~/models');
@ -350,6 +346,10 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
}
};
/**
* Non-resumable Agent Controller - Uses GenerationJobManager for abort handling.
* Response is streamed directly to client via res, but abort state is managed centrally.
*/
const AgentController = async (req, res, next, initializeClient, addTitle) => {
const isResumable = req.query.resumable === 'true';
if (isResumable) {
@ -368,16 +368,12 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
responseMessageId: editedResponseMessageId = null,
} = req.body;
let sender;
let abortKey;
let userMessage;
let promptTokens;
let userMessageId;
let responseMessageId;
let userMessagePromise;
let getAbortData;
let client = null;
let cleanupHandlers = [];
let streamId = null;
const newConvo = !conversationId;
const userId = req.user.id;
@ -388,16 +384,13 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
if (key === 'userMessage') {
userMessage = data[key];
userMessageId = data[key].messageId;
} else if (key === 'userMessagePromise') {
userMessagePromise = data[key];
} else if (key === 'responseMessageId') {
responseMessageId = data[key];
} else if (key === 'promptTokens') {
promptTokens = data[key];
} else if (key === 'sender') {
sender = data[key];
} else if (key === 'abortKey') {
abortKey = data[key];
} else if (key === 'promptTokens' && streamId) {
// Update job metadata with prompt tokens for abort handling
GenerationJobManager.updateMetadata(streamId, { promptTokens: data[key] });
} else if (key === 'sender' && streamId) {
GenerationJobManager.updateMetadata(streamId, { sender: data[key] });
} else if (!conversationId && key === 'conversationId') {
conversationId = data[key];
}
@ -405,7 +398,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
};
// Create a function to handle final cleanup
const performCleanup = () => {
const performCleanup = async () => {
logger.debug('[AgentController] Performing cleanup');
if (Array.isArray(cleanupHandlers)) {
for (const handler of cleanupHandlers) {
@ -419,10 +412,10 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
}
}
// Clean up abort controller
if (abortKey) {
logger.debug('[AgentController] Cleaning up abort controller');
cleanupAbortController(abortKey);
// Complete the job in GenerationJobManager
if (streamId) {
logger.debug('[AgentController] Completing job in GenerationJobManager');
await GenerationJobManager.completeJob(streamId);
}
// Dispose client properly
@ -434,11 +427,11 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
client = null;
getReqData = null;
userMessage = null;
getAbortData = null;
endpointOption.agent = null;
if (endpointOption) {
endpointOption.agent = null;
}
endpointOption = null;
cleanupHandlers = null;
userMessagePromise = null;
// Clear request data map
if (requestDataMap.has(req)) {
@ -460,6 +453,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
}
};
cleanupHandlers.push(removePrelimHandler);
/** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
const result = await initializeClient({
req,
@ -467,6 +461,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
endpointOption,
signal: prelimAbortController.signal,
});
if (prelimAbortController.signal?.aborted) {
prelimAbortController = null;
throw new Error('Request was aborted before initialization could complete');
@ -485,28 +480,26 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
// Store request data in WeakMap keyed by req object
requestDataMap.set(req, { client });
// Use WeakRef to allow GC but still access content if it exists
const contentRef = new WeakRef(client.contentParts || []);
// Create job in GenerationJobManager for abort handling
// Use conversationId as streamId, or generate one for new conversations
streamId =
conversationId || `nonresumable_${Date.now()}_${Math.random().toString(36).slice(2)}`;
const job = await GenerationJobManager.createJob(streamId, userId, conversationId);
// Minimize closure scope - only capture small primitives and WeakRef
getAbortData = () => {
// Dereference WeakRef each time
const content = contentRef.deref();
// Store endpoint metadata for abort handling
GenerationJobManager.updateMetadata(streamId, {
endpoint: endpointOption.endpoint,
iconURL: endpointOption.iconURL,
model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
sender: client?.sender,
});
return {
sender,
content: content || [],
userMessage,
promptTokens,
conversationId,
userMessagePromise,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId,
};
};
// Store content parts reference for abort
if (client?.contentParts) {
GenerationJobManager.setContentParts(streamId, client.contentParts);
}
const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);
const closeHandler = createCloseHandler(abortController);
const closeHandler = createCloseHandler(job.abortController);
res.on('close', closeHandler);
cleanupHandlers.push(() => {
try {
@ -516,6 +509,33 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
}
});
/**
* onStart callback - stores user message and response ID for abort handling
*/
const onStart = (userMsg, respMsgId, _isNewConvo) => {
sendEvent(res, { message: userMsg, created: true });
userMessage = userMsg;
userMessageId = userMsg.messageId;
responseMessageId = respMsgId;
// Update conversationId if it was a new conversation
if (!conversationId && userMsg.conversationId) {
conversationId = userMsg.conversationId;
}
// Store metadata for abort handling
GenerationJobManager.updateMetadata(streamId, {
responseMessageId: respMsgId,
conversationId: userMsg.conversationId,
userMessage: {
messageId: userMsg.messageId,
parentMessageId: userMsg.parentMessageId,
conversationId: userMsg.conversationId,
text: userMsg.text,
},
});
};
const messageOptions = {
user: userId,
onStart,
@ -525,7 +545,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
editedContent,
conversationId,
parentMessageId,
abortController,
abortController: job.abortController,
overrideParentMessageId,
isEdited: !!editedContent,
userMCPAuthMap: result.userMCPAuthMap,
@ -565,7 +585,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
}
// Only send if not aborted
if (!abortController.signal.aborted) {
if (!job.abortController.signal.aborted) {
// Create a new response object with minimal copies
const finalResponse = { ...response };
@ -639,7 +659,7 @@ const AgentController = async (req, res, next, initializeClient, addTitle) => {
// Handle error without capturing much scope
handleAbortError(res, req, error, {
conversationId,
sender,
sender: client?.sender,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId ?? parentMessageId,
userMessageId,

View file

@ -1,2 +0,0 @@
// abortControllers.js
module.exports = new Map();

View file

@ -1,124 +1,101 @@
const { logger } = require('@librechat/data-schemas');
const { countTokens, isEnabled, sendEvent, sanitizeMessageForTransmit } = require('@librechat/api');
const { isAssistantsEndpoint, ErrorTypes, Constants } = require('librechat-data-provider');
const {
countTokens,
isEnabled,
sendEvent,
GenerationJobManager,
sanitizeMessageForTransmit,
} = require('@librechat/api');
const { isAssistantsEndpoint, ErrorTypes } = require('librechat-data-provider');
const { truncateText, smartTruncateText } = require('~/app/clients/prompts');
const clearPendingReq = require('~/cache/clearPendingReq');
const { sendError } = require('~/server/middleware/error');
const { spendTokens } = require('~/models/spendTokens');
const abortControllers = require('./abortControllers');
const { saveMessage, getConvo } = require('~/models');
const { abortRun } = require('./abortRun');
const abortDataMap = new WeakMap();
/**
* @param {string} abortKey
* @returns {boolean}
* Abort an active message generation.
* Uses GenerationJobManager for all agent requests.
*/
function cleanupAbortController(abortKey) {
if (!abortControllers.has(abortKey)) {
return false;
}
const { abortController } = abortControllers.get(abortKey);
if (!abortController) {
abortControllers.delete(abortKey);
return true;
}
// 1. Check if this controller has any composed signals and clean them up
try {
// This creates a temporary composed signal to use for cleanup
const composedSignal = AbortSignal.any([abortController.signal]);
// Get all event types - in practice, AbortSignal typically only uses 'abort'
const eventTypes = ['abort'];
// First, execute a dummy listener removal to handle potential composed signals
for (const eventType of eventTypes) {
const dummyHandler = () => {};
composedSignal.addEventListener(eventType, dummyHandler);
composedSignal.removeEventListener(eventType, dummyHandler);
const listeners = composedSignal.listeners?.(eventType) || [];
for (const listener of listeners) {
composedSignal.removeEventListener(eventType, listener);
}
}
} catch (e) {
logger.debug(`Error cleaning up composed signals: ${e}`);
}
// 2. Abort the controller if not already aborted
if (!abortController.signal.aborted) {
abortController.abort();
}
// 3. Remove from registry
abortControllers.delete(abortKey);
// 4. Clean up any data stored in the WeakMap
if (abortDataMap.has(abortController)) {
abortDataMap.delete(abortController);
}
// 5. Clean up function references on the controller
if (abortController.getAbortData) {
abortController.getAbortData = null;
}
if (abortController.abortCompletion) {
abortController.abortCompletion = null;
}
return true;
}
/**
* @param {string} abortKey
* @returns {function(): void}
*/
function createCleanUpHandler(abortKey) {
return function () {
try {
cleanupAbortController(abortKey);
} catch {
// Ignore cleanup errors
}
};
}
async function abortMessage(req, res) {
let { abortKey, endpoint } = req.body;
const { abortKey, endpoint } = req.body;
if (isAssistantsEndpoint(endpoint)) {
return await abortRun(req, res);
}
const conversationId = abortKey?.split(':')?.[0] ?? req.user.id;
const userId = req.user.id;
if (!abortControllers.has(abortKey) && abortControllers.has(conversationId)) {
abortKey = conversationId;
// Use GenerationJobManager to abort the job
const abortResult = await GenerationJobManager.abortByConversation(conversationId);
if (!abortResult.success) {
if (!res.headersSent) {
return res.status(204).send({ message: 'Request not found' });
}
return;
}
if (!abortControllers.has(abortKey) && !res.headersSent) {
return res.status(204).send({ message: 'Request not found' });
}
const { jobData, content, text } = abortResult;
const { abortController } = abortControllers.get(abortKey) ?? {};
if (!abortController) {
return res.status(204).send({ message: 'Request not found' });
}
// Count tokens and spend them
const completionTokens = await countTokens(text);
const promptTokens = jobData?.promptTokens ?? 0;
const finalEvent = await abortController.abortCompletion?.();
logger.debug(
`[abortMessage] ID: ${req.user.id} | ${req.user.email} | Aborted request: ` +
JSON.stringify({ abortKey }),
const responseMessage = {
messageId: jobData?.responseMessageId,
parentMessageId: jobData?.userMessage?.messageId,
conversationId: jobData?.conversationId,
content,
text,
sender: jobData?.sender ?? 'AI',
finish_reason: 'incomplete',
endpoint: jobData?.endpoint,
iconURL: jobData?.iconURL,
model: jobData?.model,
unfinished: false,
error: false,
isCreatedByUser: false,
tokenCount: completionTokens,
};
await spendTokens(
{ ...responseMessage, context: 'incomplete', user: userId },
{ promptTokens, completionTokens },
);
cleanupAbortController(abortKey);
if (res.headersSent && finalEvent) {
await saveMessage(
req,
{ ...responseMessage, user: userId },
{ context: 'api/server/middleware/abortMiddleware.js' },
);
// Get conversation for title
const conversation = await getConvo(userId, conversationId);
const finalEvent = {
title: conversation && !conversation.title ? null : conversation?.title || 'New Chat',
final: true,
conversation,
requestMessage: jobData?.userMessage
? sanitizeMessageForTransmit({
messageId: jobData.userMessage.messageId,
parentMessageId: jobData.userMessage.parentMessageId,
conversationId: jobData.userMessage.conversationId,
text: jobData.userMessage.text,
isCreatedByUser: true,
})
: null,
responseMessage,
};
logger.debug(
`[abortMessage] ID: ${userId} | ${req.user.email} | Aborted request: ${conversationId}`,
);
if (res.headersSent) {
return sendEvent(res, finalEvent);
}
@ -139,171 +116,13 @@ const handleAbort = function () {
};
};
const createAbortController = (req, res, getAbortData, getReqData) => {
const abortController = new AbortController();
const { endpointOption } = req.body;
// Store minimal data in WeakMap to avoid circular references
abortDataMap.set(abortController, {
getAbortDataFn: getAbortData,
userId: req.user.id,
endpoint: endpointOption.endpoint,
iconURL: endpointOption.iconURL,
model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
});
// Replace the direct function reference with a wrapper that uses WeakMap
abortController.getAbortData = function () {
const data = abortDataMap.get(this);
if (!data || typeof data.getAbortDataFn !== 'function') {
return {};
}
try {
const result = data.getAbortDataFn();
// Create a copy without circular references
const cleanResult = { ...result };
// If userMessagePromise exists, break its reference to client
if (
cleanResult.userMessagePromise &&
typeof cleanResult.userMessagePromise.then === 'function'
) {
// Create a new promise that fulfills with the same result but doesn't reference the original
const originalPromise = cleanResult.userMessagePromise;
cleanResult.userMessagePromise = new Promise((resolve, reject) => {
originalPromise.then(
(result) => resolve({ ...result }),
(error) => reject(error),
);
});
}
return cleanResult;
} catch (err) {
logger.error('[abortController.getAbortData] Error:', err);
return {};
}
};
/**
* @param {TMessage} userMessage
* @param {string} responseMessageId
* @param {boolean} [isNewConvo]
*/
const onStart = (userMessage, responseMessageId, isNewConvo) => {
sendEvent(res, { message: userMessage, created: true });
const prelimAbortKey = userMessage?.conversationId ?? req.user.id;
const abortKey = isNewConvo
? `${prelimAbortKey}${Constants.COMMON_DIVIDER}${Constants.NEW_CONVO}`
: prelimAbortKey;
getReqData({ abortKey });
const prevRequest = abortControllers.get(abortKey);
const { overrideUserMessageId } = req?.body ?? {};
if (overrideUserMessageId != null && prevRequest && prevRequest?.abortController) {
const data = prevRequest.abortController.getAbortData();
getReqData({ userMessage: data?.userMessage });
const addedAbortKey = `${abortKey}:${responseMessageId}`;
// Store minimal options
const minimalOptions = {
endpoint: endpointOption.endpoint,
iconURL: endpointOption.iconURL,
model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
};
abortControllers.set(addedAbortKey, { abortController, ...minimalOptions });
const cleanupHandler = createCleanUpHandler(addedAbortKey);
res.on('finish', cleanupHandler);
return;
}
// Store minimal options
const minimalOptions = {
endpoint: endpointOption.endpoint,
iconURL: endpointOption.iconURL,
model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
};
abortControllers.set(abortKey, { abortController, ...minimalOptions });
const cleanupHandler = createCleanUpHandler(abortKey);
res.on('finish', cleanupHandler);
};
// Define abortCompletion without capturing the entire parent scope
abortController.abortCompletion = async function () {
this.abort();
// Get data from WeakMap
const ctrlData = abortDataMap.get(this);
if (!ctrlData || !ctrlData.getAbortDataFn) {
return { final: true, conversation: {}, title: 'New Chat' };
}
// Get abort data using stored function
const { conversationId, userMessage, userMessagePromise, promptTokens, ...responseData } =
ctrlData.getAbortDataFn();
const completionTokens = await countTokens(responseData?.text ?? '');
const user = ctrlData.userId;
const responseMessage = {
...responseData,
conversationId,
finish_reason: 'incomplete',
endpoint: ctrlData.endpoint,
iconURL: ctrlData.iconURL,
model: ctrlData.modelOptions?.model ?? ctrlData.model_parameters?.model,
unfinished: false,
error: false,
isCreatedByUser: false,
tokenCount: completionTokens,
};
await spendTokens(
{ ...responseMessage, context: 'incomplete', user },
{ promptTokens, completionTokens },
);
await saveMessage(
req,
{ ...responseMessage, user },
{ context: 'api/server/middleware/abortMiddleware.js' },
);
let conversation;
if (userMessagePromise) {
const resolved = await userMessagePromise;
conversation = resolved?.conversation;
// Break reference to promise
resolved.conversation = null;
}
if (!conversation) {
conversation = await getConvo(user, conversationId);
}
return {
title: conversation && !conversation.title ? null : conversation?.title || 'New Chat',
final: true,
conversation,
requestMessage: sanitizeMessageForTransmit(userMessage),
responseMessage: responseMessage,
};
};
return { abortController, onStart };
};
/**
* Handle abort errors during generation.
* @param {ServerResponse} res
* @param {ServerRequest} req
* @param {Error | unknown} error
* @param {Partial<TMessage> & { partialText?: string }} data
* @returns { Promise<void> }
* @returns {Promise<void>}
*/
const handleAbortError = async (res, req, error, data) => {
if (error?.message?.includes('base64')) {
@ -368,8 +187,7 @@ const handleAbortError = async (res, req, error, data) => {
};
}
const callback = createCleanUpHandler(conversationId);
await sendError(req, res, options, callback);
await sendError(req, res, options);
};
if (partialText && partialText.length > 5) {
@ -387,6 +205,4 @@ const handleAbortError = async (res, req, error, data) => {
module.exports = {
handleAbort,
handleAbortError,
createAbortController,
cleanupAbortController,
};

View file

@ -5,6 +5,7 @@ import type {
IContentStateManager,
SerializableJobData,
IEventTransport,
AbortResult,
IJobStore,
} from './interfaces/IJobStore';
import type * as t from '~/types';
@ -307,14 +308,15 @@ class GenerationJobManagerClass {
/**
* Abort a job (user-initiated).
* Returns all data needed for token spending and message saving.
*/
async abortJob(streamId: string): Promise<void> {
async abortJob(streamId: string): Promise<AbortResult> {
const jobData = await this.jobStore.getJob(streamId);
const runtime = this.runtimeState.get(streamId);
if (!jobData) {
logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`);
return;
return { success: false, jobData: null, content: [], text: '', finalEvent: null };
}
if (runtime) {
@ -326,9 +328,12 @@ class GenerationJobManagerClass {
completedAt: Date.now(),
});
// Get content and extract text
const content = this.contentState.getContentParts(streamId) ?? [];
const text = this.extractTextFromContent(content);
// Create final event for abort
const userMessageId = jobData.userMessage?.messageId;
const content = this.contentState.getContentParts(streamId) ?? [];
const abortFinalEvent: t.ServerSentEvent = {
final: true,
@ -348,6 +353,7 @@ class GenerationJobManagerClass {
parentMessageId: userMessageId,
conversationId: jobData.conversationId,
content,
text,
sender: jobData.sender ?? 'AI',
unfinished: true,
error: false,
@ -364,6 +370,44 @@ class GenerationJobManagerClass {
this.contentState.clearContentState(streamId);
logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
return {
success: true,
jobData,
content,
text,
finalEvent: abortFinalEvent,
};
}
/**
* Extract plain text from content parts array.
*/
private extractTextFromContent(content: Agents.MessageContentComplex[]): string {
return content
.map((part) => {
if ('text' in part && typeof part.text === 'string') {
return part.text;
}
return '';
})
.join('')
.trim();
}
/**
* Abort a job by conversationId (for abort middleware).
* Returns abort result with all data needed for token spending and message saving.
*/
async abortByConversation(conversationId: string): Promise<AbortResult> {
const jobData = await this.jobStore.getJobByConversation(conversationId);
if (!jobData) {
logger.debug(
`[GenerationJobManager] No active job found for conversation: ${conversationId}`,
);
return { success: false, jobData: null, content: [], text: '', finalEvent: null };
}
return this.abortJob(jobData.streamId);
}
/**
@ -494,6 +538,18 @@ class GenerationJobManagerClass {
if (metadata.userMessage) {
updates.userMessage = metadata.userMessage;
}
if (metadata.endpoint) {
updates.endpoint = metadata.endpoint;
}
if (metadata.iconURL) {
updates.iconURL = metadata.iconURL;
}
if (metadata.model) {
updates.model = metadata.model;
}
if (metadata.promptTokens !== undefined) {
updates.promptTokens = metadata.promptTokens;
}
this.jobStore.updateJob(streamId, updates);
logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
}

View file

@ -1 +1,2 @@
export { GenerationJobManager, GenerationJobManagerClass } from './GenerationJobManager';
export type { AbortResult, SerializableJobData, JobStatus } from './interfaces/IJobStore';

View file

@ -37,6 +37,29 @@ export interface SerializableJobData {
/** Serialized final event for replay */
finalEvent?: string;
/** Endpoint metadata for abort handling - avoids storing functions */
endpoint?: string;
iconURL?: string;
model?: string;
promptTokens?: number;
}
/**
* Result returned from aborting a job - contains all data needed
* for token spending and message saving without storing callbacks
*/
export interface AbortResult {
/** Whether the abort was successful */
success: boolean;
/** The job data at time of abort */
jobData: SerializableJobData | null;
/** Aggregated content from the stream */
content: Agents.MessageContentComplex[];
/** Plain text representation of content */
text: string;
/** Final event to send to client */
finalEvent: unknown;
}
/**

View file

@ -11,6 +11,14 @@ export interface GenerationJobMetadata {
responseMessageId?: string;
/** Sender label for the response (e.g., "GPT-4.1", "Claude") */
sender?: string;
/** Endpoint identifier for abort handling */
endpoint?: string;
/** Icon URL for UI display */
iconURL?: string;
/** Model name for token tracking */
model?: string;
/** Prompt token count for abort token spending */
promptTokens?: number;
}
export type GenerationJobStatus = 'running' | 'complete' | 'error' | 'aborted';