🌿 feat: Multi-response Streaming (#3191)

* chore: comment back handlePlusCommand

* chore: ignore .git dir

* refactor: pass newConversation to `useSelectMention`

refactor: pass newConversation to Mention component

refactor: useChatFunctions for modular use of `ask` and `regenerate`

refactor: set latest message only for the first index in useChatFunctions

refactor: pass setLatestMessage to useChatFunctions

refactor: Pass setSubmission to useChatFunctions for submission handling

refactor: consolidate event handlers to separate hook from useSSE

WIP: additional response handlers

feat: responsive added convo, clears on new chat/navigating to chat, assistants excluded

feat: Add conversationByKeySelector to select any conversation by index

WIP: handle second submission with messages paired to root

* style: surface-primary-contrast

* refactor: remove unnecessary console.log statement in useChatFunctions

* refactor: Consolidate imports in ChatForm and Input hooks

* refactor: compositional usage of useSSE for multiple streams

* WIP: set latest 'multi' message

* WIP: first pass, added response streaming

* pass: performant multi-message stream

* fix: styling and message render

* second pass: modular, performant multi-stream

* fix: align parentMessageId of multiMessage

* refactor: move resetting latestMultiMessage

* chore: update footer text in Chat component

* fix: stop button styling

* fix: handle abortMessage request for multi-response

* clear messages but bug with latest message reset present

* fix: add delay for additional message generation

* fix: access LAST_CONVO_SETUP by index

* style: add div to prevent layout shift before hover buttons render

* chore: Update Message component styling for card messages

* chore: move hook use order

* fix: abort middleware using unsent field from req.body

* feat: support multi-response stream from initial message

* refactor: buildTree function to improve readability and remove unused code

* feat: add logger for frontend dev

* refactor: use depth to track if message is really last in its branch

* fix(buildTree): default export

* fix: share parent message Id and avoid duplication error for multi-response streams

* fix: prevent addedConvo reset to response convo

* feat: allow setting multi message as latest message to control which to respond to

* chore: wrap setSiblingIdxRev with useCallback

* chore: styling and allow editing messages

* style: styling fixes

* feat: Add "AddMultiConvo" component to Chat Header

* feat: prevent clearing added convos on endpoint, preset, mention, or modelSpec switch

* fix: message styling fixes, mainly related to code blocks

* fix: stop button visibility logic

* fix: Handle edge case in abortMiddleware for non-existent `abortControllers`

* refactor: optimize/memoize icons

* chore(GoogleClient): change info to debug logs

* style: active message styling

* style: prevent layout shift due to placeholder row

* chore: remove unused code

* fix: Update BaseClient to handle optional request body properties

* fix(ci): `onStart` now accepts 2 args, the 2nd being responseMessageId

* chore: bump data-provider
This commit is contained in:
Danny Avila 2024-06-25 03:02:38 -04:00 committed by GitHub
parent eef894e608
commit 156c52e293
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
72 changed files with 2697 additions and 1326 deletions

View file

@ -1,72 +1,44 @@
import { v4 } from 'uuid';
import { useSetRecoilState } from 'recoil';
import { useParams } from 'react-router-dom';
import { useQueryClient } from '@tanstack/react-query';
import { useEffect, useState, useCallback } from 'react';
import { useEffect, useState } from 'react';
import {
/* @ts-ignore */
SSE,
QueryKeys,
Constants,
EndpointURLs,
createPayload,
tPresetSchema,
tMessageSchema,
LocalStorageKeys,
tConvoUpdateSchema,
removeNullishValues,
isAssistantsEndpoint,
} from 'librechat-data-provider';
import { useGetUserBalance, useGetStartupConfig } from 'librechat-data-provider/react-query';
import type {
TResPlugin,
TMessage,
TConversation,
TSubmission,
ConversationData,
} from 'librechat-data-provider';
import {
addConversation,
deleteConversation,
updateConversation,
getConversationById,
} from '~/utils';
import type { TSubmission } from 'librechat-data-provider';
import type { EventHandlerParams } from './useEventHandlers';
import type { TResData } from '~/common';
import { useGenTitleMutation } from '~/data-provider';
import useContentHandler from './useContentHandler';
import { useAuthContext } from '../AuthContext';
import useChatHelpers from '../useChatHelpers';
import { useAuthContext } from '~/hooks/AuthContext';
import useEventHandlers from './useEventHandlers';
import store from '~/store';
/** Data payload passed to the created, final, cancel and error handlers defined in this hook. */
type TResData = {
plugin?: TResPlugin;
// abortConversation routes the payload to finalHandler when this flag is set, otherwise to cancelHandler.
final?: boolean;
initial?: boolean;
previousMessages?: TMessage[];
requestMessage: TMessage;
responseMessage: TMessage;
conversation: TConversation;
conversationId?: string;
// When present, finalHandler replaces the displayed message list with these messages.
runMessages?: TMessage[];
};
/**
 * Subset of EventHandlerParams that the caller of useSSE supplies through the
 * `chatHelpers` argument; these are the members useSSE destructures from it.
 */
type ChatHelpers = Pick<
EventHandlerParams,
| 'setMessages'
| 'getMessages'
| 'setConversation'
| 'setIsSubmitting'
| 'newConversation'
| 'resetLatestMessage'
>;
/** Data payload consumed by syncHandler. */
type TSyncData = {
sync: boolean;
// Written into the conversation update by syncHandler.
thread_id: string;
messages?: TMessage[];
requestMessage: TMessage;
responseMessage: TMessage;
conversationId: string;
};
export default function useSSE(submission: TSubmission | null, index = 0) {
const queryClient = useQueryClient();
export default function useSSE(
submission: TSubmission | null,
chatHelpers: ChatHelpers,
isAddedRequest = false,
runIndex = 0,
) {
const genTitle = useGenTitleMutation();
const setActiveRunId = useSetRecoilState(store.activeRunFamily(index));
const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
const { conversationId: paramId } = useParams();
const { token, isAuthenticated } = useAuthContext();
const [completed, setCompleted] = useState(new Set());
const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(index));
const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));
const {
setMessages,
@ -75,435 +47,34 @@ export default function useSSE(submission: TSubmission | null, index = 0) {
setIsSubmitting,
newConversation,
resetLatestMessage,
} = useChatHelpers(index, paramId);
const contentHandler = useContentHandler({ setMessages, getMessages });
} = chatHelpers;
// All stream event handlers come from useEventHandlers; this hook only forwards
// the state setters/getters and flags the handlers need.
const {
syncHandler,
finalHandler,
errorHandler,
messageHandler,
contentHandler,
createdHandler,
abortConversation,
} = useEventHandlers({
genTitle,
setMessages,
getMessages,
setCompleted,
isAddedRequest,
setConversation,
setIsSubmitting,
newConversation,
setShowStopButton,
resetLatestMessage,
});
const { data: startupConfig } = useGetStartupConfig();
// The balance query is enabled only for authenticated users when the startup
// config has `checkBalance` set.
const balanceQuery = useGetUserBalance({
enabled: !!isAuthenticated && startupConfig?.checkBalance,
});
/**
 * Replaces the displayed messages with the submission's history followed by
 * the in-progress response carrying the text received so far.
 *
 * @param data - Response text received so far.
 * @param submission - The submission this stream belongs to.
 */
const messageHandler = useCallback(
  (data: string, submission: TSubmission) => {
    const { isRegenerate = false } = submission;

    // The response placeholder is the same in both cases; only the preceding
    // history differs.
    const streamedResponse = {
      ...submission.initialResponse,
      text: data,
      plugin: submission.plugin ?? null,
      plugins: submission.plugins ?? [],
      // unfinished: true
    };

    // When regenerating, the user message is not appended again.
    const history = isRegenerate
      ? submission.messages
      : [...submission.messages, submission.userMessage];

    setMessages([...history, streamedResponse]);
  },
  [setMessages],
);
/**
 * Applies the result of an aborted generation: updates the message list,
 * removes the placeholder conversation from the cache if the server assigned a
 * new conversation id, schedules title generation for brand-new conversations,
 * merges the conversation update into state and clears the submitting flag.
 *
 * @param data - Payload returned by the abort request.
 * @param submission - The submission that was aborted.
 */
const cancelHandler = useCallback(
  (data: TResData, submission: TSubmission) => {
    const { requestMessage, responseMessage, conversation } = data;
    const { messages, isRegenerate = false } = submission;

    // Fall back to the submitted conversation when the payload carries none.
    const convoUpdate = conversation ?? submission.conversation;

    // update the messages; falsy entries (e.g. a missing response) are dropped
    const messagesUpdate = (
      isRegenerate
        ? [...messages, responseMessage]
        : [...messages, requestMessage, responseMessage]
    ).filter((msg) => msg);
    setMessages(messagesUpdate);

    // Compare through `convoUpdate` so a payload without `conversation` does
    // not throw here (the fallback above would otherwise be pointless).
    const isNewConvo = convoUpdate.conversationId !== submission.conversation.conversationId;
    if (isNewConvo) {
      queryClient.setQueryData<ConversationData>([QueryKeys.allConversations], (convoData) => {
        if (!convoData) {
          return convoData;
        }
        return deleteConversation(convoData, submission.conversation.conversationId as string);
      });
    }

    // refresh title
    if (isNewConvo && requestMessage?.parentMessageId === Constants.NO_PARENT) {
      setTimeout(() => {
        genTitle.mutate({ conversationId: convoUpdate.conversationId as string });
      }, 2500);
    }

    setConversation((prevState) => ({
      ...prevState,
      ...convoUpdate,
    }));

    setIsSubmitting(false);
  },
  [setMessages, setConversation, genTitle, queryClient, setIsSubmitting],
);
/**
 * Handles a sync payload: swaps the locally created user message for the
 * request message in the payload, merges the response message over the initial
 * response, updates the conversation state and the cached conversation list,
 * then shows the stop button and resets the latest message.
 */
const syncHandler = useCallback(
(data: TSyncData, submission: TSubmission) => {
const { conversationId, thread_id, responseMessage, requestMessage } = data;
const { initialResponse, messages: _messages, userMessage } = submission;
// Drop the local copy of the user message; the payload's requestMessage is appended instead.
const messages = _messages.filter((msg) => msg.messageId !== userMessage.messageId);
setMessages([
...messages,
requestMessage,
{
...initialResponse,
...responseMessage,
},
]);
// Assigned inside the setConversation updater and read by the cache updater below.
// NOTE(review): this relies on the updater having run before the cache updater reads `update` — confirm.
let update = {} as TConversation;
setConversation((prevState) => {
let title = prevState?.title;
const parentId = requestMessage.parentMessageId;
// For a non-root message whose current title still contains 'new chat',
// take the title from the cached conversation list instead.
if (parentId !== Constants.NO_PARENT && title?.toLowerCase()?.includes('new chat')) {
const convos = queryClient.getQueryData<ConversationData>([QueryKeys.allConversations]);
const cachedConvo = getConversationById(convos, conversationId);
title = cachedConvo?.title;
}
update = tConvoUpdateSchema.parse({
...prevState,
conversationId,
thread_id,
title,
messages: [requestMessage.messageId, responseMessage.messageId],
}) as TConversation;
return update;
});
queryClient.setQueryData<ConversationData>([QueryKeys.allConversations], (convoData) => {
if (!convoData) {
return convoData;
}
// A request with no parent is added to the cached list; otherwise the existing entry is updated.
if (requestMessage.parentMessageId === Constants.NO_PARENT) {
return addConversation(convoData, update);
} else {
return updateConversation(convoData, update);
}
});
setShowStopButton(true);
resetLatestMessage();
},
[setMessages, setConversation, queryClient, setShowStopButton, resetLatestMessage],
);
/**
 * Handles the "created" payload: shows the user message with a placeholder
 * response parented to it, updates the conversation state and the cached
 * conversation list, then resets the latest message.
 * The `data` argument is not read by this handler.
 */
const createdHandler = useCallback(
(data: TResData, submission: TSubmission) => {
const { messages, userMessage, isRegenerate = false } = submission;
// Placeholder response: child of the user message, with the user message id plus '_' as its id.
const initialResponse = {
...submission.initialResponse,
parentMessageId: userMessage?.messageId,
messageId: userMessage?.messageId + '_',
};
// When regenerating, the user message is not appended again.
if (isRegenerate) {
setMessages([...messages, initialResponse]);
} else {
setMessages([...messages, userMessage, initialResponse]);
}
const { conversationId, parentMessageId } = userMessage;
// Assigned inside the setConversation updater and read by the cache updater below.
// NOTE(review): this relies on the updater having run before the cache updater reads `update` — confirm.
let update = {} as TConversation;
setConversation((prevState) => {
let title = prevState?.title;
const parentId = isRegenerate ? userMessage?.overrideParentMessageId : parentMessageId;
// For a non-root message whose current title still contains 'new chat',
// take the title from the cached conversation list instead.
if (parentId !== Constants.NO_PARENT && title?.toLowerCase()?.includes('new chat')) {
const convos = queryClient.getQueryData<ConversationData>([QueryKeys.allConversations]);
const cachedConvo = getConversationById(convos, conversationId);
title = cachedConvo?.title;
}
update = tConvoUpdateSchema.parse({
...prevState,
conversationId,
title,
}) as TConversation;
return update;
});
queryClient.setQueryData<ConversationData>([QueryKeys.allConversations], (convoData) => {
if (!convoData) {
return convoData;
}
// A user message with no parent is added to the cached list; otherwise the existing entry is updated.
if (parentMessageId === Constants.NO_PARENT) {
return addConversation(convoData, update);
} else {
return updateConversation(convoData, update);
}
});
resetLatestMessage();
},
[setMessages, setConversation, queryClient, resetLatestMessage],
);
/**
 * Handles the final payload of a generation: hides the stop button, records
 * the response as completed, writes the final messages, reconciles the cached
 * conversation list, schedules title generation for brand-new conversations,
 * merges the conversation into state and clears the submitting flag.
 */
const finalHandler = useCallback(
(data: TResData, submission: TSubmission) => {
const { requestMessage, responseMessage, conversation, runMessages } = data;
const { messages, conversation: submissionConvo, isRegenerate = false } = submission;
setShowStopButton(false);
// `prev.add` mutates the previous set; it is then copied into a new Set instance.
setCompleted((prev) => new Set(prev.add(submission?.initialResponse?.messageId)));
const currentMessages = getMessages();
// Early return if messages are empty; i.e., the user navigated away
if (!currentMessages?.length) {
return setIsSubmitting(false);
}
// update the messages; if assistants endpoint, client doesn't receive responseMessage
if (runMessages) {
setMessages([...runMessages]);
} else if (isRegenerate && responseMessage) {
setMessages([...messages, responseMessage]);
} else if (responseMessage) {
setMessages([...messages, requestMessage, responseMessage]);
}
// The payload's conversation id differs from the submitted one: remove the
// submitted conversation's entry from the cached list.
const isNewConvo = conversation.conversationId !== submissionConvo.conversationId;
if (isNewConvo) {
queryClient.setQueryData<ConversationData>([QueryKeys.allConversations], (convoData) => {
if (!convoData) {
return convoData;
}
return deleteConversation(convoData, submissionConvo.conversationId as string);
});
}
// refresh title
if (isNewConvo && requestMessage && requestMessage.parentMessageId === Constants.NO_PARENT) {
setTimeout(() => {
genTitle.mutate({ conversationId: conversation.conversationId as string });
}, 2500);
}
setConversation((prevState) => {
const update = {
...prevState,
...conversation,
};
// Keep the model currently in state when it differs from the model that was submitted.
if (prevState?.model && prevState.model !== submissionConvo.model) {
update.model = prevState.model;
}
return update;
});
setIsSubmitting(false);
},
[
genTitle,
queryClient,
getMessages,
setMessages,
setConversation,
setIsSubmitting,
setShowStopButton,
],
);
/**
 * Handles a failed generation: records the response as completed, appends an
 * error message after the user message, starts a new conversation where no
 * usable conversation id exists, and clears the submitting flag.
 * `data` may be absent, in which case a generic connection error is shown.
 */
const errorHandler = useCallback(
({ data, submission }: { data?: TResData; submission: TSubmission }) => {
const { messages, userMessage, initialResponse } = submission;
// `prev.add` mutates the previous set; it is then copied into a new Set instance.
setCompleted((prev) => new Set(prev.add(initialResponse.messageId)));
const conversationId = userMessage?.conversationId ?? submission?.conversationId;
// Builds an error message from the payload's `responseMessage` (or the payload
// itself), layered over the initial response and parented to the user message.
const parseErrorResponse = (data: TResData | Partial<TMessage>) => {
const metadata = data['responseMessage'] ?? data;
const errorMessage = {
...initialResponse,
...metadata,
error: true,
parentMessageId: userMessage?.messageId,
};
if (!errorMessage.messageId) {
errorMessage.messageId = v4();
}
return tMessageSchema.parse(errorMessage);
};
// Case 1: no payload at all — show a generic connection error and start a new conversation.
if (!data) {
const convoId = conversationId ?? v4();
const errorResponse = parseErrorResponse({
text: 'Error connecting to server, try refreshing the page.',
...submission,
conversationId: convoId,
});
setMessages([...messages, userMessage, errorResponse]);
newConversation({
template: { conversationId: convoId },
preset: tPresetSchema.parse(submission?.conversation),
});
setIsSubmitting(false);
return;
}
// Case 2: neither the submission nor the payload has a conversation id — start a new conversation with a fresh id.
if (!conversationId && !data.conversationId) {
const convoId = v4();
const errorResponse = parseErrorResponse(data);
setMessages([...messages, userMessage, errorResponse]);
newConversation({
template: { conversationId: convoId },
preset: tPresetSchema.parse(submission?.conversation),
});
setIsSubmitting(false);
return;
// Case 3: only the payload lacks a conversation id — show the error in the current conversation.
} else if (!data.conversationId) {
const errorResponse = parseErrorResponse(data);
setMessages([...messages, userMessage, errorResponse]);
setIsSubmitting(false);
return;
}
// Case 4: the payload carries a conversation id — parse the payload itself as the error message.
console.log('Error:', data);
const errorResponse = tMessageSchema.parse({
...data,
error: true,
parentMessageId: userMessage?.messageId,
});
setMessages([...messages, userMessage, errorResponse]);
// On the 'new' route, start a new conversation with the id from the payload.
if (data.conversationId && paramId === 'new') {
newConversation({
template: { conversationId: data.conversationId },
preset: tPresetSchema.parse(submission?.conversation),
});
}
setIsSubmitting(false);
return;
},
[setMessages, paramId, setIsSubmitting, newConversation],
);
/**
 * Asks the server to abort the in-flight generation for a conversation and
 * applies the result locally.
 *
 * A JSON response is routed to `finalHandler` (when `data.final` is set) or
 * `cancelHandler`; a 404 JSON response only clears the submitting flag. A 204
 * response is logged without further state changes. Any other response, or a
 * failed request, is shown as an error message and a new conversation is
 * started.
 *
 * @param conversationId - Id used as the abort key for non-assistants endpoints.
 * @param submission - The submission being aborted.
 */
const abortConversation = useCallback(
  async (conversationId = '', submission: TSubmission) => {
    // Assistants endpoints are aborted by `<conversationId>:<last message id>`
    // taken from the last stored conversation setup.
    let runAbortKey = '';
    try {
      const conversation = (JSON.parse(
        localStorage.getItem(LocalStorageKeys.LAST_CONVO_SETUP) ?? '',
      ) ?? {}) as TConversation;
      const { conversationId, messages } = conversation;
      runAbortKey = `${conversationId}:${messages?.[messages.length - 1]}`;
    } catch (error) {
      console.error('Error getting last conversation setup');
      console.error(error);
    }

    const { endpoint: _endpoint, endpointType } = submission?.conversation || {};
    const endpoint = endpointType ?? _endpoint;

    try {
      const response = await fetch(`${EndpointURLs[endpoint ?? '']}/abort`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Authorization: `Bearer ${token}`,
        },
        body: JSON.stringify({
          abortKey: isAssistantsEndpoint(_endpoint) ? runAbortKey : conversationId,
          endpoint,
        }),
      });

      // Check if the response is JSON
      const contentType = response.headers.get('content-type');
      if (contentType && contentType.includes('application/json')) {
        const data = await response.json();
        console.log('aborted', data);
        if (response.status === 404) {
          setIsSubmitting(false);
          return;
        }
        if (data.final) {
          finalHandler(data, submission);
        } else {
          cancelHandler(data, submission);
        }
      } else if (response.status === 204) {
        // NOTE(review): this branch only logs; no handler runs and the
        // submitting flag is left unchanged here — confirm that is intended.
        const responseMessage = {
          ...submission.initialResponse,
        };

        const data = {
          requestMessage: submission.userMessage,
          responseMessage: responseMessage,
          conversation: submission.conversation,
        };
        console.log('aborted', data);
      } else {
        throw new Error(
          'Unexpected response from server; Status: ' +
            response.status +
            ' ' +
            response.statusText,
        );
      }
    } catch (error) {
      console.error('Error cancelling request');
      console.error(error);

      // `conversationId` defaults to '' and is therefore never nullish; use
      // `||` so an empty id actually gets replaced by a generated one.
      const convoId = conversationId || v4();

      // Keep the partial response only if it is long enough to be worth showing.
      const text =
        submission.initialResponse?.text?.length > 45 ? submission.initialResponse?.text : '';

      const errorMessage = {
        ...submission,
        ...submission.initialResponse,
        // `text` is always a string, so `??` could never reach the fallbacks;
        // `||` lets an empty partial response fall through to the error text.
        text: text || ((error as Error).message ?? 'Error cancelling request'),
        unfinished: !!text.length,
        error: true,
      };

      const errorResponse = tMessageSchema.parse(errorMessage);
      setMessages([...submission.messages, submission.userMessage, errorResponse]);
      newConversation({
        template: { conversationId: convoId },
        preset: tPresetSchema.parse(submission?.conversation),
      });
      setIsSubmitting(false);
    }
  },
  [token, setIsSubmitting, finalHandler, cancelHandler, setMessages, newConversation],
);
useEffect(() => {
if (submission === null || Object.keys(submission).length === 0) {
return;
@ -571,16 +142,6 @@ export default function useSSE(submission: TSubmission | null, index = 0) {
}
};
// events.onaudio = (e: MessageEvent) => {
// const data = JSON.parse(e.data);
// console.log('audio', data);
// if (data.audio) {
// audioSource.addBase64Data(data.audio);
// }
// };
// events.onend = () => audioSource.close();
events.onopen = () => console.log('connection is opened');
events.oncancel = async () => {
@ -595,9 +156,12 @@ export default function useSSE(submission: TSubmission | null, index = 0) {
}
setCompleted((prev) => new Set(prev.add(streamKey)));
const latestMessages = getMessages();
const conversationId = latestMessages?.[latestMessages?.length - 1]?.conversationId;
return await abortConversation(
userMessage?.conversationId ?? submission?.conversationId,
conversationId ?? userMessage?.conversationId ?? submission?.conversationId,
submission,
latestMessages,
);
};