refactor: DocumentDB Compatibility for Balance Updates (#6673)

* fix: Implement optimistic concurrency control for balance updates in Transaction model to allow for documentdb compatibility

* test: Add concurrent balance increase test for auto refill transactions
This commit is contained in:
Danny Avila 2025-04-01 23:09:24 -04:00 committed by GitHub
parent 0865bc4a72
commit d8337e00d2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 200 additions and 29 deletions

View file

@ -8,39 +8,135 @@ const Balance = require('./Balance');
const cancelRate = 1.15; const cancelRate = 1.15;
/** /**
* Updates a user's token balance based on a transaction. * Updates a user's token balance based on a transaction using optimistic concurrency control
* * without schema changes. Compatible with DocumentDB.
* @async * @async
* @function * @function
* @param {Object} params - The function parameters. * @param {Object} params - The function parameters.
* @param {string} params.user - The user ID. * @param {string|mongoose.Types.ObjectId} params.user - The user ID.
* @param {number} params.incrementValue - The value to increment the balance by (can be negative). * @param {number} params.incrementValue - The value to increment the balance by (can be negative).
* @param {import('mongoose').UpdateQuery<import('@librechat/data-schemas').IBalance>['$set']} params.setValues * @param {import('mongoose').UpdateQuery<import('@librechat/data-schemas').IBalance>['$set']} [params.setValues] - Optional additional fields to set.
* @returns {Promise<Object>} Returns the updated balance response. * @returns {Promise<Object>} Returns the updated balance document (lean).
* @throws {Error} Throws an error if the update fails after multiple retries.
*/ */
const updateBalance = async ({ user, incrementValue, setValues }) => { const updateBalance = async ({ user, incrementValue, setValues }) => {
// Use findOneAndUpdate with a conditional update to make the balance update atomic let maxRetries = 10; // Number of times to retry on conflict
// This prevents race conditions when multiple transactions are processed concurrently let delay = 50; // Initial retry delay in ms
const balanceResponse = await Balance.findOneAndUpdate( let lastError = null;
{ user },
[ for (let attempt = 1; attempt <= maxRetries; attempt++) {
{ let currentBalanceDoc;
try {
// 1. Read the current document state
currentBalanceDoc = await Balance.findOne({ user }).lean();
const currentCredits = currentBalanceDoc ? currentBalanceDoc.tokenCredits : 0;
// 2. Calculate the desired new state
const potentialNewCredits = currentCredits + incrementValue;
const newCredits = Math.max(0, potentialNewCredits); // Ensure balance doesn't go below zero
// 3. Prepare the update payload
const updatePayload = {
$set: { $set: {
tokenCredits: { tokenCredits: newCredits,
$cond: { ...(setValues || {}), // Merge other values to set
if: { $lt: [{ $add: ['$tokenCredits', incrementValue] }, 0] },
then: 0,
else: { $add: ['$tokenCredits', incrementValue] },
}, },
};
// 4. Attempt the conditional update or upsert
let updatedBalance = null;
if (currentBalanceDoc) {
// --- Document Exists: Perform Conditional Update ---
// Try to update only if the tokenCredits match the value we read (currentCredits)
updatedBalance = await Balance.findOneAndUpdate(
{
user: user,
tokenCredits: currentCredits, // Optimistic lock: condition based on the read value
}, },
...setValues, updatePayload,
{
new: true, // Return the modified document
// lean: true, // .lean() is applied after query execution in Mongoose >= 6
}, },
).lean(); // Use lean() for plain JS object
if (updatedBalance) {
// Success! The update was applied based on the expected current state.
return updatedBalance;
}
// If updatedBalance is null, it means tokenCredits changed between read and write (conflict).
lastError = new Error(`Concurrency conflict for user ${user} on attempt ${attempt}.`);
// Proceed to retry logic below.
} else {
// --- Document Does Not Exist: Perform Conditional Upsert ---
// Try to insert the document, but only if it still doesn't exist.
// Using tokenCredits: {$exists: false} helps prevent race conditions where
// another process creates the doc between our findOne and findOneAndUpdate.
try {
updatedBalance = await Balance.findOneAndUpdate(
{
user: user,
// Attempt to match only if the document doesn't exist OR was just created
// without tokenCredits (less likely but possible). A simple { user } filter
// might also work, relying on the retry for conflicts.
// Let's use a simpler filter and rely on retry for races.
// tokenCredits: { $exists: false } // This condition might be too strict if doc exists with 0 credits
},
updatePayload,
{
upsert: true, // Create if doesn't exist
new: true, // Return the created/updated document
// setDefaultsOnInsert: true, // Ensure schema defaults are applied on insert
// lean: true,
}, },
],
{ upsert: true, new: true },
).lean(); ).lean();
return balanceResponse; if (updatedBalance) {
// Upsert succeeded (likely created the document)
return updatedBalance;
}
// If null, potentially a rare race condition during upsert. Retry should handle it.
lastError = new Error(
`Upsert race condition suspected for user ${user} on attempt ${attempt}.`,
);
} catch (error) {
if (error.code === 11000) {
// E11000 duplicate key error on index
// This means another process created the document *just* before our upsert.
// It's a concurrency conflict during creation. We should retry.
lastError = error; // Store the error
// Proceed to retry logic below.
} else {
// Different error, rethrow
throw error;
}
}
} // End if/else (document exists?)
} catch (error) {
// Catch errors from findOne or unexpected findOneAndUpdate errors
logger.error(`[updateBalance] Error during attempt ${attempt} for user ${user}:`, error);
lastError = error; // Store the error
// Consider stopping retries for non-transient errors, but for now, we retry.
}
// If we reached here, it means the update failed (conflict or error), wait and retry
if (attempt < maxRetries) {
const jitter = Math.random() * delay * 0.5; // Add jitter to delay
await new Promise((resolve) => setTimeout(resolve, delay + jitter));
delay = Math.min(delay * 2, 2000); // Exponential backoff with cap
}
} // End for loop (retries)
// If loop finishes without success, throw the last encountered error or a generic one
logger.error(
`[updateBalance] Failed to update balance for user ${user} after ${maxRetries} attempts.`,
);
throw (
lastError ||
new Error(
`Failed to update balance for user ${user} after maximum retries due to persistent conflicts.`,
)
);
}; };
/** Method to calculate and set the tokenValue for a transaction */ /** Method to calculate and set the tokenValue for a transaction */

View file

@ -459,7 +459,7 @@ describe('spendTokens', () => {
it('should handle multiple concurrent transactions correctly with a high balance', async () => { it('should handle multiple concurrent transactions correctly with a high balance', async () => {
// Create a balance with a high amount // Create a balance with a high amount
const initialBalance = 1000000; const initialBalance = 10000000;
await Balance.create({ await Balance.create({
user: userId, user: userId,
tokenCredits: initialBalance, tokenCredits: initialBalance,
@ -470,8 +470,9 @@ describe('spendTokens', () => {
const context = 'message'; const context = 'message';
const model = 'gpt-4'; const model = 'gpt-4';
// Create 10 usage records to simulate multiple transactions const amount = 50;
const collectedUsage = Array.from({ length: 10 }, (_, i) => ({ // Create `amount` of usage records to simulate multiple transactions
const collectedUsage = Array.from({ length: amount }, (_, i) => ({
model, model,
input_tokens: 100 + i * 10, // Increasing input tokens input_tokens: 100 + i * 10, // Increasing input tokens
output_tokens: 50 + i * 5, // Increasing output tokens output_tokens: 50 + i * 5, // Increasing output tokens
@ -591,6 +592,80 @@ describe('spendTokens', () => {
expect(Math.abs(totalTokenValue)).toBeCloseTo(actualSpend, -3); // Allow for larger differences expect(Math.abs(totalTokenValue)).toBeCloseTo(actualSpend, -3); // Allow for larger differences
}); });
// Add this new test case
it('should handle multiple concurrent balance increases correctly', async () => {
// Start with zero balance
const initialBalance = 0;
await Balance.create({
user: userId,
tokenCredits: initialBalance,
});
const numberOfRefills = 25;
const refillAmount = 1000;
const promises = [];
for (let i = 0; i < numberOfRefills; i++) {
promises.push(
Transaction.createAutoRefillTransaction({
user: userId,
tokenType: 'credits',
context: 'concurrent-refill-test',
rawAmount: refillAmount,
}),
);
}
// Wait for all refill transactions to complete
const results = await Promise.all(promises);
// Verify final balance
const finalBalance = await Balance.findOne({ user: userId });
expect(finalBalance).toBeDefined();
// The final balance should be the initial balance plus the sum of all refills
const expectedFinalBalance = initialBalance + numberOfRefills * refillAmount;
console.log('Initial balance (Increase Test):', initialBalance);
console.log(`Performed ${numberOfRefills} refills of ${refillAmount} each.`);
console.log('Expected final balance (Increase Test):', expectedFinalBalance);
console.log('Actual final balance (Increase Test):', finalBalance.tokenCredits);
// Use toBeCloseTo for safety, though toBe should work for integer math
expect(finalBalance.tokenCredits).toBeCloseTo(expectedFinalBalance, 0);
// Verify all transactions were created
const transactions = await Transaction.find({
user: userId,
context: 'concurrent-refill-test',
});
// We should have one transaction for each refill attempt
expect(transactions.length).toBe(numberOfRefills);
// Optional: Verify the sum of increments from the results matches the balance change
const totalIncrementReported = results.reduce((sum, result) => {
// Assuming createAutoRefillTransaction returns an object with the increment amount
// Adjust this based on the actual return structure.
// Let's assume it returns { balance: newBalance, transaction: { rawAmount: ... } }
// Or perhaps we check the transaction.rawAmount directly
return sum + (result?.transaction?.rawAmount || 0);
}, 0);
console.log('Total increment reported by results:', totalIncrementReported);
expect(totalIncrementReported).toBe(expectedFinalBalance - initialBalance);
// Optional: Check the sum of tokenValue from saved transactions
let totalTokenValueFromDb = 0;
transactions.forEach((tx) => {
// For refills, rawAmount is positive, and tokenValue might be calculated based on it
// Let's assume tokenValue directly reflects the increment for simplicity here
// If calculation is involved, adjust accordingly
totalTokenValueFromDb += tx.rawAmount; // Or tx.tokenValue if that holds the increment
});
console.log('Total rawAmount from DB transactions:', totalTokenValueFromDb);
expect(totalTokenValueFromDb).toBeCloseTo(expectedFinalBalance - initialBalance, 0);
});
it('should create structured transactions for both prompt and completion tokens', async () => { it('should create structured transactions for both prompt and completion tokens', async () => {
// Create a balance for the user // Create a balance for the user
await Balance.create({ await Balance.create({