import { v4 } from 'uuid'; import { useSetRecoilState } from 'recoil'; import { useEffect, useState } from 'react'; import { /* @ts-ignore */ SSE, createPayload, isAgentsEndpoint, removeNullishValues, isAssistantsEndpoint, } from 'librechat-data-provider'; import { useGetUserBalance, useGetStartupConfig } from 'librechat-data-provider/react-query'; import type { TMessage, TSubmission, TPayload, EventSubmission } from 'librechat-data-provider'; import type { EventHandlerParams } from './useEventHandlers'; import type { TResData } from '~/common'; import { useGenTitleMutation } from '~/data-provider'; import { useAuthContext } from '~/hooks/AuthContext'; import useEventHandlers from './useEventHandlers'; import store from '~/store'; type ChatHelpers = Pick< EventHandlerParams, | 'setMessages' | 'getMessages' | 'setConversation' | 'setIsSubmitting' | 'newConversation' | 'resetLatestMessage' >; export default function useSSE( submission: TSubmission | null, chatHelpers: ChatHelpers, isAddedRequest = false, runIndex = 0, ) { const genTitle = useGenTitleMutation(); const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex)); const { token, isAuthenticated } = useAuthContext(); const [completed, setCompleted] = useState(new Set()); const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex)); const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex)); const { setMessages, getMessages, setConversation, setIsSubmitting, newConversation, resetLatestMessage, } = chatHelpers; const { stepHandler, syncHandler, finalHandler, errorHandler, messageHandler, contentHandler, createdHandler, attachmentHandler, abortConversation, } = useEventHandlers({ genTitle, setMessages, getMessages, setCompleted, isAddedRequest, setConversation, setIsSubmitting, newConversation, setShowStopButton, resetLatestMessage, }); const { data: startupConfig } = useGetStartupConfig(); const balanceQuery = useGetUserBalance({ enabled: !!isAuthenticated && startupConfig?.checkBalance, }); useEffect(() => { if (submission === null || Object.keys(submission).length === 0) { return; } let { userMessage } = submission; const payloadData = createPayload(submission); let { payload } = payloadData; if (isAssistantsEndpoint(payload.endpoint) || isAgentsEndpoint(payload.endpoint)) { payload = removeNullishValues(payload) as TPayload; } let textIndex = null; const events = new SSE(payloadData.server, { payload: JSON.stringify(payload), headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` }, }); events.onattachment = (e: MessageEvent) => { try { const data = JSON.parse(e.data); attachmentHandler({ data, submission: submission as EventSubmission }); } catch (error) { console.error(error); } }; events.onmessage = (e: MessageEvent) => { const data = JSON.parse(e.data); if (data.final != null) { const { plugins } = data; finalHandler(data, { ...submission, plugins } as EventSubmission); (startupConfig?.checkBalance ?? false) && balanceQuery.refetch(); console.log('final', data); return; } else if (data.created != null) { const runId = v4(); setActiveRunId(runId); userMessage = { ...userMessage, ...data.message, overrideParentMessageId: userMessage.overrideParentMessageId, }; createdHandler(data, { ...submission, userMessage } as EventSubmission); } else if (data.event != null) { stepHandler(data, { ...submission, userMessage } as EventSubmission); } else if (data.sync != null) { const runId = v4(); setActiveRunId(runId); /* synchronize messages to Assistants API as well as with real DB ID's */ syncHandler(data, { ...submission, userMessage } as EventSubmission); } else if (data.type != null) { const { text, index } = data; if (text != null && index !== textIndex) { textIndex = index; } contentHandler({ data, submission: submission as EventSubmission }); } else { const text = data.text ?? data.response; const { plugin, plugins } = data; const initialResponse = { ...(submission.initialResponse as TMessage), parentMessageId: data.parentMessageId, messageId: data.messageId, }; if (data.message != null) { messageHandler(text, { ...submission, plugin, plugins, userMessage, initialResponse }); } } }; events.onopen = () => { setAbortScroll(false); console.log('connection is opened'); }; events.oncancel = async () => { const streamKey = (submission as TSubmission | null)?.['initialResponse']?.messageId; if (completed.has(streamKey)) { setIsSubmitting(false); setCompleted((prev) => { prev.delete(streamKey); return new Set(prev); }); return; } setCompleted((prev) => new Set(prev.add(streamKey))); const latestMessages = getMessages(); const conversationId = latestMessages?.[latestMessages.length - 1]?.conversationId; return await abortConversation( conversationId ?? userMessage.conversationId ?? submission.conversationId, submission as EventSubmission, latestMessages, ); }; events.onerror = function (e: MessageEvent) { console.log('error in server stream.'); (startupConfig?.checkBalance ?? false) && balanceQuery.refetch(); let data: TResData | undefined = undefined; try { data = JSON.parse(e.data) as TResData; } catch (error) { console.error(error); console.log(e); setIsSubmitting(false); } errorHandler({ data, submission: { ...submission, userMessage } as EventSubmission }); }; setIsSubmitting(true); events.stream(); return () => { const isCancelled = events.readyState <= 1; events.close(); // setSource(null); if (isCancelled) { const e = new Event('cancel'); events.dispatchEvent(e); } }; // eslint-disable-next-line react-hooks/exhaustive-deps }, [submission]); }