mirror of https://github.com/danny-avila/LibreChat.git
* fix: Lazy-initialize balance record when missing at check time

  When balance is configured via admin panel DB overrides, users with existing
  sessions never pass through the login middleware that creates their balance
  record. This causes checkBalanceRecord to find no record and return balance: 0,
  blocking the user.

  Add optional balanceConfig and upsertBalanceFields deps to CheckBalanceDeps.
  When no balance record exists but startBalance is configured, lazily create the
  record instead of returning canSpend: false. Pass the new deps from BaseClient,
  chatV1, and chatV2 callers.

* test: Add checkBalance lazy initialization tests

  Cover lazy balance init scenarios: successful init with startBalance,
  insufficient startBalance, missing config fallback, undefined startBalance,
  missing upsertBalanceFields dep, and startBalance of 0.

* fix: Address review findings for lazy balance initialization

  - Use canonical BalanceConfig and IBalanceUpdate types from
    @librechat/data-schemas instead of inline type definitions
  - Include auto-refill fields (autoRefillEnabled, refillIntervalValue,
    refillIntervalUnit, refillAmount, lastRefill) during lazy init, mirroring
    the login middleware's buildUpdateFields logic
  - Add try/catch around upsertBalanceFields with graceful fallback to
    canSpend: false on DB errors
  - Read balance from DB return value instead of raw startBalance constant
  - Fix misleading test names to describe observable throw behavior
  - Add tests: upsertBalanceFields rejection, auto-refill field inclusion,
    DB-returned balance value, and logViolation assertions

* fix: Address second review pass findings

  - Fix import ordering: package type imports before local type imports
  - Remove misleading comment on DB-fallback test, rename for clarity
  - Add logViolation assertion to insufficient-balance lazy-init test
  - Add test for partial auto-refill config (autoRefillEnabled without required
    dependent fields)

* refactor: Replace createMockReqRes factory with describe-scoped consts

  Replace zero-argument factory with plain const declarations using direct type
  casts instead of double-cast through unknown.

* fix: Sort local type imports longest-first, add missing logViolation assertion

  - Reorder local type imports in spec file per AGENTS.md (longest to shortest
    within sub-group)
  - Add logViolation assertion to startBalance: 0 test for consistent violation
    payload coverage across all throw paths
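A minimal sketch of the lazy-init flow the first commit describes, illustrative only:
the call shapes of `findBalanceByUser` and `upsertBalanceFields`, the `tokenCredits`
field name, and the return shape are assumptions for the example, not the package's
verbatim implementation.

// Hypothetical sketch: lazily create a balance record at check time when the
// login middleware never ran (e.g. balance enabled via admin panel DB overrides).
async function checkBalanceRecordSketch({ user, amount }, deps) {
  const { findBalanceByUser, upsertBalanceFields, balanceConfig } = deps;

  let record = await findBalanceByUser(user); // assumed to resolve to a record or null
  if (!record && balanceConfig?.startBalance != null && upsertBalanceFields) {
    const fields = { tokenCredits: balanceConfig.startBalance }; // field name assumed
    // Mirror the login middleware's auto-refill fields when fully configured.
    const { autoRefillEnabled, refillIntervalValue, refillIntervalUnit, refillAmount } =
      balanceConfig;
    if (
      autoRefillEnabled &&
      refillIntervalValue != null &&
      refillIntervalUnit != null &&
      refillAmount != null
    ) {
      Object.assign(fields, {
        autoRefillEnabled,
        refillIntervalValue,
        refillIntervalUnit,
        refillAmount,
        lastRefill: new Date(),
      });
    }
    try {
      record = await upsertBalanceFields({ user, fields }); // call shape assumed
    } catch (error) {
      // Graceful fallback: treat DB errors as an unspendable balance.
      return { canSpend: false, balance: 0 };
    }
  }

  const balance = record?.tokenCredits ?? 0; // read from the DB return value, not startBalance
  return { canSpend: balance >= amount, balance };
}

Keeping both deps optional on CheckBalanceDeps means existing callers keep working
unchanged; only callers that pass them (BaseClient, chatV1, and chatV2, per the
commit) opt into lazy creation.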
510 lines
13 KiB
JavaScript
const { v4 } = require('uuid');
const { sleep } = require('@librechat/agents');
const { logger } = require('@librechat/data-schemas');
const {
  sendEvent,
  countTokens,
  checkBalance,
  getBalanceConfig,
  getModelMaxTokens,
} = require('@librechat/api');
const {
  Time,
  Constants,
  RunStatus,
  CacheKeys,
  ContentTypes,
  ToolCallTypes,
  EModelEndpoint,
  retrievalMimeTypes,
  AssistantStreamEvents,
} = require('librechat-data-provider');
const {
  initThread,
  recordUsage,
  saveUserMessage,
  addThreadMetadata,
  saveAssistantMessage,
} = require('~/server/services/Threads');
const { runAssistant, createOnTextProgress } = require('~/server/services/AssistantService');
const { createErrorHandler } = require('~/server/controllers/assistants/errors');
const validateAuthor = require('~/server/middleware/assistants/validateAuthor');
const { createRun, StreamRunManager } = require('~/server/services/Runs');
const { addTitle } = require('~/server/services/Endpoints/assistants');
const { createRunBody } = require('~/server/services/createRunBody');
const {
  getConvo,
  getMultiplier,
  getTransactions,
  findBalanceByUser,
  upsertBalanceFields,
  createAutoRefillTransaction,
} = require('~/models');
const { logViolation, getLogStores } = require('~/cache');
const { getOpenAIClient } = require('./helpers');

/**
 * @route POST /
 * @desc Chat with an assistant
 * @access Public
 * @param {ServerRequest} req - The request object, containing the request data.
 * @param {Express.Response} res - The response object, used to send back a response.
 * @returns {void}
 */
const chatV2 = async (req, res) => {
  logger.debug('[/assistants/chat/] req.body', req.body);
  const appConfig = req.config;

  /** @type {{files: MongoFile[]}} */
  const {
    text,
    model,
    endpoint,
    files = [],
    promptPrefix,
    assistant_id,
    instructions,
    endpointOption,
    thread_id: _thread_id,
    messageId: _messageId,
    conversationId: convoId,
    parentMessageId: _parentId = Constants.NO_PARENT,
    clientTimestamp,
  } = req.body;

  /** @type {OpenAI} */
  let openai;
  /** @type {string|undefined} - the current thread id */
  let thread_id = _thread_id;
  /** @type {string|undefined} - the current run id */
  let run_id;
  /** @type {string|undefined} - the parent messageId */
  let parentMessageId = _parentId;
  /** @type {TMessage[]} */
  let previousMessages = [];
  /** @type {import('librechat-data-provider').TConversation | null} */
  let conversation = null;
  /** @type {string[]} */
  let file_ids = [];
  /** @type {Set<string>} */
  let attachedFileIds = new Set();
  /** @type {TMessage | null} */
  let requestMessage = null;

  const userMessageId = v4();
  const responseMessageId = v4();

  /** @type {string} - The conversation UUID - created if undefined */
  const conversationId = convoId ?? v4();

  const cache = getLogStores(CacheKeys.ABORT_KEYS);
  const cacheKey = `${req.user.id}:${conversationId}`;

  /** @type {Run | undefined} - The completed run, undefined if incomplete */
  let completedRun;

  const getContext = () => ({
    openai,
    run_id,
    endpoint,
    cacheKey,
    thread_id,
    completedRun,
    assistant_id,
    conversationId,
    parentMessageId,
    responseMessageId,
  });

  const handleError = createErrorHandler({ req, res, getContext });

  try {
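    // If the client disconnects before the run completes, route through the
    // shared error handler so the run is aborted and partial state is cleaned up.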
    res.on('close', async () => {
      if (!completedRun) {
        await handleError(new Error('Request closed'));
      }
    });

    if (convoId && !_thread_id) {
      completedRun = true;
      throw new Error('Missing thread_id for existing conversation');
    }

    if (!assistant_id) {
      completedRun = true;
      throw new Error('Missing assistant_id');
    }

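    // Balance gate: estimates prompt tokens (prior transaction spend + current
    // text + buffers) and defers to `checkBalance`. `balanceConfig` and
    // `upsertBalanceFields` are passed so a missing balance record can be lazily
    // created at check time (see commit notes above).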
    const checkBalanceBeforeRun = async () => {
      const balanceConfig = getBalanceConfig(appConfig);
      if (!balanceConfig?.enabled) {
        return;
      }
      const transactions =
        (await getTransactions({
          user: req.user.id,
          context: 'message',
          conversationId,
        })) ?? [];

      const totalPreviousTokens = Math.abs(
        transactions.reduce((acc, curr) => acc + curr.rawAmount, 0),
      );

      // TODO: make promptBuffer a config option; buffer for titles, needs buffer for system instructions
      const promptBuffer = parentMessageId === Constants.NO_PARENT && !_thread_id ? 200 : 0;
      // 5 is added for labels
      let promptTokens = (await countTokens(text + (promptPrefix ?? ''))) + 5;
      promptTokens += totalPreviousTokens + promptBuffer;
      // Count tokens up to the current context window
      promptTokens = Math.min(promptTokens, getModelMaxTokens(model));

      await checkBalance(
        {
          req,
          res,
          txData: {
            model,
            user: req.user.id,
            tokenType: 'prompt',
            amount: promptTokens,
          },
        },
        {
          findBalanceByUser,
          getMultiplier,
          createAutoRefillTransaction,
          logViolation,
          balanceConfig,
          upsertBalanceFields,
        },
      );
    };

    const { openai: _openai } = await getOpenAIClient({
      req,
      res,
      endpointOption,
    });

    openai = _openai;
    await validateAuthor({ req, openai });

    if (previousMessages.length) {
      parentMessageId = previousMessages[previousMessages.length - 1].messageId;
    }

    let userMessage = {
      role: 'user',
      content: [
        {
          type: ContentTypes.TEXT,
          text,
        },
      ],
      metadata: {
        messageId: userMessageId,
      },
    };

    /** @type {CreateRunBody | undefined} */
    const body = createRunBody({
      assistant_id,
      model,
      promptPrefix,
      instructions,
      endpointOption,
      clientTimestamp,
    });

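    // Merge file ids from the current request with any already attached to the
    // conversation, adding image content and code interpreter / file search
    // tools per attachment.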
    const getRequestFileIds = async () => {
      let thread_file_ids = [];
      if (convoId) {
        const convo = await getConvo(req.user.id, convoId);
        if (convo && convo.file_ids) {
          thread_file_ids = convo.file_ids;
        }
      }

      if (files.length || thread_file_ids.length) {
        attachedFileIds = new Set([...file_ids, ...thread_file_ids]);

        let attachmentIndex = 0;
        for (const file of files) {
          file_ids.push(file.file_id);
          if (file.type.startsWith('image')) {
            userMessage.content.push({
              type: ContentTypes.IMAGE_FILE,
              [ContentTypes.IMAGE_FILE]: { file_id: file.file_id },
            });
          }

          if (!userMessage.attachments) {
            userMessage.attachments = [];
          }

          userMessage.attachments.push({
            file_id: file.file_id,
            tools: [{ type: ToolCallTypes.CODE_INTERPRETER }],
          });

          if (file.type.startsWith('image')) {
            continue;
          }

          const mimeType = file.type;
          const isSupportedByRetrieval = retrievalMimeTypes.some((regex) => regex.test(mimeType));
          if (isSupportedByRetrieval) {
            userMessage.attachments[attachmentIndex].tools.push({
              type: ToolCallTypes.FILE_SEARCH,
            });
          }

          attachmentIndex++;
        }
      }
    };

    /** @type {Promise<Run>|undefined} */
    let userMessagePromise;

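    // Create (or reuse) the OpenAI thread with the user message, kick off
    // message persistence asynchronously, and build the conversation stub.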
    const initializeThread = async () => {
      await getRequestFileIds();

      // TODO: may allow multiple messages to be created beforehand in a future update
      const initThreadBody = {
        messages: [userMessage],
        metadata: {
          user: req.user.id,
          conversationId,
        },
      };

      const result = await initThread({ openai, body: initThreadBody, thread_id });
      thread_id = result.thread_id;

      createOnTextProgress({
        openai,
        conversationId,
        userMessageId,
        messageId: responseMessageId,
        thread_id,
      });

      requestMessage = {
        user: req.user.id,
        text,
        messageId: userMessageId,
        parentMessageId,
        // TODO: make sure client sends correct format for `files`, use zod
        files,
        file_ids,
        conversationId,
        isCreatedByUser: true,
        assistant_id,
        thread_id,
        model: assistant_id,
        endpoint,
      };

      previousMessages.push(requestMessage);

      /* asynchronous */
      userMessagePromise = saveUserMessage(req, { ...requestMessage, model });

      conversation = {
        conversationId,
        endpoint,
        promptPrefix,
        instructions,
        assistant_id,
        // model,
      };

      if (file_ids.length) {
        conversation.file_ids = file_ids;
      }
    };

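    // Initialize the thread and run the balance check concurrently; a rejection
    // from either aborts the request.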
    const promises = [initializeThread(), checkBalanceBeforeRun()];
    await Promise.all(promises);

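    // First `sync` event: gives the client the ids it needs to reconcile the
    // request/response messages before streaming begins.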
    const sendInitialResponse = () => {
      sendEvent(res, {
        sync: true,
        conversationId,
        // messages: previousMessages,
        requestMessage,
        responseMessage: {
          user: req.user.id,
          messageId: openai.responseMessage.messageId,
          parentMessageId: userMessageId,
          conversationId,
          assistant_id,
          thread_id,
          model: assistant_id,
        },
      });
    };

    /** @type {RunResponse | typeof StreamRunManager | undefined} */
    let response;

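    // azureAssistants runs through the polling `runAssistant` path; all other
    // endpoints stream via `StreamRunManager`.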
    const processRun = async (retry = false) => {
      if (endpoint === EModelEndpoint.azureAssistants) {
        body.model = openai._options.model;
        openai.attachedFileIds = attachedFileIds;
        if (retry) {
          response = await runAssistant({
            openai,
            thread_id,
            run_id,
            in_progress: openai.in_progress,
          });
          return;
        }

        /* NOTE:
         * By default, a Run will use the model and tools configuration specified in Assistant object,
         * but you can override most of these when creating the Run for added flexibility:
         */
        const run = await createRun({
          openai,
          thread_id,
          body,
        });

        run_id = run.id;
        await cache.set(cacheKey, `${thread_id}:${run_id}`, Time.TEN_MINUTES);
        sendInitialResponse();

        // todo: retry logic
        response = await runAssistant({ openai, thread_id, run_id });
        return;
      }

      /** @type {{[AssistantStreamEvents.ThreadRunCreated]: (event: ThreadRunCreated) => Promise<void>}} */
      const handlers = {
        [AssistantStreamEvents.ThreadRunCreated]: async (event) => {
          await cache.set(cacheKey, `${thread_id}:${event.data.id}`, Time.TEN_MINUTES);
          run_id = event.data.id;
          sendInitialResponse();
        },
      };

      /** @type {undefined | TAssistantEndpoint} */
      const config = appConfig.endpoints?.[endpoint] ?? {};
      /** @type {undefined | TBaseEndpoint} */
      const allConfig = appConfig.endpoints?.all;

      const streamRunManager = new StreamRunManager({
        req,
        res,
        openai,
        handlers,
        thread_id,
        attachedFileIds,
        parentMessageId: userMessageId,
        responseMessage: openai.responseMessage,
        streamRate: allConfig?.streamRate ?? config.streamRate,
        // streamOptions: {
        // },
      });

      await streamRunManager.runAssistant({
        thread_id,
        body,
      });

      response = streamRunManager;
      response.text = streamRunManager.intermediateText;
    };

    await processRun();
    logger.debug('[/assistants/chat/] response', {
      run: response.run,
      steps: response.steps,
    });

    if (response.run.status === RunStatus.CANCELLED) {
      logger.debug('[/assistants/chat/] Run cancelled, handled by `abortRun`');
      return res.end();
    }

    if (response.run.status === RunStatus.IN_PROGRESS) {
      processRun(true);
    }

    completedRun = response.run;

    /** @type {ResponseMessage} */
    const responseMessage = {
      ...(response.responseMessage ?? response.finalMessage),
      text: response.text,
      parentMessageId: userMessageId,
      conversationId,
      user: req.user.id,
      assistant_id,
      thread_id,
      model: assistant_id,
      endpoint,
      spec: endpointOption.spec,
      iconURL: endpointOption.iconURL,
    };

    sendEvent(res, {
      final: true,
      conversation,
      requestMessage: {
        parentMessageId,
        thread_id,
      },
    });
    res.end();

    if (userMessagePromise) {
      await userMessagePromise;
    }
    await saveAssistantMessage(req, { ...responseMessage, model });

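    // Title only brand-new conversations (no parent message and no pre-existing
    // thread); fire-and-forget, intentionally not awaited.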
    if (parentMessageId === Constants.NO_PARENT && !_thread_id) {
      addTitle(req, {
        text,
        responseText: response.text,
        conversationId,
      });
    }

    await addThreadMetadata({
      openai,
      thread_id,
      messageId: responseMessage.messageId,
      messages: response.messages,
    });

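    // Usage may not be populated when the final event arrives; wait briefly and
    // re-fetch the run before recording token spend.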
    if (!response.run.usage) {
      await sleep(3000);
      completedRun = await openai.beta.threads.runs.retrieve(response.run.id, { thread_id });
      if (completedRun.usage) {
        await recordUsage({
          ...completedRun.usage,
          user: req.user.id,
          model: completedRun.model ?? model,
          conversationId,
        });
      }
    } else {
      await recordUsage({
        ...response.run.usage,
        user: req.user.id,
        model: response.run.model ?? model,
        conversationId,
      });
    }
  } catch (error) {
    await handleError(error);
  }
};

module.exports = chatV2;