👥 fix(assistants): Improve Error handling (#2012)

* feat: make assistants endpoint appendable since message state is not managed by LibreChat

* fix(ask): search currentMessages for thread_id if it's not defined

* refactor(abortMiddleware): remove use of `overrideProps` and spread unknown fields instead

* chore: remove console.log in `abortConversation`

* refactor(assistants): improve error handling/cancellation flow
This commit is contained in:
Danny Avila 2024-03-07 10:50:01 -05:00 committed by GitHub
parent d4fe8fc82d
commit 18edd2660b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 77 additions and 25 deletions

View file

@ -110,7 +110,7 @@ const handleAbortError = async (res, req, error, data) => {
} }
const respondWithError = async (partialText) => { const respondWithError = async (partialText) => {
const options = { let options = {
sender, sender,
messageId, messageId,
conversationId, conversationId,
@ -121,7 +121,8 @@ const handleAbortError = async (res, req, error, data) => {
}; };
if (partialText) { if (partialText) {
options.overrideProps = { options = {
...options,
error: false, error: false,
unfinished: true, unfinished: true,
text: partialText, text: partialText,

View file

@ -37,7 +37,7 @@ async function abortRun(req, res) {
try { try {
await cache.set(cacheKey, 'cancelled'); await cache.set(cacheKey, 'cancelled');
const cancelledRun = await openai.beta.threads.runs.cancel(thread_id, run_id); const cancelledRun = await openai.beta.threads.runs.cancel(thread_id, run_id);
logger.debug('Cancelled run:', cancelledRun); logger.debug('[abortRun] Cancelled run:', cancelledRun);
} catch (error) { } catch (error) {
logger.error('[abortRun] Error cancelling run', error); logger.error('[abortRun] Error cancelling run', error);
if ( if (

View file

@ -11,10 +11,10 @@ const {
} = require('~/server/services/Threads'); } = require('~/server/services/Threads');
const { runAssistant, createOnTextProgress } = require('~/server/services/AssistantService'); const { runAssistant, createOnTextProgress } = require('~/server/services/AssistantService');
const { addTitle, initializeClient } = require('~/server/services/Endpoints/assistant'); const { addTitle, initializeClient } = require('~/server/services/Endpoints/assistant');
const { sendResponse, sendMessage } = require('~/server/utils');
const { createRun, sleep } = require('~/server/services/Runs'); const { createRun, sleep } = require('~/server/services/Runs');
const { getConvo } = require('~/models/Conversation'); const { getConvo } = require('~/models/Conversation');
const getLogStores = require('~/cache/getLogStores'); const getLogStores = require('~/cache/getLogStores');
const { sendMessage } = require('~/server/utils');
const { logger } = require('~/config'); const { logger } = require('~/config');
const router = express.Router(); const router = express.Router();
@ -101,32 +101,52 @@ router.post('/', validateModel, buildEndpointOption, setHeaders, async (req, res
let completedRun; let completedRun;
const handleError = async (error) => { const handleError = async (error) => {
const messageData = {
thread_id,
assistant_id,
conversationId,
parentMessageId,
sender: 'System',
user: req.user.id,
shouldSaveMessage: false,
messageId: responseMessageId,
endpoint: EModelEndpoint.assistants,
};
if (error.message === 'Run cancelled') { if (error.message === 'Run cancelled') {
return res.end(); return res.end();
} } else if (error.message === 'Request closed' && completedRun) {
if (error.message === 'Request closed' && completedRun) {
return; return;
} else if (error.message === 'Request closed') { } else if (error.message === 'Request closed') {
logger.debug('[/assistants/chat/] Request aborted on close'); logger.debug('[/assistants/chat/] Request aborted on close');
} } else {
logger.error('[/assistants/chat/]', error); logger.error('[/assistants/chat/]', error);
}
if (!openai || !thread_id || !run_id) { if (!openai || !thread_id || !run_id) {
return res.status(500).json({ error: 'The Assistant run failed to initialize' }); return sendResponse(res, messageData, 'The Assistant run failed to initialize');
} }
await sleep(3000);
try { try {
const status = await cache.get(cacheKey);
if (status === 'cancelled') {
logger.debug('[/assistants/chat/] Run already cancelled');
return res.end();
}
await cache.delete(cacheKey); await cache.delete(cacheKey);
const cancelledRun = await openai.beta.threads.runs.cancel(thread_id, run_id); const cancelledRun = await openai.beta.threads.runs.cancel(thread_id, run_id);
logger.debug('Cancelled run:', cancelledRun); logger.debug('[/assistants/chat/] Cancelled run:', cancelledRun);
} catch (error) { } catch (error) {
logger.error('[abortRun] Error cancelling run', error); logger.error('[/assistants/chat/] Error cancelling run', error);
} }
await sleep(2000); await sleep(2000);
let run;
try { try {
const run = await openai.beta.threads.runs.retrieve(thread_id, run_id); run = await openai.beta.threads.runs.retrieve(thread_id, run_id);
await recordUsage({ await recordUsage({
...run.usage, ...run.usage,
model: run.model, model: run.model,
@ -137,6 +157,7 @@ router.post('/', validateModel, buildEndpointOption, setHeaders, async (req, res
logger.error('[/assistants/chat/] Error fetching or processing run', error); logger.error('[/assistants/chat/] Error fetching or processing run', error);
} }
let finalEvent;
try { try {
const runMessages = await checkMessageGaps({ const runMessages = await checkMessageGaps({
openai, openai,
@ -146,22 +167,18 @@ router.post('/', validateModel, buildEndpointOption, setHeaders, async (req, res
latestMessageId: responseMessageId, latestMessageId: responseMessageId,
}); });
const finalEvent = { finalEvent = {
title: 'New Chat', title: 'New Chat',
final: true, final: true,
conversation: await getConvo(req.user.id, conversationId), conversation: await getConvo(req.user.id, conversationId),
runMessages, runMessages,
}; };
if (res.headersSent && finalEvent) {
return sendMessage(res, finalEvent);
}
res.json(finalEvent);
} catch (error) { } catch (error) {
logger.error('[/assistants/chat/] Error finalizing error process', error); logger.error('[/assistants/chat/] Error finalizing error process', error);
return res.status(500).json({ error: 'The Assistant run failed' }); return sendResponse(res, messageData, 'The Assistant run failed');
} }
return sendResponse(res, finalEvent);
}; };
try { try {
@ -172,10 +189,12 @@ router.post('/', validateModel, buildEndpointOption, setHeaders, async (req, res
}); });
if (convoId && !_thread_id) { if (convoId && !_thread_id) {
completedRun = true;
throw new Error('Missing thread_id for existing conversation'); throw new Error('Missing thread_id for existing conversation');
} }
if (!assistant_id) { if (!assistant_id) {
completedRun = true;
throw new Error('Missing assistant_id'); throw new Error('Missing assistant_id');
} }

View file

@ -32,6 +32,13 @@ const sendMessage = (res, message, event = 'message') => {
* @async * @async
* @param {object} res - The server response. * @param {object} res - The server response.
* @param {object} options - The options for handling the error containing message properties. * @param {object} options - The options for handling the error containing message properties.
* @param {object} options.user - The user ID.
* @param {string} options.sender - The sender of the message.
* @param {string} options.conversationId - The conversation ID.
* @param {string} options.messageId - The message ID.
* @param {string} options.parentMessageId - The parent message ID.
* @param {string} options.text - The error message.
* @param {boolean} options.shouldSaveMessage - [Optional] Whether the message should be saved. Default is true.
* @param {function} callback - [Optional] The callback function to be executed. * @param {function} callback - [Optional] The callback function to be executed.
*/ */
const sendError = async (res, options, callback) => { const sendError = async (res, options, callback) => {
@ -43,7 +50,7 @@ const sendError = async (res, options, callback) => {
parentMessageId, parentMessageId,
text, text,
shouldSaveMessage, shouldSaveMessage,
overrideProps = {}, ...rest
} = options; } = options;
const errorMessage = { const errorMessage = {
sender, sender,
@ -55,7 +62,7 @@ const sendError = async (res, options, callback) => {
final: true, final: true,
text, text,
isCreatedByUser: false, isCreatedByUser: false,
...overrideProps, ...rest,
}; };
if (callback && typeof callback === 'function') { if (callback && typeof callback === 'function') {
await callback(); await callback();
@ -88,7 +95,28 @@ const sendError = async (res, options, callback) => {
handleError(res, errorMessage); handleError(res, errorMessage);
}; };
/**
* Sends the response based on whether headers have been sent or not.
* @param {Express.Response} res - The server response.
* @param {Object} data - The data to be sent.
* @param {string} [errorMessage] - The error message, if any.
*/
const sendResponse = (res, data, errorMessage) => {
if (!res.headersSent) {
if (errorMessage) {
return res.status(500).json({ error: errorMessage });
}
return res.json(data);
}
if (errorMessage) {
return sendError(res, { ...data, text: errorMessage });
}
return sendMessage(res, data);
};
module.exports = { module.exports = {
sendResponse,
handleError, handleError,
sendMessage, sendMessage,
sendError, sendError,

View file

@ -45,7 +45,9 @@ export default function useTextarea({
const localize = useLocalize(); const localize = useLocalize();
const { conversationId, jailbreak, endpoint = '', assistant_id } = conversation || {}; const { conversationId, jailbreak, endpoint = '', assistant_id } = conversation || {};
const isNotAppendable = (latestMessage?.unfinished && !isSubmitting) || latestMessage?.error; const isNotAppendable =
((latestMessage?.unfinished && !isSubmitting) || latestMessage?.error) &&
endpoint !== EModelEndpoint.assistants;
// && (conversationId?.length ?? 0) > 6; // also ensures that we don't show the wrong placeholder // && (conversationId?.length ?? 0) > 6; // also ensures that we don't show the wrong placeholder
const assistant = endpoint === EModelEndpoint.assistants && assistantMap?.[assistant_id ?? '']; const assistant = endpoint === EModelEndpoint.assistants && assistantMap?.[assistant_id ?? ''];

View file

@ -418,7 +418,6 @@ export default function useSSE(submission: TSubmission | null, index = 0) {
const abortConversation = useCallback( const abortConversation = useCallback(
async (conversationId = '', submission: TSubmission) => { async (conversationId = '', submission: TSubmission) => {
console.log(submission);
let runAbortKey = ''; let runAbortKey = '';
try { try {
const conversation = (JSON.parse(localStorage.getItem('lastConversationSetup') ?? '') ?? const conversation = (JSON.parse(localStorage.getItem('lastConversationSetup') ?? '') ??

View file

@ -140,7 +140,10 @@ export default function useChatHelpers(index = 0, paramId: string | undefined) {
(msg) => msg.messageId === latestMessage?.parentMessageId, (msg) => msg.messageId === latestMessage?.parentMessageId,
); );
const thread_id = parentMessage?.thread_id ?? latestMessage?.thread_id; let thread_id = parentMessage?.thread_id ?? latestMessage?.thread_id;
if (!thread_id) {
thread_id = currentMessages.find((message) => message.thread_id)?.thread_id;
}
const endpointsConfig = queryClient.getQueryData<TEndpointsConfig>([QueryKeys.endpoints]); const endpointsConfig = queryClient.getQueryData<TEndpointsConfig>([QueryKeys.endpoints]);
const endpointType = getEndpointField(endpointsConfig, endpoint, 'type'); const endpointType = getEndpointField(endpointsConfig, endpoint, 'type');