⚙️ fix: Plugin Message Handling Errors (#3392)

- Add unique index for messageId and user in messageSchema
- Use `updateMessage` for updating the plugins message
- Add better logging for `updateMessage`
- Prevents duplicate-key (dupe_key) or getKeyIndex errors
This commit is contained in:
Danny Avila 2024-07-19 08:06:05 -04:00 committed by GitHub
parent ee4dd1b2e9
commit 9e7615f832
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 62 additions and 45 deletions

View file

@ -108,6 +108,9 @@ ${JSON.stringify(params, null, 2)}
return message.toObject(); return message.toObject();
} catch (err) { } catch (err) {
logger.error('Error saving message:', err); logger.error('Error saving message:', err);
if (metadata && metadata?.context) {
logger.info(`\`saveMessage\` context: ${metadata.context}`);
}
throw err; throw err;
} }
} }
@ -178,7 +181,7 @@ async function recordMessage({
new: true, new: true,
}); });
} catch (err) { } catch (err) {
logger.error('Error saving message:', err); logger.error('Error recording message:', err);
throw err; throw err;
} }
} }
@ -217,10 +220,12 @@ async function updateMessageText(req, { messageId, text }) {
* @param {boolean} [message.isCreatedByUser] - Indicates if the message was created by the user. * @param {boolean} [message.isCreatedByUser] - Indicates if the message was created by the user.
* @param {string} [message.sender] - The identifier of the sender. * @param {string} [message.sender] - The identifier of the sender.
* @param {number} [message.tokenCount] - The number of tokens in the message. * @param {number} [message.tokenCount] - The number of tokens in the message.
* @param {Object} [metadata] - The operation metadata
* @param {string} [metadata.context] - The operation metadata
* @returns {Promise<TMessage>} The updated message document. * @returns {Promise<TMessage>} The updated message document.
* @throws {Error} If there is an error in updating the message or if the message is not found. * @throws {Error} If there is an error in updating the message or if the message is not found.
*/ */
async function updateMessage(req, message) { async function updateMessage(req, message, metadata) {
try { try {
const { messageId, ...update } = message; const { messageId, ...update } = message;
update.isEdited = true; update.isEdited = true;
@ -248,6 +253,9 @@ async function updateMessage(req, message) {
}; };
} catch (err) { } catch (err) {
logger.error('Error updating message:', err); logger.error('Error updating message:', err);
if (metadata && metadata?.context) {
logger.info(`\`updateMessage\` context: ${metadata.context}`);
}
throw err; throw err;
} }
} }

View file

@ -129,6 +129,7 @@ if (process.env.MEILI_HOST && process.env.MEILI_MASTER_KEY) {
} }
messageSchema.index({ createdAt: 1 }); messageSchema.index({ createdAt: 1 });
messageSchema.index({ messageId: 1, user: 1 }, { unique: true });
const Message = mongoose.models.Message || mongoose.model('Message', messageSchema); const Message = mongoose.models.Message || mongoose.model('Message', messageSchema);

View file

@ -4,7 +4,7 @@ const { getResponseSender, Constants, CacheKeys, Time } = require('librechat-dat
const { initializeClient } = require('~/server/services/Endpoints/gptPlugins'); const { initializeClient } = require('~/server/services/Endpoints/gptPlugins');
const { sendMessage, createOnProgress } = require('~/server/utils'); const { sendMessage, createOnProgress } = require('~/server/utils');
const { addTitle } = require('~/server/services/Endpoints/openAI'); const { addTitle } = require('~/server/services/Endpoints/openAI');
const { saveMessage } = require('~/models'); const { saveMessage, updateMessage } = require('~/models');
const { getLogStores } = require('~/cache'); const { getLogStores } = require('~/cache');
const { const {
handleAbort, handleAbort,
@ -73,7 +73,14 @@ router.post(
}; };
const messageCache = getLogStores(CacheKeys.MESSAGES); const messageCache = getLogStores(CacheKeys.MESSAGES);
const throttledSetMessage = throttle(messageCache.set, 3000, { trailing: false }); const throttledCacheSet = throttle(
(text) => {
messageCache.set(responseMessageId, text, Time.FIVE_MINUTES);
},
3000,
{ trailing: false },
);
let streaming = null; let streaming = null;
let timer = null; let timer = null;
@ -87,21 +94,7 @@ router.post(
clearTimeout(timer); clearTimeout(timer);
} }
/* throttledCacheSet(partialText);
{
messageId: responseMessageId,
sender,
conversationId,
parentMessageId: overrideParentMessageId || userMessageId,
text: partialText,
model: endpointOption.modelOptions.model,
unfinished: true,
error: false,
plugins,
user,
}
*/
throttledSetMessage(responseMessageId, partialText, Time.FIVE_MINUTES);
streaming = new Promise((resolve) => { streaming = new Promise((resolve) => {
timer = setTimeout(() => { timer = setTimeout(() => {
@ -175,7 +168,11 @@ router.post(
const onChainEnd = () => { const onChainEnd = () => {
if (!client.skipSaveUserMessage) { if (!client.skipSaveUserMessage) {
saveMessage(req, { ...userMessage, user }); saveMessage(
req,
{ ...userMessage, user },
{ context: 'api/server/routes/ask/gptPlugins.js - onChainEnd' },
);
} }
sendIntermediateMessage(res, { sendIntermediateMessage(res, {
plugins, plugins,
@ -212,9 +209,6 @@ router.post(
logger.debug('[/ask/gptPlugins]', response); logger.debug('[/ask/gptPlugins]', response);
response.plugins = plugins.map((p) => ({ ...p, loading: false }));
await saveMessage(req, { ...response, user });
const { conversation = {} } = await client.responsePromise; const { conversation = {} } = await client.responsePromise;
conversation.title = conversation.title =
conversation && !conversation.title ? null : conversation?.title || 'New Chat'; conversation && !conversation.title ? null : conversation?.title || 'New Chat';
@ -235,6 +229,15 @@ router.post(
client, client,
}); });
} }
response.plugins = plugins.map((p) => ({ ...p, loading: false }));
if (response.plugins?.length > 0) {
await updateMessage(
req,
{ ...response, user },
{ context: 'api/server/routes/ask/gptPlugins.js - save plugins used' },
);
}
} catch (error) { } catch (error) {
const partialText = getPartialText(); const partialText = getPartialText();
handleAbortError(res, req, error, { handleAbortError(res, req, error, {

View file

@ -13,7 +13,7 @@ const {
} = require('~/server/middleware'); } = require('~/server/middleware');
const { sendMessage, createOnProgress, formatSteps, formatAction } = require('~/server/utils'); const { sendMessage, createOnProgress, formatSteps, formatAction } = require('~/server/utils');
const { initializeClient } = require('~/server/services/Endpoints/gptPlugins'); const { initializeClient } = require('~/server/services/Endpoints/gptPlugins');
const { saveMessage } = require('~/models'); const { saveMessage, updateMessage } = require('~/models');
const { getLogStores } = require('~/cache'); const { getLogStores } = require('~/cache');
const { validateTools } = require('~/app'); const { validateTools } = require('~/app');
const { logger } = require('~/config'); const { logger } = require('~/config');
@ -81,7 +81,14 @@ router.post(
}; };
const messageCache = getLogStores(CacheKeys.MESSAGES); const messageCache = getLogStores(CacheKeys.MESSAGES);
const throttledSetMessage = throttle(messageCache.set, 3000, { trailing: false }); const throttledCacheSet = throttle(
(text) => {
messageCache.set(responseMessageId, text, Time.FIVE_MINUTES);
},
3000,
{ trailing: false },
);
const { const {
onProgress: progressCallback, onProgress: progressCallback,
sendIntermediateMessage, sendIntermediateMessage,
@ -92,22 +99,7 @@ router.post(
if (plugin.loading === true) { if (plugin.loading === true) {
plugin.loading = false; plugin.loading = false;
} }
throttledCacheSet(partialText);
/*
{
messageId: responseMessageId,
sender,
conversationId,
parentMessageId: overrideParentMessageId || userMessageId,
text: partialText,
model: endpointOption.modelOptions.model,
unfinished: true,
isEdited: true,
error: false,
user,
}
*/
throttledSetMessage(responseMessageId, partialText, Time.FIVE_MINUTES);
}, },
}); });
@ -115,7 +107,11 @@ router.post(
let { intermediateSteps: steps } = data; let { intermediateSteps: steps } = data;
plugin.outputs = steps && steps[0].action ? formatSteps(steps) : 'An error occurred.'; plugin.outputs = steps && steps[0].action ? formatSteps(steps) : 'An error occurred.';
plugin.loading = false; plugin.loading = false;
saveMessage(req, { ...userMessage, user }); saveMessage(
req,
{ ...userMessage, user },
{ context: 'api/server/routes/ask/gptPlugins.js - onChainEnd' },
);
sendIntermediateMessage(res, { sendIntermediateMessage(res, {
plugin, plugin,
parentMessageId: userMessage.messageId, parentMessageId: userMessage.messageId,
@ -146,7 +142,11 @@ router.post(
plugin.inputs.push(formattedAction); plugin.inputs.push(formattedAction);
plugin.latest = formattedAction.plugin; plugin.latest = formattedAction.plugin;
if (!start && !client.skipSaveUserMessage) { if (!start && !client.skipSaveUserMessage) {
saveMessage(req, { ...userMessage, user }); saveMessage(
req,
{ ...userMessage, user },
{ context: 'api/server/routes/ask/gptPlugins.js - onAgentAction' },
);
} }
sendIntermediateMessage(res, { sendIntermediateMessage(res, {
plugin, plugin,
@ -184,8 +184,6 @@ router.post(
} }
logger.debug('[/edit/gptPlugins] CLIENT RESPONSE', response); logger.debug('[/edit/gptPlugins] CLIENT RESPONSE', response);
response.plugin = { ...plugin, loading: false };
await saveMessage(req, { ...response, user });
const { conversation = {} } = await client.responsePromise; const { conversation = {} } = await client.responsePromise;
conversation.title = conversation.title =
@ -199,6 +197,13 @@ router.post(
responseMessage: response, responseMessage: response,
}); });
res.end(); res.end();
response.plugin = { ...plugin, loading: false };
await updateMessage(
req,
{ ...response, user },
{ context: 'api/server/routes/edit/gptPlugins.js' },
);
} catch (error) { } catch (error) {
const partialText = getPartialText(); const partialText = getPartialText();
handleAbortError(res, req, error, { handleAbortError(res, req, error, {