🧮 refactor: Bulk Transactions & Balance Updates for Token Spending (#11996)

* refactor: transaction handling by integrating pricing and bulk write operations

- Updated `recordCollectedUsage` to accept pricing functions and bulk write operations, improving transaction management.
- Refactored `AgentClient` and related controllers to utilize the new transaction handling capabilities, ensuring better performance and accuracy in token spending.
- Added tests to validate the new functionality, ensuring correct behavior for both standard and bulk transaction paths.
- Introduced a new `transactions.ts` file to encapsulate transaction-related logic and types, enhancing code organization and maintainability.

* chore: reorganize imports in agents client controller

- Moved `getMultiplier` and `getCacheMultiplier` imports to maintain consistency and clarity in the import structure.
- Removed duplicate import of `updateBalance` and `bulkInsertTransactions`, streamlining the code for better readability.

* refactor: add TransactionData type and CANCEL_RATE constant to data-schemas

Establishes a single source of truth for the transaction document shape
and the incomplete-context billing rate constant, both consumed by
packages/api and api/.

* refactor: use proper types in data-schemas transaction methods

- Replace `as unknown as { tokenCredits }` with `lean<IBalance>()`
- Use `TransactionData[]` instead of `Record<string, unknown>[]`
  for bulkInsertTransactions parameter
- Add JSDoc noting insertMany bypasses document middleware
- Remove orphan section comment in methods/index.ts

* refactor: use shared types in transactions.ts, fix bulk write logic

- Import CANCEL_RATE from data-schemas instead of local duplicate
- Import TransactionData from data-schemas for PreparedEntry/BulkWriteDeps
- Use tilde alias for EndpointTokenConfig import
- Pass valueKey through to getMultiplier
- Only sum tokenValue for balance-enabled docs in bulkWriteTransactions
- Consolidate two loops into single-pass map

* refactor: remove duplicate updateBalance from Transaction.js

Import updateBalance from ~/models (sourced from data-schemas) instead
of maintaining a second copy. Also import CANCEL_RATE from data-schemas
and remove the Balance model import (no longer needed directly).

* fix: test real spendCollectedUsage instead of IIFE replica

Export spendCollectedUsage from abortMiddleware.js and rewrite the test
file to import and test the actual function. Previously the tests ran
against a hand-written replica that could silently diverge from the real
implementation.

* test: add transactions.spec.ts and restore regression comments

Add 22 direct unit tests for transactions.ts financial logic covering
prepareTokenSpend, prepareStructuredTokenSpend, bulkWriteTransactions,
CANCEL_RATE paths, NaN guards, disabled transactions, zero tokens,
cache multipliers, and balance-enabled filtering.

Restore critical regression documentation comments in
recordCollectedUsage.spec.js explaining which production bugs the
tests guard against.

* fix: widen setValues type to include lastRefill

The UpdateBalanceParams.setValues type was Partial<Pick<IBalance,
'tokenCredits'>> which excluded lastRefill — used by
createAutoRefillTransaction. Widen to also pick 'lastRefill'.

* test: use real MongoDB for bulkWriteTransactions tests

Replace mock-based bulkWriteTransactions tests with real DB tests using
MongoMemoryServer. Pure function tests (prepareTokenSpend,
prepareStructuredTokenSpend) remain mock-based since they don't touch
DB. Add end-to-end integration tests that verify the full prepare →
bulk write → DB state pipeline with real Transaction and Balance models.

* chore: update @librechat/agents dependency to version 3.1.54 in package-lock.json and related package.json files

* test: add bulk path parity tests proving identical DB outcomes

Three test suites proving the bulk path (prepareTokenSpend/
prepareStructuredTokenSpend + bulkWriteTransactions) produces
numerically identical results to the legacy path for all scenarios:

- usage.bulk-parity.spec.ts: mirrors all legacy recordCollectedUsage
  tests; asserts same return values and verifies metadata fields on
  the insertMany docs match what spendTokens args would carry

- transactions.bulk-parity.spec.ts: real-DB tests using actual
  getMultiplier/getCacheMultiplier pricing functions; asserts exact
  tokenValue, rate, rawAmount and balance deductions for standard
  tokens, structured/cache tokens, CANCEL_RATE, premium pricing,
  multi-entry batches, and edge cases (NaN, zero, disabled)

- Transaction.spec.js: adds describe('Bulk path parity') that mirrors
  7 key legacy tests via recordCollectedUsage + bulk deps against
  real MongoDB, asserting same balance deductions and doc counts

* refactor: update llmConfig structure to use modelKwargs for reasoning effort

Refactor the llmConfig in getOpenAILLMConfig to store reasoning effort within modelKwargs instead of directly on llmConfig. This change ensures consistency in the configuration structure and improves clarity in the handling of reasoning properties in the tests.

* test: update performance checks in processAssistantMessage tests

Revise the performance assertions in the processAssistantMessage tests to ensure that each message processing time remains under 100ms, addressing potential ReDoS vulnerabilities. This change enhances the reliability of the tests by focusing on maximum processing time rather than relative ratios.

* test: fill parity test gaps — model fallback, abort context, structured edge cases

- usage.bulk-parity: add undefined model fallback test
- transactions.bulk-parity: add abort context test (txns inserted,
  balance unchanged when balance not passed), fix readTokens type cast
- Transaction.spec: add 3 missing mirrors — balance disabled with
  transactions enabled, structured transactions disabled, structured
  balance disabled

* fix: deduct balance before inserting transactions to prevent orphaned docs

Swap the order in bulkWriteTransactions: updateBalance runs before
insertMany. If updateBalance fails (after exhausting retries), no
transaction documents are written — avoiding the inconsistent state
where transactions exist in MongoDB with no corresponding balance
deduction.

* chore: import order

* test: update config.spec.ts for OpenRouter reasoning in modelKwargs

Same fix as llm.spec.ts — OpenRouter reasoning is now passed via
modelKwargs instead of llmConfig.reasoning directly.
This commit is contained in:
Danny Avila 2026-03-01 12:26:36 -05:00 committed by GitHub
parent 0e5ee379b3
commit e1e204d6cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
29 changed files with 3004 additions and 1070 deletions

View file

@ -1,17 +1,19 @@
const { logger } = require('@librechat/data-schemas');
const {
countTokens,
isEnabled,
sendEvent,
countTokens,
GenerationJobManager,
recordCollectedUsage,
sanitizeMessageForTransmit,
} = require('@librechat/api');
const { isAssistantsEndpoint, ErrorTypes } = require('librechat-data-provider');
const { saveMessage, getConvo, updateBalance, bulkInsertTransactions } = require('~/models');
const { spendTokens, spendStructuredTokens } = require('~/models/spendTokens');
const { truncateText, smartTruncateText } = require('~/app/clients/prompts');
const { getMultiplier, getCacheMultiplier } = require('~/models/tx');
const clearPendingReq = require('~/cache/clearPendingReq');
const { sendError } = require('~/server/middleware/error');
const { saveMessage, getConvo } = require('~/models');
const { abortRun } = require('./abortRun');
/**
@ -40,57 +42,22 @@ async function spendCollectedUsage({
return;
}
const spendPromises = [];
for (const usage of collectedUsage) {
if (!usage) {
continue;
}
// Support both OpenAI format (input_token_details) and Anthropic format (cache_*_input_tokens)
const cache_creation =
Number(usage.input_token_details?.cache_creation) ||
Number(usage.cache_creation_input_tokens) ||
0;
const cache_read =
Number(usage.input_token_details?.cache_read) || Number(usage.cache_read_input_tokens) || 0;
const txMetadata = {
await recordCollectedUsage(
{
spendTokens,
spendStructuredTokens,
pricing: { getMultiplier, getCacheMultiplier },
bulkWriteOps: { insertMany: bulkInsertTransactions, updateBalance },
},
{
user: userId,
conversationId,
collectedUsage,
context: 'abort',
messageId,
conversationId,
user: userId,
model: usage.model ?? fallbackModel,
};
if (cache_creation > 0 || cache_read > 0) {
spendPromises.push(
spendStructuredTokens(txMetadata, {
promptTokens: {
input: usage.input_tokens,
write: cache_creation,
read: cache_read,
},
completionTokens: usage.output_tokens,
}).catch((err) => {
logger.error('[abortMiddleware] Error spending structured tokens for abort', err);
}),
);
continue;
}
spendPromises.push(
spendTokens(txMetadata, {
promptTokens: usage.input_tokens,
completionTokens: usage.output_tokens,
}).catch((err) => {
logger.error('[abortMiddleware] Error spending tokens for abort', err);
}),
);
}
// Wait for all token spending to complete
await Promise.all(spendPromises);
model: fallbackModel,
},
);
// Clear the array to prevent double-spending from the AgentClient finally block.
// The collectedUsage array is shared by reference with AgentClient.collectedUsage,
@ -301,4 +268,5 @@ const handleAbortError = async (res, req, error, data) => {
module.exports = {
handleAbort,
handleAbortError,
spendCollectedUsage,
};

View file

@ -4,16 +4,32 @@
* This tests the token spending logic for abort scenarios,
* particularly for parallel agents (addedConvo) where multiple
* models need their tokens spent.
*
* spendCollectedUsage delegates to recordCollectedUsage from @librechat/api,
* passing pricing + bulkWriteOps deps, with context: 'abort'.
* After spending, it clears the collectedUsage array to prevent double-spending
* from the AgentClient finally block (which shares the same array reference).
*/
const mockSpendTokens = jest.fn().mockResolvedValue();
const mockSpendStructuredTokens = jest.fn().mockResolvedValue();
const mockRecordCollectedUsage = jest
.fn()
.mockResolvedValue({ input_tokens: 100, output_tokens: 50 });
const mockGetMultiplier = jest.fn().mockReturnValue(1);
const mockGetCacheMultiplier = jest.fn().mockReturnValue(null);
jest.mock('~/models/spendTokens', () => ({
spendTokens: (...args) => mockSpendTokens(...args),
spendStructuredTokens: (...args) => mockSpendStructuredTokens(...args),
}));
jest.mock('~/models/tx', () => ({
getMultiplier: mockGetMultiplier,
getCacheMultiplier: mockGetCacheMultiplier,
}));
jest.mock('@librechat/data-schemas', () => ({
logger: {
debug: jest.fn(),
@ -30,6 +46,7 @@ jest.mock('@librechat/api', () => ({
GenerationJobManager: {
abortJob: jest.fn(),
},
recordCollectedUsage: mockRecordCollectedUsage,
sanitizeMessageForTransmit: jest.fn((msg) => msg),
}));
@ -49,94 +66,27 @@ jest.mock('~/server/middleware/error', () => ({
sendError: jest.fn(),
}));
const mockUpdateBalance = jest.fn().mockResolvedValue({});
const mockBulkInsertTransactions = jest.fn().mockResolvedValue(undefined);
jest.mock('~/models', () => ({
saveMessage: jest.fn().mockResolvedValue(),
getConvo: jest.fn().mockResolvedValue({ title: 'Test Chat' }),
updateBalance: mockUpdateBalance,
bulkInsertTransactions: mockBulkInsertTransactions,
}));
jest.mock('./abortRun', () => ({
abortRun: jest.fn(),
}));
// Import the module after mocks are set up
// We need to extract the spendCollectedUsage function for testing
// Since it's not exported, we'll test it through the handleAbort flow
const { spendCollectedUsage } = require('./abortMiddleware');
describe('abortMiddleware - spendCollectedUsage', () => {
beforeEach(() => {
jest.clearAllMocks();
});
describe('spendCollectedUsage logic', () => {
// Since spendCollectedUsage is not exported, we test the logic directly
// by replicating the function here for unit testing
const spendCollectedUsage = async ({
userId,
conversationId,
collectedUsage,
fallbackModel,
}) => {
if (!collectedUsage || collectedUsage.length === 0) {
return;
}
const spendPromises = [];
for (const usage of collectedUsage) {
if (!usage) {
continue;
}
const cache_creation =
Number(usage.input_token_details?.cache_creation) ||
Number(usage.cache_creation_input_tokens) ||
0;
const cache_read =
Number(usage.input_token_details?.cache_read) ||
Number(usage.cache_read_input_tokens) ||
0;
const txMetadata = {
context: 'abort',
conversationId,
user: userId,
model: usage.model ?? fallbackModel,
};
if (cache_creation > 0 || cache_read > 0) {
spendPromises.push(
mockSpendStructuredTokens(txMetadata, {
promptTokens: {
input: usage.input_tokens,
write: cache_creation,
read: cache_read,
},
completionTokens: usage.output_tokens,
}).catch(() => {
// Log error but don't throw
}),
);
continue;
}
spendPromises.push(
mockSpendTokens(txMetadata, {
promptTokens: usage.input_tokens,
completionTokens: usage.output_tokens,
}).catch(() => {
// Log error but don't throw
}),
);
}
// Wait for all token spending to complete
await Promise.all(spendPromises);
// Clear the array to prevent double-spending
collectedUsage.length = 0;
};
describe('spendCollectedUsage delegation', () => {
it('should return early if collectedUsage is empty', async () => {
await spendCollectedUsage({
userId: 'user-123',
@ -145,8 +95,7 @@ describe('abortMiddleware - spendCollectedUsage', () => {
fallbackModel: 'gpt-4',
});
expect(mockSpendTokens).not.toHaveBeenCalled();
expect(mockSpendStructuredTokens).not.toHaveBeenCalled();
expect(mockRecordCollectedUsage).not.toHaveBeenCalled();
});
it('should return early if collectedUsage is null', async () => {
@ -157,28 +106,10 @@ describe('abortMiddleware - spendCollectedUsage', () => {
fallbackModel: 'gpt-4',
});
expect(mockSpendTokens).not.toHaveBeenCalled();
expect(mockSpendStructuredTokens).not.toHaveBeenCalled();
expect(mockRecordCollectedUsage).not.toHaveBeenCalled();
});
it('should skip null entries in collectedUsage', async () => {
const collectedUsage = [
{ input_tokens: 100, output_tokens: 50, model: 'gpt-4' },
null,
{ input_tokens: 200, output_tokens: 60, model: 'gpt-4' },
];
await spendCollectedUsage({
userId: 'user-123',
conversationId: 'convo-123',
collectedUsage,
fallbackModel: 'gpt-4',
});
expect(mockSpendTokens).toHaveBeenCalledTimes(2);
});
it('should spend tokens for single model', async () => {
it('should call recordCollectedUsage with abort context and full deps', async () => {
const collectedUsage = [{ input_tokens: 100, output_tokens: 50, model: 'gpt-4' }];
await spendCollectedUsage({
@ -186,21 +117,35 @@ describe('abortMiddleware - spendCollectedUsage', () => {
conversationId: 'convo-123',
collectedUsage,
fallbackModel: 'gpt-4',
messageId: 'msg-123',
});
expect(mockSpendTokens).toHaveBeenCalledTimes(1);
expect(mockSpendTokens).toHaveBeenCalledWith(
expect.objectContaining({
context: 'abort',
conversationId: 'convo-123',
expect(mockRecordCollectedUsage).toHaveBeenCalledTimes(1);
expect(mockRecordCollectedUsage).toHaveBeenCalledWith(
{
spendTokens: expect.any(Function),
spendStructuredTokens: expect.any(Function),
pricing: {
getMultiplier: mockGetMultiplier,
getCacheMultiplier: mockGetCacheMultiplier,
},
bulkWriteOps: {
insertMany: mockBulkInsertTransactions,
updateBalance: mockUpdateBalance,
},
},
{
user: 'user-123',
conversationId: 'convo-123',
collectedUsage,
context: 'abort',
messageId: 'msg-123',
model: 'gpt-4',
}),
{ promptTokens: 100, completionTokens: 50 },
},
);
});
it('should spend tokens for multiple models (parallel agents)', async () => {
it('should pass context abort for multiple models (parallel agents)', async () => {
const collectedUsage = [
{ input_tokens: 100, output_tokens: 50, model: 'gpt-4' },
{ input_tokens: 80, output_tokens: 40, model: 'claude-3' },
@ -214,136 +159,17 @@ describe('abortMiddleware - spendCollectedUsage', () => {
fallbackModel: 'gpt-4',
});
expect(mockSpendTokens).toHaveBeenCalledTimes(3);
// Verify each model was called
expect(mockSpendTokens).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ model: 'gpt-4' }),
{ promptTokens: 100, completionTokens: 50 },
);
expect(mockSpendTokens).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ model: 'claude-3' }),
{ promptTokens: 80, completionTokens: 40 },
);
expect(mockSpendTokens).toHaveBeenNthCalledWith(
3,
expect.objectContaining({ model: 'gemini-pro' }),
{ promptTokens: 120, completionTokens: 60 },
);
});
it('should use fallbackModel when usage.model is missing', async () => {
const collectedUsage = [{ input_tokens: 100, output_tokens: 50 }];
await spendCollectedUsage({
userId: 'user-123',
conversationId: 'convo-123',
collectedUsage,
fallbackModel: 'fallback-model',
});
expect(mockSpendTokens).toHaveBeenCalledWith(
expect.objectContaining({ model: 'fallback-model' }),
expect(mockRecordCollectedUsage).toHaveBeenCalledTimes(1);
expect(mockRecordCollectedUsage).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({
context: 'abort',
collectedUsage,
}),
);
});
it('should use spendStructuredTokens for OpenAI format cache tokens', async () => {
const collectedUsage = [
{
input_tokens: 100,
output_tokens: 50,
model: 'gpt-4',
input_token_details: {
cache_creation: 20,
cache_read: 10,
},
},
];
await spendCollectedUsage({
userId: 'user-123',
conversationId: 'convo-123',
collectedUsage,
fallbackModel: 'gpt-4',
});
expect(mockSpendStructuredTokens).toHaveBeenCalledTimes(1);
expect(mockSpendTokens).not.toHaveBeenCalled();
expect(mockSpendStructuredTokens).toHaveBeenCalledWith(
expect.objectContaining({ model: 'gpt-4', context: 'abort' }),
{
promptTokens: {
input: 100,
write: 20,
read: 10,
},
completionTokens: 50,
},
);
});
it('should use spendStructuredTokens for Anthropic format cache tokens', async () => {
const collectedUsage = [
{
input_tokens: 100,
output_tokens: 50,
model: 'claude-3',
cache_creation_input_tokens: 25,
cache_read_input_tokens: 15,
},
];
await spendCollectedUsage({
userId: 'user-123',
conversationId: 'convo-123',
collectedUsage,
fallbackModel: 'claude-3',
});
expect(mockSpendStructuredTokens).toHaveBeenCalledTimes(1);
expect(mockSpendTokens).not.toHaveBeenCalled();
expect(mockSpendStructuredTokens).toHaveBeenCalledWith(
expect.objectContaining({ model: 'claude-3' }),
{
promptTokens: {
input: 100,
write: 25,
read: 15,
},
completionTokens: 50,
},
);
});
it('should handle mixed cache and non-cache entries', async () => {
const collectedUsage = [
{ input_tokens: 100, output_tokens: 50, model: 'gpt-4' },
{
input_tokens: 150,
output_tokens: 30,
model: 'claude-3',
cache_creation_input_tokens: 20,
cache_read_input_tokens: 10,
},
{ input_tokens: 200, output_tokens: 20, model: 'gemini-pro' },
];
await spendCollectedUsage({
userId: 'user-123',
conversationId: 'convo-123',
collectedUsage,
fallbackModel: 'gpt-4',
});
expect(mockSpendTokens).toHaveBeenCalledTimes(2);
expect(mockSpendStructuredTokens).toHaveBeenCalledTimes(1);
});
it('should handle real-world parallel agent abort scenario', async () => {
// Simulates: Primary agent (gemini) + addedConvo agent (gpt-5) aborted mid-stream
const collectedUsage = [
{ input_tokens: 31596, output_tokens: 151, model: 'gemini-3-flash-preview' },
{ input_tokens: 28000, output_tokens: 120, model: 'gpt-5.2' },
@ -356,27 +182,24 @@ describe('abortMiddleware - spendCollectedUsage', () => {
fallbackModel: 'gemini-3-flash-preview',
});
expect(mockSpendTokens).toHaveBeenCalledTimes(2);
// Primary model
expect(mockSpendTokens).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ model: 'gemini-3-flash-preview' }),
{ promptTokens: 31596, completionTokens: 151 },
);
// Parallel model (addedConvo)
expect(mockSpendTokens).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ model: 'gpt-5.2' }),
{ promptTokens: 28000, completionTokens: 120 },
expect(mockRecordCollectedUsage).toHaveBeenCalledTimes(1);
expect(mockRecordCollectedUsage).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({
user: 'user-123',
conversationId: 'convo-123',
context: 'abort',
model: 'gemini-3-flash-preview',
}),
);
});
/**
* Race condition prevention: after abort middleware spends tokens,
* the collectedUsage array is cleared so AgentClient.recordCollectedUsage()
* (which shares the same array reference) sees an empty array and returns early.
*/
it('should clear collectedUsage array after spending to prevent double-spending', async () => {
// This tests the race condition fix: after abort middleware spends tokens,
// the collectedUsage array is cleared so AgentClient.recordCollectedUsage()
// (which shares the same array reference) sees an empty array and returns early.
const collectedUsage = [
{ input_tokens: 100, output_tokens: 50, model: 'gpt-4' },
{ input_tokens: 80, output_tokens: 40, model: 'claude-3' },
@ -391,19 +214,16 @@ describe('abortMiddleware - spendCollectedUsage', () => {
fallbackModel: 'gpt-4',
});
expect(mockSpendTokens).toHaveBeenCalledTimes(2);
// The array should be cleared after spending
expect(mockRecordCollectedUsage).toHaveBeenCalledTimes(1);
expect(collectedUsage.length).toBe(0);
});
it('should await all token spending operations before clearing array', async () => {
// Ensure we don't clear the array before spending completes
let spendCallCount = 0;
mockSpendTokens.mockImplementation(async () => {
spendCallCount++;
// Simulate async delay
it('should await recordCollectedUsage before clearing array', async () => {
let resolved = false;
mockRecordCollectedUsage.mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 10));
resolved = true;
return { input_tokens: 100, output_tokens: 50 };
});
const collectedUsage = [
@ -418,10 +238,7 @@ describe('abortMiddleware - spendCollectedUsage', () => {
fallbackModel: 'gpt-4',
});
// Both spend calls should have completed
expect(spendCallCount).toBe(2);
// Array should be cleared after awaiting
expect(resolved).toBe(true);
expect(collectedUsage.length).toBe(0);
});
});