LibreChat/api/server/middleware/abortMiddleware.js
Danny Avila a6fb257bcf
📦 refactor: Consolidate DB models, encapsulating Mongoose usage in data-schemas (#11830)
* chore: move database model methods to /packages/data-schemas

* chore: add TypeScript ESLint rule to warn on unused variables

* refactor: model imports to streamline access

- Consolidated model imports across various files to improve code organization and reduce redundancy.
- Updated imports for models such as Assistant, Message, Conversation, and others to a unified import path.
- Adjusted middleware and service files to reflect the new import structure, ensuring functionality remains intact.
- Enhanced test files to align with the new import paths, maintaining test coverage and integrity.

* chore: migrate database models to packages/data-schemas and refactor all direct Mongoose Model usage outside of data-schemas

* test: update agent model mocks in unit tests

- Added `getAgent` mock to `client.test.js` to enhance test coverage for agent-related functionality.
- Removed redundant `getAgent` and `getAgents` mocks from `openai.spec.js` and `responses.unit.spec.js` to streamline test setup and reduce duplication.
- Ensured consistency in agent mock implementations across test files.

* fix: update types in data-schemas

* refactor: enhance type definitions in transaction and spending methods

- Updated type definitions in `checkBalance.ts` to use specific request and response types.
- Refined `spendTokens.ts` to utilize a new `SpendTxData` interface for better clarity and type safety.
- Improved transaction handling in `transaction.ts` by introducing `TransactionResult` and `TxData` interfaces, ensuring consistent data structures across methods.
- Adjusted unit tests in `transaction.spec.ts` to accommodate new type definitions and enhance robustness.

* refactor: streamline model imports and enhance code organization

- Consolidated model imports across various controllers and services to a unified import path, improving code clarity and reducing redundancy.
- Updated multiple files to reflect the new import structure, ensuring all functionalities remain intact.
- Enhanced overall code organization by removing duplicate import statements and optimizing the usage of model methods.
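
  As an illustration, the consolidated pattern matches the import used in this file; the "before" path is a hypothetical example of the per-model imports that were replaced:

    // before (hypothetical scattered import):
    // const { saveMessage } = require('~/models/Message');
    // after (unified entry point, as used in this file):
    const { spendTokens, spendStructuredTokens, saveMessage, getConvo } = require('~/models');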

* feat: implement loadAddedAgent and refactor agent loading logic

- Introduced `loadAddedAgent` function to handle loading agents from added conversations, supporting multi-convo parallel execution.
- Created a new `load.ts` file to encapsulate agent loading functionalities, including `loadEphemeralAgent` and `loadAgent`.
- Updated the `index.ts` file to export the new `load` module instead of the deprecated `loadAgent`.
- Enhanced type definitions and improved error handling in the agent loading process.
- Adjusted unit tests to reflect changes in the agent loading structure and ensure comprehensive coverage.

* refactor: enhance balance handling with new update interface

- Introduced `IBalanceUpdate` interface to streamline balance update operations across the codebase.
- Updated `upsertBalanceFields` method signatures in `balance.ts`, `transaction.ts`, and related tests to utilize the new interface for improved type safety.
- Adjusted type imports in `balance.spec.ts` to include `IBalanceUpdate`, ensuring consistency in balance management functionalities.
- Enhanced overall code clarity and maintainability by refining type definitions related to balance operations.

* feat: add unit tests for loadAgent functionality and enhance agent loading logic

- Introduced comprehensive unit tests for the `loadAgent` function, covering various scenarios including null and empty agent IDs, loading of ephemeral agents, and permission checks.
- Enhanced the `initializeClient` function by moving `getConvoFiles` to the correct position in the database method exports, ensuring proper functionality.
- Improved test coverage for agent loading, including handling of non-existent agents and user permissions.

* chore: reorder memory method exports for consistency

- Moved `deleteAllUserMemories` to the correct position in the exported memory methods, ensuring a consistent and logical order of method exports in `memory.ts`.
2026-02-18 00:31:36 -05:00

const { logger } = require('@librechat/data-schemas');
const { isAssistantsEndpoint, ErrorTypes } = require('librechat-data-provider');
const {
  isEnabled,
  sendEvent,
  countTokens,
  GenerationJobManager,
  sanitizeMessageForTransmit,
} = require('@librechat/api');
const { spendTokens, spendStructuredTokens, saveMessage, getConvo } = require('~/models');
const { truncateText, smartTruncateText } = require('~/app/clients/prompts');
const clearPendingReq = require('~/cache/clearPendingReq');
const { sendError } = require('~/server/middleware/error');
const { abortRun } = require('./abortRun');

/**
 * Spend tokens for all models from collected usage.
 * This handles both sequential and parallel agent execution.
 *
 * IMPORTANT: After spending, this function clears the collectedUsage array
 * to prevent double-spending. The array is shared with AgentClient.collectedUsage,
 * so clearing it here prevents the finally block from also spending tokens.
 *
 * @param {Object} params
 * @param {string} params.userId - User ID
 * @param {string} params.conversationId - Conversation ID
 * @param {Array<Object>} params.collectedUsage - Usage metadata from all models
 * @param {string} [params.fallbackModel] - Fallback model name if not in usage
 */
async function spendCollectedUsage({ userId, conversationId, collectedUsage, fallbackModel }) {
  if (!collectedUsage || collectedUsage.length === 0) {
    return;
  }
  const spendPromises = [];
  for (const usage of collectedUsage) {
    if (!usage) {
      continue;
    }
    // Support both OpenAI format (input_token_details) and Anthropic format (cache_*_input_tokens)
    const cache_creation =
      Number(usage.input_token_details?.cache_creation) ||
      Number(usage.cache_creation_input_tokens) ||
      0;
    const cache_read =
      Number(usage.input_token_details?.cache_read) || Number(usage.cache_read_input_tokens) || 0;
    const txMetadata = {
      context: 'abort',
      conversationId,
      user: userId,
      model: usage.model ?? fallbackModel,
    };
    if (cache_creation > 0 || cache_read > 0) {
      spendPromises.push(
        spendStructuredTokens(txMetadata, {
          promptTokens: {
            input: usage.input_tokens,
            write: cache_creation,
            read: cache_read,
          },
          completionTokens: usage.output_tokens,
        }).catch((err) => {
          logger.error('[abortMiddleware] Error spending structured tokens for abort', err);
        }),
      );
      continue;
    }
    spendPromises.push(
      spendTokens(txMetadata, {
        promptTokens: usage.input_tokens,
        completionTokens: usage.output_tokens,
      }).catch((err) => {
        logger.error('[abortMiddleware] Error spending tokens for abort', err);
      }),
    );
  }
  // Wait for all token spending to complete
  await Promise.all(spendPromises);
  // Clear the array to prevent double-spending from the AgentClient finally block.
  // The collectedUsage array is shared by reference with AgentClient.collectedUsage,
  // so clearing it here ensures recordCollectedUsage() sees an empty array and returns early.
  collectedUsage.length = 0;
}
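
/*
 * Illustrative shapes of the usage entries this function normalizes
 * (field names follow the checks above; the values are made up):
 *
 *   // OpenAI-style usage metadata
 *   { model: 'gpt-4.1', input_tokens: 1200, output_tokens: 350,
 *     input_token_details: { cache_creation: 0, cache_read: 800 } }
 *
 *   // Anthropic-style usage metadata
 *   { model: 'claude-sonnet-4', input_tokens: 1200, output_tokens: 350,
 *     cache_creation_input_tokens: 512, cache_read_input_tokens: 0 }
 *
 * An entry with nonzero cache tokens is routed through spendStructuredTokens;
 * otherwise plain spendTokens is used.
 */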

/**
 * Abort an active message generation.
 * Uses GenerationJobManager for all agent requests.
 * Since streamId === conversationId, we can directly abort by conversationId.
 */
async function abortMessage(req, res) {
  const { abortKey, endpoint } = req.body;
  if (isAssistantsEndpoint(endpoint)) {
    return await abortRun(req, res);
  }
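  // `abortKey` arrives as "<conversationId>:<suffix>"; take the conversationId
  // portion, falling back to the user id when no key is provided.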
  const conversationId = abortKey?.split(':')?.[0] ?? req.user.id;
  const userId = req.user.id;
  // Use GenerationJobManager to abort the job (streamId === conversationId)
  const abortResult = await GenerationJobManager.abortJob(conversationId);
  if (!abortResult.success) {
    if (!res.headersSent) {
      return res.status(204).send({ message: 'Request not found' });
    }
    return;
  }
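  // A successful abort returns the partial generation state accumulated so far:
  // job metadata, the streamed content/text, and per-model usage records.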
  const { jobData, content, text, collectedUsage } = abortResult;
  const completionTokens = await countTokens(text);
  const promptTokens = jobData?.promptTokens ?? 0;
  const responseMessage = {
    messageId: jobData?.responseMessageId,
    parentMessageId: jobData?.userMessage?.messageId,
    conversationId: jobData?.conversationId,
    content,
    text,
    sender: jobData?.sender ?? 'AI',
    finish_reason: 'incomplete',
    endpoint: jobData?.endpoint,
    iconURL: jobData?.iconURL,
    model: jobData?.model,
    unfinished: false,
    error: false,
    isCreatedByUser: false,
    tokenCount: completionTokens,
  };
  // Spend tokens for ALL models from collectedUsage (handles parallel agents/addedConvo)
  if (collectedUsage && collectedUsage.length > 0) {
    await spendCollectedUsage({
      userId,
      conversationId: jobData?.conversationId,
      collectedUsage,
      fallbackModel: jobData?.model,
    });
  } else {
    // Fallback: no collected usage, use text-based token counting for primary model only
    await spendTokens(
      { ...responseMessage, context: 'incomplete', user: userId },
      { promptTokens, completionTokens },
    );
  }
  await saveMessage(
    {
      userId: req?.user?.id,
      isTemporary: req?.body?.isTemporary,
      interfaceConfig: req?.config?.interfaceConfig,
    },
    { ...responseMessage, user: userId },
    { context: 'api/server/middleware/abortMiddleware.js' },
  );
  // Get conversation for title
  const conversation = await getConvo(userId, conversationId);
  const finalEvent = {
    title: conversation && !conversation.title ? null : conversation?.title || 'New Chat',
    final: true,
    conversation,
    requestMessage: jobData?.userMessage
      ? sanitizeMessageForTransmit({
          messageId: jobData.userMessage.messageId,
          parentMessageId: jobData.userMessage.parentMessageId,
          conversationId: jobData.userMessage.conversationId,
          text: jobData.userMessage.text,
          isCreatedByUser: true,
        })
      : null,
    responseMessage,
  };
  logger.debug(
    `[abortMessage] ID: ${userId} | ${req.user.email} | Aborted request: ${conversationId}`,
  );
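  // If the response is already streaming (headers sent), push the final event
  // onto the open event stream; otherwise reply with a plain JSON body.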
  if (res.headersSent) {
    return sendEvent(res, finalEvent);
  }
  res.setHeader('Content-Type', 'application/json');
  res.send(JSON.stringify(finalEvent));
}

const handleAbort = function () {
  return async function (req, res) {
    try {
      if (isEnabled(process.env.LIMIT_CONCURRENT_MESSAGES)) {
        await clearPendingReq({ userId: req.user.id });
      }
      return await abortMessage(req, res);
    } catch (err) {
      logger.error('[handleAbort] error while aborting message', err);
    }
  };
};
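
/*
 * Illustrative wiring only; the actual route registration lives elsewhere in
 * the server's route modules, so the path here is an assumption:
 *
 *   router.post('/abort', handleAbort());
 */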

/**
 * Handle abort errors during generation.
 * @param {ServerResponse} res
 * @param {ServerRequest} req
 * @param {Error | unknown} error
 * @param {Partial<TMessage> & { partialText?: string }} data
 * @returns {Promise<void>}
 */
const handleAbortError = async (res, req, error, data) => {
  if (error?.message?.includes('base64')) {
    logger.error('[handleAbortError] Error in base64 encoding', {
      ...error,
      stack: smartTruncateText(error?.stack, 1000),
      message: truncateText(error.message, 350),
    });
  } else {
    logger.error('[handleAbortError] AI response error; aborting request:', error);
  }
  const { sender, conversationId, messageId, parentMessageId, userMessageId, partialText } = data;
  if (error.stack && error.stack.includes('google')) {
    logger.warn(
      `AI Response error for conversation ${conversationId} likely caused by Google censor/filter`,
    );
  }
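  // Error messages that already carry a JSON `"type"` payload are forwarded
  // verbatim (presumably so the client can render a specific, localized error);
  // everything else is replaced with a generic notice.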
  let errorText = error?.message?.includes('"type"')
    ? error.message
    : 'An error occurred while processing your request. Please contact the Admin.';
  if (error?.type === ErrorTypes.INVALID_REQUEST) {
    errorText = `{"type":"${ErrorTypes.INVALID_REQUEST}"}`;
  }
  if (error?.message?.includes("does not support 'system'")) {
    errorText = `{"type":"${ErrorTypes.NO_SYSTEM_MESSAGES}"}`;
  }
  /**
   * @param {string} partialText
   * @returns {Promise<void>}
   */
  const respondWithError = async (partialText) => {
    const endpointOption = req.body?.endpointOption;
    let options = {
      sender,
      messageId,
      conversationId,
      parentMessageId,
      text: errorText,
      user: req.user.id,
      spec: endpointOption?.spec,
      iconURL: endpointOption?.iconURL,
      modelLabel: endpointOption?.modelLabel,
      shouldSaveMessage: userMessageId != null,
      model: endpointOption?.modelOptions?.model || req.body?.model,
    };
    if (req.body?.agent_id) {
      options.agent_id = req.body.agent_id;
    }
    if (partialText) {
      options = {
        ...options,
        error: false,
        unfinished: true,
        text: partialText,
      };
    }
    await sendError(req, res, options);
  };
  if (partialText && partialText.length > 5) {
    try {
      return await abortMessage(req, res);
    } catch (err) {
      logger.error('[handleAbortError] error while trying to abort message', err);
      return respondWithError(partialText);
    }
  } else {
    return respondWithError();
  }
};

module.exports = {
  handleAbort,
  handleAbortError,
};