🛡️ fix: Improve Error Handling and Null Safety in SSE Event Processing (#10751)

* 🔧 fix: Handle null content parts in message processing

- Added checks to filter out null content parts in various message handling functions, ensuring robustness against undefined values.
- Updated the `extractMessageContent`, `useContentHandler`, `useEventHandlers`, and `useStepHandler` hooks to prevent errors caused by null parts.
- Enhanced the `getAllContentText` utility to only include valid content types, improving overall message integrity.

* 🔧 fix: Enhance error handling in event and SSE handlers

- Wrapped critical sections in try-catch blocks within `useEventHandlers` and `useSSE` hooks to improve error management and prevent application crashes.
- Added console error logging for better debugging and tracking of issues during message processing and conversation aborting.
- Ensured that UI states like `setIsSubmitting` and `setShowStopButton` are correctly updated in case of errors, maintaining a consistent user experience.

* 🔧 fix: Filter out null and empty content in message export

- Enhanced the `useExportConversation` hook to filter out null content parts and empty strings during message processing, ensuring only valid content is included in the export.
- This change improves the integrity of exported conversations by preventing unnecessary empty entries in the output.
This commit is contained in:
Danny Avila 2025-12-01 14:05:50 -05:00 committed by GitHub
parent 6c0aad423f
commit 026890cd27
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 201 additions and 156 deletions

View file

@ -45,6 +45,9 @@ const extractMessageContent = (message: TMessage): string => {
if (Array.isArray(message.content)) { if (Array.isArray(message.content)) {
return message.content return message.content
.map((part) => { .map((part) => {
if (part == null) {
return '';
}
if (typeof part === 'string') { if (typeof part === 'string') {
return part; return part;
} }

View file

@ -73,7 +73,9 @@ export default function useExportConversation({
} }
return message.content return message.content
.filter((content) => content != null)
.map((content) => getMessageContent(message.sender || '', content)) .map((content) => getMessageContent(message.sender || '', content))
.filter((text) => text.length > 0)
.map((text) => { .map((text) => {
return formatText(text[0], text[1]); return formatText(text[0], text[1]);
}) })

View file

@ -33,9 +33,8 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
const _messages = getMessages(); const _messages = getMessages();
const messages = const messages =
_messages _messages?.filter((m) => m.messageId !== messageId).map((msg) => ({ ...msg, thread_id })) ??
?.filter((m) => m.messageId !== messageId) [];
.map((msg) => ({ ...msg, thread_id })) ?? [];
const userMessage = messages[messages.length - 1] as TMessage | undefined; const userMessage = messages[messages.length - 1] as TMessage | undefined;
const { initialResponse } = submission; const { initialResponse } = submission;
@ -66,14 +65,17 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
response.content[index] = { type, [type]: part } as TMessageContentParts; response.content[index] = { type, [type]: part } as TMessageContentParts;
const lastContentPart = response.content[response.content.length - 1];
const initialContentPart = initialResponse.content?.[0];
if ( if (
type !== ContentTypes.TEXT && type !== ContentTypes.TEXT &&
initialResponse.content && initialContentPart != null &&
((response.content[response.content.length - 1].type === ContentTypes.TOOL_CALL && lastContentPart != null &&
response.content[response.content.length - 1][ContentTypes.TOOL_CALL].progress === 1) || ((lastContentPart.type === ContentTypes.TOOL_CALL &&
response.content[response.content.length - 1].type === ContentTypes.IMAGE_FILE) lastContentPart[ContentTypes.TOOL_CALL]?.progress === 1) ||
lastContentPart.type === ContentTypes.IMAGE_FILE)
) { ) {
response.content.push(initialResponse.content[0]); response.content.push(initialContentPart);
} }
setMessages([...messages, response]); setMessages([...messages, response]);

View file

@ -87,12 +87,14 @@ const createErrorMessage = ({
let isValidContentPart = false; let isValidContentPart = false;
if (latestContent.length > 0) { if (latestContent.length > 0) {
const latestContentPart = latestContent[latestContent.length - 1]; const latestContentPart = latestContent[latestContent.length - 1];
const latestPartValue = latestContentPart?.[latestContentPart.type ?? '']; if (latestContentPart != null) {
isValidContentPart = const latestPartValue = latestContentPart[latestContentPart.type ?? ''];
latestContentPart.type !== ContentTypes.TEXT || isValidContentPart =
(latestContentPart.type === ContentTypes.TEXT && typeof latestPartValue === 'string') latestContentPart.type !== ContentTypes.TEXT ||
? true (latestContentPart.type === ContentTypes.TEXT && typeof latestPartValue === 'string')
: latestPartValue?.value !== ''; ? true
: latestPartValue?.value !== '';
}
} }
if ( if (
latestMessage?.conversationId && latestMessage?.conversationId &&
@ -455,141 +457,145 @@ export default function useEventHandlers({
isTemporary = false, isTemporary = false,
} = submission; } = submission;
if (responseMessage?.attachments && responseMessage.attachments.length > 0) { try {
// Process each attachment through the attachmentHandler if (responseMessage?.attachments && responseMessage.attachments.length > 0) {
responseMessage.attachments.forEach((attachment) => { // Process each attachment through the attachmentHandler
const attachmentData = { responseMessage.attachments.forEach((attachment) => {
...attachment, const attachmentData = {
messageId: responseMessage.messageId, ...attachment,
}; messageId: responseMessage.messageId,
};
attachmentHandler({ attachmentHandler({
data: attachmentData, data: attachmentData,
submission: submission as EventSubmission, submission: submission as EventSubmission,
});
}); });
}); }
}
setShowStopButton(false); setCompleted((prev) => new Set(prev.add(submission.initialResponse.messageId)));
setCompleted((prev) => new Set(prev.add(submission.initialResponse.messageId)));
const currentMessages = getMessages(); const currentMessages = getMessages();
/* Early return if messages are empty; i.e., the user navigated away */ /* Early return if messages are empty; i.e., the user navigated away */
if (!currentMessages || currentMessages.length === 0) { if (!currentMessages || currentMessages.length === 0) {
setIsSubmitting(false); return;
return; }
}
/* a11y announcements */ /* a11y announcements */
announcePolite({ message: 'end', isStatus: true }); announcePolite({ message: 'end', isStatus: true });
announcePolite({ message: getAllContentText(responseMessage) }); announcePolite({ message: getAllContentText(responseMessage) });
const isNewConvo = conversation.conversationId !== submissionConvo.conversationId; const isNewConvo = conversation.conversationId !== submissionConvo.conversationId;
const setFinalMessages = (id: string | null, _messages: TMessage[]) => { const setFinalMessages = (id: string | null, _messages: TMessage[]) => {
setMessages(_messages); setMessages(_messages);
queryClient.setQueryData<TMessage[]>([QueryKeys.messages, id], _messages); queryClient.setQueryData<TMessage[]>([QueryKeys.messages, id], _messages);
}; };
const hasNoResponse = const hasNoResponse =
responseMessage?.content?.[0]?.['text']?.value === responseMessage?.content?.[0]?.['text']?.value ===
submission.initialResponse?.content?.[0]?.['text']?.value || submission.initialResponse?.content?.[0]?.['text']?.value ||
!!responseMessage?.content?.[0]?.['tool_call']?.auth; !!responseMessage?.content?.[0]?.['tool_call']?.auth;
/** Handle edge case where stream is cancelled before any response, which creates a blank page */
if (!conversation.conversationId && hasNoResponse) {
const currentConvoId =
(submissionConvo.conversationId ?? conversation.conversationId) || Constants.NEW_CONVO;
if (isNewConvo && submissionConvo.conversationId) {
removeConvoFromAllQueries(queryClient, submissionConvo.conversationId);
}
const isNewChat =
location.pathname === `/c/${Constants.NEW_CONVO}` &&
currentConvoId === Constants.NEW_CONVO;
setFinalMessages(currentConvoId, isNewChat ? [] : [...messages]);
setDraft({ id: currentConvoId, value: requestMessage?.text });
if (isNewChat) {
navigate(`/c/${Constants.NEW_CONVO}`, { replace: true, state: { focusChat: true } });
}
return;
}
/* Update messages; if assistants endpoint, client doesn't receive responseMessage */
let finalMessages: TMessage[] = [];
if (runMessages) {
finalMessages = [...runMessages];
} else if (isRegenerate && responseMessage) {
finalMessages = [...messages, responseMessage];
} else if (requestMessage != null && responseMessage != null) {
finalMessages = [...messages, requestMessage, responseMessage];
}
if (finalMessages.length > 0) {
setFinalMessages(conversation.conversationId, finalMessages);
} else if (
isAssistantsEndpoint(submissionConvo.endpoint) &&
(!submissionConvo.conversationId ||
submissionConvo.conversationId === Constants.NEW_CONVO)
) {
queryClient.setQueryData<TMessage[]>(
[QueryKeys.messages, conversation.conversationId],
[...currentMessages],
);
}
/** Handle edge case where stream is cancelled before any response, which creates a blank page */
if (!conversation.conversationId && hasNoResponse) {
const currentConvoId =
(submissionConvo.conversationId ?? conversation.conversationId) || Constants.NEW_CONVO;
if (isNewConvo && submissionConvo.conversationId) { if (isNewConvo && submissionConvo.conversationId) {
removeConvoFromAllQueries(queryClient, submissionConvo.conversationId); removeConvoFromAllQueries(queryClient, submissionConvo.conversationId);
} }
const isNewChat = /* Refresh title */
location.pathname === `/c/${Constants.NEW_CONVO}` && if (
currentConvoId === Constants.NEW_CONVO; genTitle &&
isNewConvo &&
setFinalMessages(currentConvoId, isNewChat ? [] : [...messages]); !isTemporary &&
setDraft({ id: currentConvoId, value: requestMessage?.text }); requestMessage &&
setIsSubmitting(false); requestMessage.parentMessageId === Constants.NO_PARENT
if (isNewChat) { ) {
navigate(`/c/${Constants.NEW_CONVO}`, { replace: true, state: { focusChat: true } }); setTimeout(() => {
genTitle.mutate({ conversationId: conversation.conversationId as string });
}, 2500);
} }
return;
}
/* Update messages; if assistants endpoint, client doesn't receive responseMessage */ if (setConversation && isAddedRequest !== true) {
let finalMessages: TMessage[] = []; setConversation((prevState) => {
if (runMessages) { const update = {
finalMessages = [...runMessages]; ...prevState,
} else if (isRegenerate && responseMessage) { ...(conversation as TConversation),
finalMessages = [...messages, responseMessage]; };
} else if (requestMessage != null && responseMessage != null) { if (prevState?.model != null && prevState.model !== submissionConvo.model) {
finalMessages = [...messages, requestMessage, responseMessage]; update.model = prevState.model;
} }
if (finalMessages.length > 0) { const cachedConvo = queryClient.getQueryData<TConversation>([
setFinalMessages(conversation.conversationId, finalMessages); QueryKeys.conversation,
} else if ( conversation.conversationId,
isAssistantsEndpoint(submissionConvo.endpoint) && ]);
(!submissionConvo.conversationId || submissionConvo.conversationId === Constants.NEW_CONVO) if (!cachedConvo) {
) { queryClient.setQueryData(
queryClient.setQueryData<TMessage[]>( [QueryKeys.conversation, conversation.conversationId],
[QueryKeys.messages, conversation.conversationId], update,
[...currentMessages], );
); }
} return update;
if (isNewConvo && submissionConvo.conversationId) {
removeConvoFromAllQueries(queryClient, submissionConvo.conversationId);
}
/* Refresh title */
if (
genTitle &&
isNewConvo &&
!isTemporary &&
requestMessage &&
requestMessage.parentMessageId === Constants.NO_PARENT
) {
setTimeout(() => {
genTitle.mutate({ conversationId: conversation.conversationId as string });
}, 2500);
}
if (setConversation && isAddedRequest !== true) {
setConversation((prevState) => {
const update = {
...prevState,
...(conversation as TConversation),
};
if (prevState?.model != null && prevState.model !== submissionConvo.model) {
update.model = prevState.model;
}
const cachedConvo = queryClient.getQueryData<TConversation>([
QueryKeys.conversation,
conversation.conversationId,
]);
if (!cachedConvo) {
queryClient.setQueryData([QueryKeys.conversation, conversation.conversationId], update);
}
return update;
});
if (conversation.conversationId && submission.ephemeralAgent) {
applyAgentTemplate({
targetId: conversation.conversationId,
sourceId: submissionConvo.conversationId,
ephemeralAgent: submission.ephemeralAgent,
specName: submission.conversation?.spec,
startupConfig: queryClient.getQueryData<TStartupConfig>([QueryKeys.startupConfig]),
}); });
}
if (location.pathname === `/c/${Constants.NEW_CONVO}`) { if (conversation.conversationId && submission.ephemeralAgent) {
navigate(`/c/${conversation.conversationId}`, { replace: true }); applyAgentTemplate({
targetId: conversation.conversationId,
sourceId: submissionConvo.conversationId,
ephemeralAgent: submission.ephemeralAgent,
specName: submission.conversation?.spec,
startupConfig: queryClient.getQueryData<TStartupConfig>([QueryKeys.startupConfig]),
});
}
if (location.pathname === `/c/${Constants.NEW_CONVO}`) {
navigate(`/c/${conversation.conversationId}`, { replace: true });
}
} }
} finally {
setShowStopButton(false);
setIsSubmitting(false);
} }
setIsSubmitting(false);
}, },
[ [
navigate, navigate,
@ -722,26 +728,37 @@ export default function useEventHandlers({
messages[messages.length - 2] != null messages[messages.length - 2] != null
) { ) {
let requestMessage = messages[messages.length - 2]; let requestMessage = messages[messages.length - 2];
const responseMessage = messages[messages.length - 1]; const _responseMessage = messages[messages.length - 1];
if (requestMessage.messageId !== responseMessage.parentMessageId) { if (requestMessage.messageId !== _responseMessage.parentMessageId) {
// the request message is the parent of response, which we search for backwards // the request message is the parent of response, which we search for backwards
for (let i = messages.length - 3; i >= 0; i--) { for (let i = messages.length - 3; i >= 0; i--) {
if (messages[i].messageId === responseMessage.parentMessageId) { if (messages[i].messageId === _responseMessage.parentMessageId) {
requestMessage = messages[i]; requestMessage = messages[i];
break; break;
} }
} }
} }
finalHandler( /** Sanitize content array to remove undefined parts from interrupted streaming */
{ const responseMessage = {
conversation: { ..._responseMessage,
conversationId, content: _responseMessage.content?.filter((part) => part != null),
};
try {
finalHandler(
{
conversation: {
conversationId,
},
requestMessage,
responseMessage,
}, },
requestMessage, submission,
responseMessage, );
}, } catch (error) {
submission, console.error('Error in finalHandler during abort:', error);
); setShowStopButton(false);
setIsSubmitting(false);
}
return; return;
} else if (!isAssistantsEndpoint(endpoint)) { } else if (!isAssistantsEndpoint(endpoint)) {
const convoId = conversationId || `_${v4()}`; const convoId = conversationId || `_${v4()}`;
@ -809,13 +826,14 @@ export default function useEventHandlers({
} }
}, },
[ [
finalHandler,
newConversation,
setIsSubmitting,
token, token,
cancelHandler,
getMessages, getMessages,
setMessages, setMessages,
finalHandler,
cancelHandler,
newConversation,
setIsSubmitting,
setShowStopButton,
], ],
); );

View file

@ -124,7 +124,13 @@ export default function useSSE(
if (data.final != null) { if (data.final != null) {
clearDraft(submission.conversation?.conversationId); clearDraft(submission.conversation?.conversationId);
const { plugins } = data; const { plugins } = data;
finalHandler(data, { ...submission, plugins } as EventSubmission); try {
finalHandler(data, { ...submission, plugins } as EventSubmission);
} catch (error) {
console.error('Error in finalHandler:', error);
setIsSubmitting(false);
setShowStopButton(false);
}
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch(); (startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
console.log('final', data); console.log('final', data);
return; return;
@ -187,14 +193,20 @@ export default function useSSE(
setCompleted((prev) => new Set(prev.add(streamKey))); setCompleted((prev) => new Set(prev.add(streamKey)));
const latestMessages = getMessages(); const latestMessages = getMessages();
const conversationId = latestMessages?.[latestMessages.length - 1]?.conversationId; const conversationId = latestMessages?.[latestMessages.length - 1]?.conversationId;
return await abortConversation( try {
conversationId ?? await abortConversation(
userMessage.conversationId ?? conversationId ??
submission.conversation?.conversationId ?? userMessage.conversationId ??
'', submission.conversation?.conversationId ??
submission as EventSubmission, '',
latestMessages, submission as EventSubmission,
); latestMessages,
);
} catch (error) {
console.error('Error during abort:', error);
setIsSubmitting(false);
setShowStopButton(false);
}
}); });
sse.addEventListener('error', async (e: MessageEvent) => { sse.addEventListener('error', async (e: MessageEvent) => {

View file

@ -313,6 +313,10 @@ export default function useStepHandler({
? messageDelta.delta.content[0] ? messageDelta.delta.content[0]
: messageDelta.delta.content; : messageDelta.delta.content;
if (contentPart == null) {
return;
}
const currentIndex = calculateContentIndex( const currentIndex = calculateContentIndex(
runStep.index, runStep.index,
initialContent, initialContent,
@ -345,6 +349,10 @@ export default function useStepHandler({
? reasoningDelta.delta.content[0] ? reasoningDelta.delta.content[0]
: reasoningDelta.delta.content; : reasoningDelta.delta.content;
if (contentPart == null) {
return;
}
const currentIndex = calculateContentIndex( const currentIndex = calculateContentIndex(
runStep.index, runStep.index,
initialContent, initialContent,

View file

@ -44,7 +44,7 @@ export const getAllContentText = (message?: TMessage | null): string => {
if (message.content && message.content.length > 0) { if (message.content && message.content.length > 0) {
return message.content return message.content
.filter((part) => part.type === ContentTypes.TEXT) .filter((part) => part != null && part.type === ContentTypes.TEXT)
.map((part) => { .map((part) => {
if (!('text' in part)) return ''; if (!('text' in part)) return '';
const text = part.text; const text = part.text;