LibreChat/api/server/controllers/agents/openai.js
Danny Avila e1e204d6cf
🧮 refactor: Bulk Transactions & Balance Updates for Token Spending (#11996)
* refactor: transaction handling by integrating pricing and bulk write operations

- Updated `recordCollectedUsage` to accept pricing functions and bulk write operations, improving transaction management.
- Refactored `AgentClient` and related controllers to utilize the new transaction handling capabilities, ensuring better performance and accuracy in token spending.
- Added tests to validate the new functionality, ensuring correct behavior for both standard and bulk transaction paths.
- Introduced a new `transactions.ts` file to encapsulate transaction-related logic and types, enhancing code organization and maintainability.

* chore: reorganize imports in agents client controller

- Moved `getMultiplier` and `getCacheMultiplier` imports to maintain consistency and clarity in the import structure.
- Removed duplicate import of `updateBalance` and `bulkInsertTransactions`, streamlining the code for better readability.

* refactor: add TransactionData type and CANCEL_RATE constant to data-schemas

Establishes a single source of truth for the transaction document shape
and the incomplete-context billing rate constant, both consumed by
packages/api and api/.

* refactor: use proper types in data-schemas transaction methods

- Replace `as unknown as { tokenCredits }` with `lean<IBalance>()`
- Use `TransactionData[]` instead of `Record<string, unknown>[]`
  for bulkInsertTransactions parameter
- Add JSDoc noting insertMany bypasses document middleware
- Remove orphan section comment in methods/index.ts

* refactor: use shared types in transactions.ts, fix bulk write logic

- Import CANCEL_RATE from data-schemas instead of local duplicate
- Import TransactionData from data-schemas for PreparedEntry/BulkWriteDeps
- Use tilde alias for EndpointTokenConfig import
- Pass valueKey through to getMultiplier
- Only sum tokenValue for balance-enabled docs in bulkWriteTransactions
- Consolidate two loops into single-pass map

* refactor: remove duplicate updateBalance from Transaction.js

Import updateBalance from ~/models (sourced from data-schemas) instead
of maintaining a second copy. Also import CANCEL_RATE from data-schemas
and remove the Balance model import (no longer needed directly).

* fix: test real spendCollectedUsage instead of IIFE replica

Export spendCollectedUsage from abortMiddleware.js and rewrite the test
file to import and test the actual function. Previously the tests ran
against a hand-written replica that could silently diverge from the real
implementation.

* test: add transactions.spec.ts and restore regression comments

Add 22 direct unit tests for transactions.ts financial logic covering
prepareTokenSpend, prepareStructuredTokenSpend, bulkWriteTransactions,
CANCEL_RATE paths, NaN guards, disabled transactions, zero tokens,
cache multipliers, and balance-enabled filtering.

Restore critical regression documentation comments in
recordCollectedUsage.spec.js explaining which production bugs the
tests guard against.

* fix: widen setValues type to include lastRefill

The UpdateBalanceParams.setValues type was Partial<Pick<IBalance,
'tokenCredits'>> which excluded lastRefill — used by
createAutoRefillTransaction. Widen to also pick 'lastRefill'.

* test: use real MongoDB for bulkWriteTransactions tests

Replace mock-based bulkWriteTransactions tests with real DB tests using
MongoMemoryServer. Pure function tests (prepareTokenSpend,
prepareStructuredTokenSpend) remain mock-based since they don't touch
DB. Add end-to-end integration tests that verify the full prepare →
bulk write → DB state pipeline with real Transaction and Balance models.

* chore: update @librechat/agents dependency to version 3.1.54 in package-lock.json and related package.json files

* test: add bulk path parity tests proving identical DB outcomes

Three test suites proving the bulk path (prepareTokenSpend/
prepareStructuredTokenSpend + bulkWriteTransactions) produces
numerically identical results to the legacy path for all scenarios:

- usage.bulk-parity.spec.ts: mirrors all legacy recordCollectedUsage
  tests; asserts same return values and verifies metadata fields on
  the insertMany docs match what spendTokens args would carry

- transactions.bulk-parity.spec.ts: real-DB tests using actual
  getMultiplier/getCacheMultiplier pricing functions; asserts exact
  tokenValue, rate, rawAmount and balance deductions for standard
  tokens, structured/cache tokens, CANCEL_RATE, premium pricing,
  multi-entry batches, and edge cases (NaN, zero, disabled)

- Transaction.spec.js: adds describe('Bulk path parity') that mirrors
  7 key legacy tests via recordCollectedUsage + bulk deps against
  real MongoDB, asserting same balance deductions and doc counts

* refactor: update llmConfig structure to use modelKwargs for reasoning effort

Refactor the llmConfig in getOpenAILLMConfig to store reasoning effort within modelKwargs instead of directly on llmConfig. This change ensures consistency in the configuration structure and improves clarity in the handling of reasoning properties in the tests.

* test: update performance checks in processAssistantMessage tests

Revise the performance assertions in the processAssistantMessage tests to ensure that each message processing time remains under 100ms, addressing potential ReDoS vulnerabilities. This change enhances the reliability of the tests by focusing on maximum processing time rather than relative ratios.

* test: fill parity test gaps — model fallback, abort context, structured edge cases

- usage.bulk-parity: add undefined model fallback test
- transactions.bulk-parity: add abort context test (txns inserted,
  balance unchanged when balance not passed), fix readTokens type cast
- Transaction.spec: add 3 missing mirrors — balance disabled with
  transactions enabled, structured transactions disabled, structured
  balance disabled

* fix: deduct balance before inserting transactions to prevent orphaned docs

Swap the order in bulkWriteTransactions: updateBalance runs before
insertMany. If updateBalance fails (after exhausting retries), no
transaction documents are written — avoiding the inconsistent state
where transactions exist in MongoDB with no corresponding balance
deduction.

* chore: import order

* test: update config.spec.ts for OpenRouter reasoning in modelKwargs

Same fix as llm.spec.ts — OpenRouter reasoning is now passed via
modelKwargs instead of llmConfig.reasoning directly.
2026-03-01 12:26:36 -05:00

713 lines
21 KiB
JavaScript

const { nanoid } = require('nanoid');
const { logger } = require('@librechat/data-schemas');
const { Callback, ToolEndHandler, formatAgentMessages } = require('@librechat/agents');
const { EModelEndpoint, ResourceType, PermissionBits } = require('librechat-data-provider');
const {
writeSSE,
createRun,
createChunk,
buildToolSet,
sendFinalChunk,
createSafeUser,
validateRequest,
initializeAgent,
getBalanceConfig,
createErrorResponse,
recordCollectedUsage,
getTransactionsConfig,
createToolExecuteHandler,
buildNonStreamingResponse,
createOpenAIStreamTracker,
createOpenAIContentAggregator,
isChatCompletionValidationFailure,
} = require('@librechat/api');
const { loadAgentTools, loadToolsForExecution } = require('~/server/services/ToolService');
const { createToolEndCallback } = require('~/server/controllers/agents/callbacks');
const { findAccessibleResources } = require('~/server/services/PermissionService');
const { spendTokens, spendStructuredTokens } = require('~/models/spendTokens');
const { getMultiplier, getCacheMultiplier } = require('~/models/tx');
const { getConvoFiles } = require('~/models/Conversation');
const { getAgent, getAgents } = require('~/models/Agent');
const db = require('~/models');
/**
* Creates a tool loader function for the agent.
* @param {AbortSignal} signal - The abort signal
* @param {boolean} [definitionsOnly=true] - When true, returns only serializable
* tool definitions without creating full tool instances (for event-driven mode)
*/
function createToolLoader(signal, definitionsOnly = true) {
return async function loadTools({
req,
res,
tools,
model,
agentId,
provider,
tool_options,
tool_resources,
}) {
const agent = { id: agentId, tools, provider, model, tool_options };
try {
return await loadAgentTools({
req,
res,
agent,
signal,
tool_resources,
definitionsOnly,
streamId: null, // No resumable stream for OpenAI compat
});
} catch (error) {
logger.error('Error loading tools for agent ' + agentId, error);
}
};
}
/**
* Convert content part to internal format
* @param {Object} part - Content part
* @returns {Object} Converted part
*/
function convertContentPart(part) {
if (part.type === 'text') {
return { type: 'text', text: part.text };
}
if (part.type === 'image_url') {
return { type: 'image_url', image_url: part.image_url };
}
return part;
}
/**
* Convert OpenAI messages to internal format
* @param {Array} messages - OpenAI format messages
* @returns {Array} Internal format messages
*/
function convertMessages(messages) {
return messages.map((msg) => {
let content;
if (typeof msg.content === 'string') {
content = msg.content;
} else if (msg.content) {
content = msg.content.map(convertContentPart);
} else {
content = '';
}
return {
role: msg.role,
content,
...(msg.name && { name: msg.name }),
...(msg.tool_calls && { tool_calls: msg.tool_calls }),
...(msg.tool_call_id && { tool_call_id: msg.tool_call_id }),
};
});
}
/**
* Send an error response in OpenAI format
*/
function sendErrorResponse(res, statusCode, message, type = 'invalid_request_error', code = null) {
res.status(statusCode).json(createErrorResponse(message, type, code));
}
/**
* OpenAI-compatible chat completions controller for agents.
*
* POST /v1/chat/completions
*
* Request format:
* {
* "model": "agent_id_here",
* "messages": [{"role": "user", "content": "Hello!"}],
* "stream": true,
* "conversation_id": "optional",
* "parent_message_id": "optional"
* }
*/
const OpenAIChatCompletionController = async (req, res) => {
const appConfig = req.config;
const requestStartTime = Date.now();
const validation = validateRequest(req.body);
if (isChatCompletionValidationFailure(validation)) {
return sendErrorResponse(res, 400, validation.error);
}
const request = validation.request;
const agentId = request.model;
// Look up the agent
const agent = await getAgent({ id: agentId });
if (!agent) {
return sendErrorResponse(
res,
404,
`Agent not found: ${agentId}`,
'invalid_request_error',
'model_not_found',
);
}
const responseId = `chatcmpl-${nanoid()}`;
const conversationId = request.conversation_id ?? nanoid();
const parentMessageId = request.parent_message_id ?? null;
const created = Math.floor(Date.now() / 1000);
/** @type {import('@librechat/api').OpenAIResponseContext} — key must be `requestId` to match the type used by createChunk/buildNonStreamingResponse */
const context = {
created,
requestId: responseId,
model: agentId,
};
logger.debug(
`[OpenAI API] Response ${responseId} started for agent ${agentId}, stream: ${request.stream}`,
);
// Set up abort controller
const abortController = new AbortController();
// Handle client disconnect
req.on('close', () => {
if (!abortController.signal.aborted) {
abortController.abort();
logger.debug('[OpenAI API] Client disconnected, aborting');
}
});
try {
// Build allowed providers set
const allowedProviders = new Set(
appConfig?.endpoints?.[EModelEndpoint.agents]?.allowedProviders,
);
// Create tool loader
const loadTools = createToolLoader(abortController.signal);
// Initialize the agent first to check for disableStreaming
const endpointOption = {
endpoint: agent.provider,
model_parameters: agent.model_parameters ?? {},
};
const primaryConfig = await initializeAgent(
{
req,
res,
loadTools,
requestFiles: [],
conversationId,
parentMessageId,
agent,
endpointOption,
allowedProviders,
isInitialAgent: true,
},
{
getConvoFiles,
getFiles: db.getFiles,
getUserKey: db.getUserKey,
getMessages: db.getMessages,
updateFilesUsage: db.updateFilesUsage,
getUserKeyValues: db.getUserKeyValues,
getUserCodeFiles: db.getUserCodeFiles,
getToolFilesByIds: db.getToolFilesByIds,
getCodeGeneratedFiles: db.getCodeGeneratedFiles,
},
);
// Determine if streaming is enabled (check both request and agent config)
const streamingDisabled = !!primaryConfig.model_parameters?.disableStreaming;
const isStreaming = request.stream === true && !streamingDisabled;
// Create tracker for streaming or aggregator for non-streaming
const tracker = isStreaming ? createOpenAIStreamTracker() : null;
const aggregator = isStreaming ? null : createOpenAIContentAggregator();
// Set up response for streaming
if (isStreaming) {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders();
// Send initial chunk with role
const initialChunk = createChunk(context, { role: 'assistant' });
writeSSE(res, initialChunk);
}
// Create handler config for OpenAI streaming (only used when streaming)
const handlerConfig = isStreaming
? {
res,
context,
tracker,
}
: null;
const collectedUsage = [];
/** @type {Promise<import('librechat-data-provider').TAttachment | null>[]} */
const artifactPromises = [];
const toolEndCallback = createToolEndCallback({ req, res, artifactPromises, streamId: null });
const toolExecuteOptions = {
loadTools: async (toolNames) => {
return loadToolsForExecution({
req,
res,
agent,
toolNames,
signal: abortController.signal,
toolRegistry: primaryConfig.toolRegistry,
userMCPAuthMap: primaryConfig.userMCPAuthMap,
tool_resources: primaryConfig.tool_resources,
});
},
toolEndCallback,
};
const openaiMessages = convertMessages(request.messages);
const toolSet = buildToolSet(primaryConfig);
const { messages: formattedMessages, indexTokenCountMap } = formatAgentMessages(
openaiMessages,
{},
toolSet,
);
/**
* Create a simple handler that processes data
*/
const createHandler = (processor) => ({
handle: (_event, data) => {
if (processor) {
processor(data);
}
},
});
/**
* Stream text content in OpenAI format
*/
const streamText = (text) => {
if (!text) {
return;
}
if (isStreaming) {
tracker.addText();
writeSSE(res, createChunk(context, { content: text }));
} else {
aggregator.addText(text);
}
};
/**
* Stream reasoning content in OpenAI format (OpenRouter convention)
*/
const streamReasoning = (text) => {
if (!text) {
return;
}
if (isStreaming) {
tracker.addReasoning();
writeSSE(res, createChunk(context, { reasoning: text }));
} else {
aggregator.addReasoning(text);
}
};
// Event handlers for OpenAI-compatible streaming
const handlers = {
// Text content streaming
on_message_delta: createHandler((data) => {
const content = data?.delta?.content;
if (Array.isArray(content)) {
for (const part of content) {
if (part.type === 'text' && part.text) {
streamText(part.text);
}
}
}
}),
// Reasoning/thinking content streaming
on_reasoning_delta: createHandler((data) => {
const content = data?.delta?.content;
if (Array.isArray(content)) {
for (const part of content) {
const text = part.think || part.text;
if (text) {
streamReasoning(text);
}
}
}
}),
// Tool call initiation - streams id and name (from on_run_step)
on_run_step: createHandler((data) => {
const stepDetails = data?.stepDetails;
if (stepDetails?.type === 'tool_calls' && stepDetails.tool_calls) {
for (const tc of stepDetails.tool_calls) {
const toolIndex = data.index ?? 0;
const toolId = tc.id ?? '';
const toolName = tc.name ?? '';
const toolCall = {
id: toolId,
type: 'function',
function: { name: toolName, arguments: '' },
};
// Track tool call in tracker or aggregator
if (isStreaming) {
if (!tracker.toolCalls.has(toolIndex)) {
tracker.toolCalls.set(toolIndex, toolCall);
}
// Stream initial tool call chunk (like OpenAI does)
writeSSE(
res,
createChunk(context, {
tool_calls: [{ index: toolIndex, ...toolCall }],
}),
);
} else {
if (!aggregator.toolCalls.has(toolIndex)) {
aggregator.toolCalls.set(toolIndex, toolCall);
}
}
}
}
}),
// Tool call argument streaming (from on_run_step_delta)
on_run_step_delta: createHandler((data) => {
const delta = data?.delta;
if (delta?.type === 'tool_calls' && delta.tool_calls) {
for (const tc of delta.tool_calls) {
const args = tc.args ?? '';
if (!args) {
continue;
}
const toolIndex = tc.index ?? 0;
// Update tool call arguments
const targetMap = isStreaming ? tracker.toolCalls : aggregator.toolCalls;
const tracked = targetMap.get(toolIndex);
if (tracked) {
tracked.function.arguments += args;
}
// Stream argument delta (only for streaming)
if (isStreaming) {
writeSSE(
res,
createChunk(context, {
tool_calls: [
{
index: toolIndex,
function: { arguments: args },
},
],
}),
);
}
}
}
}),
// Usage tracking
on_chat_model_end: createHandler((data) => {
const usage = data?.output?.usage_metadata;
if (usage) {
collectedUsage.push(usage);
const target = isStreaming ? tracker : aggregator;
target.usage.promptTokens += usage.input_tokens ?? 0;
target.usage.completionTokens += usage.output_tokens ?? 0;
}
}),
on_run_step_completed: createHandler(),
// Use proper ToolEndHandler for processing artifacts (images, file citations, code output)
on_tool_end: new ToolEndHandler(toolEndCallback, logger),
on_chain_stream: createHandler(),
on_chain_end: createHandler(),
on_agent_update: createHandler(),
on_custom_event: createHandler(),
// Event-driven tool execution handler
on_tool_execute: createToolExecuteHandler(toolExecuteOptions),
};
// Create and run the agent
const userId = req.user?.id ?? 'api-user';
// Extract userMCPAuthMap from primaryConfig (needed for MCP tool connections)
const userMCPAuthMap = primaryConfig.userMCPAuthMap;
const run = await createRun({
agents: [primaryConfig],
messages: formattedMessages,
indexTokenCountMap,
runId: responseId,
signal: abortController.signal,
customHandlers: handlers,
requestBody: {
messageId: responseId,
conversationId,
},
user: { id: userId },
});
if (!run) {
throw new Error('Failed to create agent run');
}
// Process the stream
const config = {
runName: 'AgentRun',
configurable: {
thread_id: conversationId,
user_id: userId,
user: createSafeUser(req.user),
requestBody: {
messageId: responseId,
conversationId,
},
...(userMCPAuthMap != null && { userMCPAuthMap }),
},
signal: abortController.signal,
streamMode: 'values',
version: 'v2',
};
await run.processStream({ messages: formattedMessages }, config, {
callbacks: {
[Callback.TOOL_ERROR]: (graph, error, toolId) => {
logger.error(`[OpenAI API] Tool Error "${toolId}"`, error);
},
},
});
// Record token usage against balance
const balanceConfig = getBalanceConfig(appConfig);
const transactionsConfig = getTransactionsConfig(appConfig);
recordCollectedUsage(
{
spendTokens,
spendStructuredTokens,
pricing: { getMultiplier, getCacheMultiplier },
bulkWriteOps: { insertMany: db.bulkInsertTransactions, updateBalance: db.updateBalance },
},
{
user: userId,
conversationId,
collectedUsage,
context: 'message',
messageId: responseId,
balance: balanceConfig,
transactions: transactionsConfig,
model: primaryConfig.model || agent.model_parameters?.model,
},
).catch((err) => {
logger.error('[OpenAI API] Error recording usage:', err);
});
// Finalize response
const duration = Date.now() - requestStartTime;
if (isStreaming) {
sendFinalChunk(handlerConfig);
res.end();
logger.debug(`[OpenAI API] Response ${responseId} completed in ${duration}ms (streaming)`);
// Wait for artifact processing after response ends (non-blocking)
if (artifactPromises.length > 0) {
Promise.all(artifactPromises).catch((artifactError) => {
logger.warn('[OpenAI API] Error processing artifacts:', artifactError);
});
}
} else {
// For non-streaming, wait for artifacts before sending response
if (artifactPromises.length > 0) {
try {
await Promise.all(artifactPromises);
} catch (artifactError) {
logger.warn('[OpenAI API] Error processing artifacts:', artifactError);
}
}
// Build usage from aggregated data
const usage = {
prompt_tokens: aggregator.usage.promptTokens,
completion_tokens: aggregator.usage.completionTokens,
total_tokens: aggregator.usage.promptTokens + aggregator.usage.completionTokens,
};
if (aggregator.usage.reasoningTokens > 0) {
usage.completion_tokens_details = {
reasoning_tokens: aggregator.usage.reasoningTokens,
};
}
const response = buildNonStreamingResponse(
context,
aggregator.getText(),
aggregator.getReasoning(),
aggregator.toolCalls,
usage,
);
res.json(response);
logger.debug(
`[OpenAI API] Response ${responseId} completed in ${duration}ms (non-streaming)`,
);
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'An error occurred';
logger.error('[OpenAI API] Error:', error);
// Check if we already started streaming (headers sent)
if (res.headersSent) {
// Headers already sent, send error in stream
const errorChunk = createChunk(context, { content: `\n\nError: ${errorMessage}` }, 'stop');
writeSSE(res, errorChunk);
writeSSE(res, '[DONE]');
res.end();
} else {
// Forward upstream provider status codes (e.g., Anthropic 400s) instead of masking as 500
const statusCode =
typeof error?.status === 'number' && error.status >= 400 && error.status < 600
? error.status
: 500;
const errorType =
statusCode >= 400 && statusCode < 500 ? 'invalid_request_error' : 'server_error';
sendErrorResponse(res, statusCode, errorMessage, errorType);
}
}
};
/**
* List available agents as models (filtered by remote access permissions)
*
* GET /v1/models
*/
const ListModelsController = async (req, res) => {
try {
const userId = req.user?.id;
const userRole = req.user?.role;
if (!userId) {
return sendErrorResponse(res, 401, 'Authentication required', 'auth_error');
}
// Find agents the user has remote access to (VIEW permission on REMOTE_AGENT)
const accessibleAgentIds = await findAccessibleResources({
userId,
role: userRole,
resourceType: ResourceType.REMOTE_AGENT,
requiredPermissions: PermissionBits.VIEW,
});
// Get the accessible agents
let agents = [];
if (accessibleAgentIds.length > 0) {
agents = await getAgents({ _id: { $in: accessibleAgentIds } });
}
const models = agents.map((agent) => ({
id: agent.id,
object: 'model',
created: Math.floor(new Date(agent.createdAt || Date.now()).getTime() / 1000),
owned_by: 'librechat',
permission: [],
root: agent.id,
parent: null,
// LibreChat extensions
name: agent.name,
description: agent.description,
provider: agent.provider,
}));
res.json({
object: 'list',
data: models,
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Failed to list models';
logger.error('[OpenAI API] Error listing models:', error);
sendErrorResponse(res, 500, errorMessage, 'server_error');
}
};
/**
* Get a specific model/agent (with remote access permission check)
*
* GET /v1/models/:model
*/
const GetModelController = async (req, res) => {
try {
const { model } = req.params;
const userId = req.user?.id;
const userRole = req.user?.role;
if (!userId) {
return sendErrorResponse(res, 401, 'Authentication required', 'auth_error');
}
const agent = await getAgent({ id: model });
if (!agent) {
return sendErrorResponse(
res,
404,
`Model not found: ${model}`,
'invalid_request_error',
'model_not_found',
);
}
// Check if user has remote access to this agent
const accessibleAgentIds = await findAccessibleResources({
userId,
role: userRole,
resourceType: ResourceType.REMOTE_AGENT,
requiredPermissions: PermissionBits.VIEW,
});
const hasAccess = accessibleAgentIds.some((id) => id.toString() === agent._id.toString());
if (!hasAccess) {
return sendErrorResponse(
res,
403,
`No remote access to model: ${model}`,
'permission_error',
'access_denied',
);
}
res.json({
id: agent.id,
object: 'model',
created: Math.floor(new Date(agent.createdAt || Date.now()).getTime() / 1000),
owned_by: 'librechat',
permission: [],
root: agent.id,
parent: null,
// LibreChat extensions
name: agent.name,
description: agent.description,
provider: agent.provider,
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Failed to get model';
logger.error('[OpenAI API] Error getting model:', error);
sendErrorResponse(res, 500, errorMessage, 'server_error');
}
};
module.exports = {
OpenAIChatCompletionController,
ListModelsController,
GetModelController,
};