diff --git a/api/models/Transaction.js b/api/models/Transaction.js index f68b311315..e171241b61 100644 --- a/api/models/Transaction.js +++ b/api/models/Transaction.js @@ -8,39 +8,135 @@ const Balance = require('./Balance'); const cancelRate = 1.15; /** - * Updates a user's token balance based on a transaction. - * + * Updates a user's token balance based on a transaction using optimistic concurrency control + * without schema changes. Compatible with DocumentDB. * @async * @function * @param {Object} params - The function parameters. - * @param {string} params.user - The user ID. + * @param {string|mongoose.Types.ObjectId} params.user - The user ID. * @param {number} params.incrementValue - The value to increment the balance by (can be negative). - * @param {import('mongoose').UpdateQuery['$set']} params.setValues - * @returns {Promise} Returns the updated balance response. + * @param {import('mongoose').UpdateQuery['$set']} [params.setValues] - Optional additional fields to set. + * @returns {Promise} Returns the updated balance document (lean). + * @throws {Error} Throws an error if the update fails after multiple retries. */ const updateBalance = async ({ user, incrementValue, setValues }) => { - // Use findOneAndUpdate with a conditional update to make the balance update atomic - // This prevents race conditions when multiple transactions are processed concurrently - const balanceResponse = await Balance.findOneAndUpdate( - { user }, - [ - { - $set: { - tokenCredits: { - $cond: { - if: { $lt: [{ $add: ['$tokenCredits', incrementValue] }, 0] }, - then: 0, - else: { $add: ['$tokenCredits', incrementValue] }, - }, - }, - ...setValues, - }, - }, - ], - { upsert: true, new: true }, - ).lean(); + let maxRetries = 10; // Number of times to retry on conflict + let delay = 50; // Initial retry delay in ms + let lastError = null; - return balanceResponse; + for (let attempt = 1; attempt <= maxRetries; attempt++) { + let currentBalanceDoc; + try { + // 1. Read the current document state + currentBalanceDoc = await Balance.findOne({ user }).lean(); + const currentCredits = currentBalanceDoc ? currentBalanceDoc.tokenCredits : 0; + + // 2. Calculate the desired new state + const potentialNewCredits = currentCredits + incrementValue; + const newCredits = Math.max(0, potentialNewCredits); // Ensure balance doesn't go below zero + + // 3. Prepare the update payload + const updatePayload = { + $set: { + tokenCredits: newCredits, + ...(setValues || {}), // Merge other values to set + }, + }; + + // 4. Attempt the conditional update or upsert + let updatedBalance = null; + if (currentBalanceDoc) { + // --- Document Exists: Perform Conditional Update --- + // Try to update only if the tokenCredits match the value we read (currentCredits) + updatedBalance = await Balance.findOneAndUpdate( + { + user: user, + tokenCredits: currentCredits, // Optimistic lock: condition based on the read value + }, + updatePayload, + { + new: true, // Return the modified document + // lean: true, // .lean() is applied after query execution in Mongoose >= 6 + }, + ).lean(); // Use lean() for plain JS object + + if (updatedBalance) { + // Success! The update was applied based on the expected current state. + return updatedBalance; + } + // If updatedBalance is null, it means tokenCredits changed between read and write (conflict). + lastError = new Error(`Concurrency conflict for user ${user} on attempt ${attempt}.`); + // Proceed to retry logic below. + } else { + // --- Document Does Not Exist: Perform Conditional Upsert --- + // Try to insert the document, but only if it still doesn't exist. + // Using tokenCredits: {$exists: false} helps prevent race conditions where + // another process creates the doc between our findOne and findOneAndUpdate. + try { + updatedBalance = await Balance.findOneAndUpdate( + { + user: user, + // Attempt to match only if the document doesn't exist OR was just created + // without tokenCredits (less likely but possible). A simple { user } filter + // might also work, relying on the retry for conflicts. + // Let's use a simpler filter and rely on retry for races. + // tokenCredits: { $exists: false } // This condition might be too strict if doc exists with 0 credits + }, + updatePayload, + { + upsert: true, // Create if doesn't exist + new: true, // Return the created/updated document + // setDefaultsOnInsert: true, // Ensure schema defaults are applied on insert + // lean: true, + }, + ).lean(); + + if (updatedBalance) { + // Upsert succeeded (likely created the document) + return updatedBalance; + } + // If null, potentially a rare race condition during upsert. Retry should handle it. + lastError = new Error( + `Upsert race condition suspected for user ${user} on attempt ${attempt}.`, + ); + } catch (error) { + if (error.code === 11000) { + // E11000 duplicate key error on index + // This means another process created the document *just* before our upsert. + // It's a concurrency conflict during creation. We should retry. + lastError = error; // Store the error + // Proceed to retry logic below. + } else { + // Different error, rethrow + throw error; + } + } + } // End if/else (document exists?) + } catch (error) { + // Catch errors from findOne or unexpected findOneAndUpdate errors + logger.error(`[updateBalance] Error during attempt ${attempt} for user ${user}:`, error); + lastError = error; // Store the error + // Consider stopping retries for non-transient errors, but for now, we retry. + } + + // If we reached here, it means the update failed (conflict or error), wait and retry + if (attempt < maxRetries) { + const jitter = Math.random() * delay * 0.5; // Add jitter to delay + await new Promise((resolve) => setTimeout(resolve, delay + jitter)); + delay = Math.min(delay * 2, 2000); // Exponential backoff with cap + } + } // End for loop (retries) + + // If loop finishes without success, throw the last encountered error or a generic one + logger.error( + `[updateBalance] Failed to update balance for user ${user} after ${maxRetries} attempts.`, + ); + throw ( + lastError || + new Error( + `Failed to update balance for user ${user} after maximum retries due to persistent conflicts.`, + ) + ); }; /** Method to calculate and set the tokenValue for a transaction */ diff --git a/api/models/spendTokens.spec.js b/api/models/spendTokens.spec.js index 09da9a46b2..eacf420330 100644 --- a/api/models/spendTokens.spec.js +++ b/api/models/spendTokens.spec.js @@ -459,7 +459,7 @@ describe('spendTokens', () => { it('should handle multiple concurrent transactions correctly with a high balance', async () => { // Create a balance with a high amount - const initialBalance = 1000000; + const initialBalance = 10000000; await Balance.create({ user: userId, tokenCredits: initialBalance, @@ -470,8 +470,9 @@ describe('spendTokens', () => { const context = 'message'; const model = 'gpt-4'; - // Create 10 usage records to simulate multiple transactions - const collectedUsage = Array.from({ length: 10 }, (_, i) => ({ + const amount = 50; + // Create `amount` of usage records to simulate multiple transactions + const collectedUsage = Array.from({ length: amount }, (_, i) => ({ model, input_tokens: 100 + i * 10, // Increasing input tokens output_tokens: 50 + i * 5, // Increasing output tokens @@ -591,6 +592,80 @@ describe('spendTokens', () => { expect(Math.abs(totalTokenValue)).toBeCloseTo(actualSpend, -3); // Allow for larger differences }); + // Add this new test case + it('should handle multiple concurrent balance increases correctly', async () => { + // Start with zero balance + const initialBalance = 0; + await Balance.create({ + user: userId, + tokenCredits: initialBalance, + }); + + const numberOfRefills = 25; + const refillAmount = 1000; + + const promises = []; + for (let i = 0; i < numberOfRefills; i++) { + promises.push( + Transaction.createAutoRefillTransaction({ + user: userId, + tokenType: 'credits', + context: 'concurrent-refill-test', + rawAmount: refillAmount, + }), + ); + } + + // Wait for all refill transactions to complete + const results = await Promise.all(promises); + + // Verify final balance + const finalBalance = await Balance.findOne({ user: userId }); + expect(finalBalance).toBeDefined(); + + // The final balance should be the initial balance plus the sum of all refills + const expectedFinalBalance = initialBalance + numberOfRefills * refillAmount; + + console.log('Initial balance (Increase Test):', initialBalance); + console.log(`Performed ${numberOfRefills} refills of ${refillAmount} each.`); + console.log('Expected final balance (Increase Test):', expectedFinalBalance); + console.log('Actual final balance (Increase Test):', finalBalance.tokenCredits); + + // Use toBeCloseTo for safety, though toBe should work for integer math + expect(finalBalance.tokenCredits).toBeCloseTo(expectedFinalBalance, 0); + + // Verify all transactions were created + const transactions = await Transaction.find({ + user: userId, + context: 'concurrent-refill-test', + }); + + // We should have one transaction for each refill attempt + expect(transactions.length).toBe(numberOfRefills); + + // Optional: Verify the sum of increments from the results matches the balance change + const totalIncrementReported = results.reduce((sum, result) => { + // Assuming createAutoRefillTransaction returns an object with the increment amount + // Adjust this based on the actual return structure. + // Let's assume it returns { balance: newBalance, transaction: { rawAmount: ... } } + // Or perhaps we check the transaction.rawAmount directly + return sum + (result?.transaction?.rawAmount || 0); + }, 0); + console.log('Total increment reported by results:', totalIncrementReported); + expect(totalIncrementReported).toBe(expectedFinalBalance - initialBalance); + + // Optional: Check the sum of tokenValue from saved transactions + let totalTokenValueFromDb = 0; + transactions.forEach((tx) => { + // For refills, rawAmount is positive, and tokenValue might be calculated based on it + // Let's assume tokenValue directly reflects the increment for simplicity here + // If calculation is involved, adjust accordingly + totalTokenValueFromDb += tx.rawAmount; // Or tx.tokenValue if that holds the increment + }); + console.log('Total rawAmount from DB transactions:', totalTokenValueFromDb); + expect(totalTokenValueFromDb).toBeCloseTo(expectedFinalBalance - initialBalance, 0); + }); + it('should create structured transactions for both prompt and completion tokens', async () => { // Create a balance for the user await Balance.create({