diff --git a/client/package.json b/client/package.json index 3910f7bed..fa78185a9 100644 --- a/client/package.json +++ b/client/package.json @@ -96,6 +96,7 @@ "remark-gfm": "^4.0.0", "remark-math": "^6.0.0", "remark-supersub": "^1.0.0", + "sse.js": "^2.5.0", "tailwind-merge": "^1.9.1", "tailwindcss-animate": "^1.0.5", "tailwindcss-radix": "^2.8.0", diff --git a/client/src/hooks/SSE/useSSE.ts b/client/src/hooks/SSE/useSSE.ts index 1c3e51d10..7d0a63757 100644 --- a/client/src/hooks/SSE/useSSE.ts +++ b/client/src/hooks/SSE/useSSE.ts @@ -1,22 +1,23 @@ -import { v4 } from 'uuid'; -import { useSetRecoilState } from 'recoil'; -import { useEffect, useState } from 'react'; +import type { EventSubmission, TMessage, TPayload, TSubmission } from 'librechat-data-provider'; import { /* @ts-ignore */ - SSE, createPayload, isAgentsEndpoint, - removeNullishValues, isAssistantsEndpoint, + removeNullishValues, + request, } from 'librechat-data-provider'; -import { useGetUserBalance, useGetStartupConfig } from 'librechat-data-provider/react-query'; -import type { TMessage, TSubmission, TPayload, EventSubmission } from 'librechat-data-provider'; -import type { EventHandlerParams } from './useEventHandlers'; +import { useGetStartupConfig, useGetUserBalance } from 'librechat-data-provider/react-query'; +import { useEffect, useState } from 'react'; +import { useSetRecoilState } from 'recoil'; +import { SSE } from 'sse.js'; +import { v4 } from 'uuid'; import type { TResData } from '~/common'; import { useGenTitleMutation } from '~/data-provider'; import { useAuthContext } from '~/hooks/AuthContext'; -import useEventHandlers from './useEventHandlers'; import store from '~/store'; +import type { EventHandlerParams } from './useEventHandlers'; +import useEventHandlers from './useEventHandlers'; type ChatHelpers = Pick< EventHandlerParams, @@ -94,21 +95,21 @@ export default function useSSE( let textIndex = null; - const events = new SSE(payloadData.server, { + const sse = new SSE(payloadData.server, { payload: JSON.stringify(payload), headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` }, }); - events.onattachment = (e: MessageEvent) => { + sse.addEventListener('attachment', (e: MessageEvent) => { try { const data = JSON.parse(e.data); attachmentHandler({ data, submission: submission as EventSubmission }); } catch (error) { console.error(error); } - }; + }); - events.onmessage = (e: MessageEvent) => { + sse.addEventListener('message', (e: MessageEvent) => { const data = JSON.parse(e.data); if (data.final != null) { @@ -155,14 +156,14 @@ export default function useSSE( messageHandler(text, { ...submission, plugin, plugins, userMessage, initialResponse }); } } - }; + }); - events.onopen = () => { + sse.addEventListener('open', () => { setAbortScroll(false); console.log('connection is opened'); - }; + }); - events.oncancel = async () => { + sse.addEventListener('cancel', async () => { const streamKey = (submission as TSubmission | null)?.['initialResponse']?.messageId; if (completed.has(streamKey)) { setIsSubmitting(false); @@ -181,9 +182,27 @@ export default function useSSE( submission as EventSubmission, latestMessages, ); - }; + }); + + sse.addEventListener('error', async (e: MessageEvent) => { + /* @ts-ignore */ + if (e.responseCode === 401) { + /* token expired, refresh and retry */ + try { + const refreshResponse = await request.refreshToken(); + sse.headers = { + 'Content-Type': 'application/json', + Authorization: `Bearer ${refreshResponse.token}`, + }; + request.dispatchTokenUpdatedEvent(refreshResponse.token); + sse.stream(); + return; + } catch (error) { + /* token refresh failed, continue handling the original 401 */ + console.log(error); + } + } - events.onerror = function (e: MessageEvent) { console.log('error in server stream.'); (startupConfig?.checkBalance ?? false) && balanceQuery.refetch(); @@ -197,18 +216,18 @@ export default function useSSE( } errorHandler({ data, submission: { ...submission, userMessage } as EventSubmission }); - }; + }); setIsSubmitting(true); - events.stream(); + sse.stream(); return () => { - const isCancelled = events.readyState <= 1; - events.close(); - // setSource(null); + const isCancelled = sse.readyState <= 1; + sse.close(); if (isCancelled) { const e = new Event('cancel'); - events.dispatchEvent(e); + /* @ts-ignore */ + sse.dispatchEvent(e); } }; // eslint-disable-next-line react-hooks/exhaustive-deps diff --git a/package-lock.json b/package-lock.json index d292fff47..a3fb7bf47 100644 --- a/package-lock.json +++ b/package-lock.json @@ -733,6 +733,7 @@ "remark-gfm": "^4.0.0", "remark-math": "^6.0.0", "remark-supersub": "^1.0.0", + "sse.js": "^2.5.0", "tailwind-merge": "^1.9.1", "tailwindcss-animate": "^1.0.5", "tailwindcss-radix": "^2.8.0", @@ -32747,6 +32748,12 @@ "integrity": "sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==", "dev": true }, + "node_modules/sse.js": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/sse.js/-/sse.js-2.5.0.tgz", + "integrity": "sha512-I7zYndqOOkNpz9KIdFZ8c8A7zs1YazNewBr8Nsi/tqThfJkVPuP1q7UE2h4B0RwoWZxbBYpd06uoW3NI3SaZXg==", + "license": "Apache-2.0" + }, "node_modules/stack-trace": { "version": "0.0.10", "resolved": "https://registry.npmjs.org/stack-trace/-/stack-trace-0.0.10.tgz", diff --git a/packages/data-provider/src/index.ts b/packages/data-provider/src/index.ts index 1fe1c0033..59e0d60b7 100644 --- a/packages/data-provider/src/index.ts +++ b/packages/data-provider/src/index.ts @@ -8,26 +8,25 @@ export * from './artifacts'; /* schema helpers */ export * from './parsers'; /* custom/dynamic configurations */ -export * from './models'; export * from './generate'; +export * from './models'; /* RBAC */ export * from './roles'; /* types (exports schemas from `./types` as they contain needed in other defs) */ export * from './types'; export * from './types/agents'; export * from './types/assistants'; -export * from './types/queries'; export * from './types/files'; export * from './types/mutations'; +export * from './types/queries'; export * from './types/runs'; /* query/mutation keys */ export * from './keys'; /* api call helpers */ export * from './headers-helpers'; export { default as request } from './request'; -import * as dataService from './data-service'; export { dataService }; +import * as dataService from './data-service'; /* general helpers */ -export * from './sse'; export * from './actions'; export { default as createPayload } from './createPayload'; diff --git a/packages/data-provider/src/request.ts b/packages/data-provider/src/request.ts index eaa1896cd..c19d60f78 100644 --- a/packages/data-provider/src/request.ts +++ b/packages/data-provider/src/request.ts @@ -1,7 +1,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import axios, { AxiosRequestConfig, AxiosError } from 'axios'; -import { setTokenHeader } from './headers-helpers'; +import axios, { AxiosError, AxiosRequestConfig } from 'axios'; import * as endpoints from './api-endpoints'; +import { setTokenHeader } from './headers-helpers'; async function _get(url: string, options?: AxiosRequestConfig): Promise { const response = await axios.get(url, { ...options }); @@ -65,6 +65,11 @@ let failedQueue: { resolve: (value?: any) => void; reject: (reason?: any) => voi const refreshToken = (retry?: boolean) => _post(endpoints.refreshToken(retry)); +const dispatchTokenUpdatedEvent = (token: string) => { + setTokenHeader(token); + window.dispatchEvent(new CustomEvent('tokenUpdated', { detail: token })); +}; + const processQueue = (error: AxiosError | null, token: string | null = null) => { failedQueue.forEach((prom) => { if (error) { @@ -109,8 +114,7 @@ axios.interceptors.response.use( if (token) { originalRequest.headers['Authorization'] = 'Bearer ' + token; - setTokenHeader(token); - window.dispatchEvent(new CustomEvent('tokenUpdated', { detail: token })); + dispatchTokenUpdatedEvent(token); processQueue(null, token); return await axios(originalRequest); } else { @@ -139,4 +143,5 @@ export default { deleteWithOptions: _deleteWithOptions, patch: _patch, refreshToken, + dispatchTokenUpdatedEvent, }; diff --git a/packages/data-provider/src/sse.js b/packages/data-provider/src/sse.js deleted file mode 100644 index f3705a450..000000000 --- a/packages/data-provider/src/sse.js +++ /dev/null @@ -1,242 +0,0 @@ -/* eslint-disable */ -/** - * Copyright (C) 2016 Maxime Petazzoni . - * All rights reserved. - */ - -import request from './request'; -import { setTokenHeader } from './headers-helpers'; - -var SSE = function (url, options) { - if (!(this instanceof SSE)) { - return new SSE(url, options); - } - - this.INITIALIZING = -1; - this.CONNECTING = 0; - this.OPEN = 1; - this.CLOSED = 2; - - this.url = url; - - options = options || {}; - this.headers = options.headers || {}; - this.payload = options.payload !== undefined ? options.payload : ''; - this.method = options.method || (this.payload && 'POST') || 'GET'; - this.withCredentials = !!options.withCredentials; - - this.FIELD_SEPARATOR = ':'; - this.listeners = {}; - - this.xhr = null; - this.readyState = this.INITIALIZING; - this.progress = 0; - this.chunk = ''; - - this.addEventListener = function (type, listener) { - if (this.listeners[type] === undefined) { - this.listeners[type] = []; - } - - if (this.listeners[type].indexOf(listener) === -1) { - this.listeners[type].push(listener); - } - }; - - this.removeEventListener = function (type, listener) { - if (this.listeners[type] === undefined) { - return; - } - - var filtered = []; - this.listeners[type].forEach(function (element) { - if (element !== listener) { - filtered.push(element); - } - }); - if (filtered.length === 0) { - delete this.listeners[type]; - } else { - this.listeners[type] = filtered; - } - }; - - this.dispatchEvent = function (e) { - if (!e) { - return true; - } - - e.source = this; - - var onHandler = 'on' + e.type; - if (this.hasOwnProperty(onHandler)) { - this[onHandler].call(this, e); - if (e.defaultPrevented) { - return false; - } - } - - if (this.listeners[e.type]) { - return this.listeners[e.type].every(function (callback) { - callback(e); - return !e.defaultPrevented; - }); - } - - return true; - }; - - this._setReadyState = function (state) { - var event = new CustomEvent('readystatechange'); - event.readyState = state; - this.readyState = state; - this.dispatchEvent(event); - }; - - this._onStreamFailure = function (e) { - var event = new CustomEvent('error'); - event.data = e.currentTarget.response; - this.dispatchEvent(event); - this.close(); - }; - - this._onStreamAbort = function (e) { - this.dispatchEvent(new CustomEvent('abort')); - this.close(); - }; - - this._onStreamProgress = async function (e) { - if (!this.xhr) { - return; - } - - if (this.xhr.status === 401 && !this._retry) { - this._retry = true; - try { - const refreshResponse = await request.refreshToken(); - this.headers = { - 'Content-Type': 'application/json', - Authorization: `Bearer ${refreshResponse.token}`, - }; - setTokenHeader(refreshResponse.token); - window.dispatchEvent(new CustomEvent('tokenUpdated', { detail: refreshResponse.token })); - this.stream(); - } catch (err) { - this._onStreamFailure(e); - return; - } - } else if (this.xhr.status !== 200) { - this._onStreamFailure(e); - return; - } - - if (this.readyState == this.CONNECTING) { - this.dispatchEvent(new CustomEvent('open')); - this._setReadyState(this.OPEN); - } - - var data = this.xhr.responseText.substring(this.progress); - this.progress += data.length; - data.split(/(\r\n|\r|\n){2}/g).forEach( - function (part) { - if (part.trim().length === 0) { - this.dispatchEvent(this._parseEventChunk(this.chunk.trim())); - this.chunk = ''; - } else { - this.chunk += part; - } - }.bind(this), - ); - }; - - this._onStreamLoaded = function (e) { - this._onStreamProgress(e); - - // Parse the last chunk. - this.dispatchEvent(this._parseEventChunk(this.chunk)); - this.chunk = ''; - }; - - /** - * Parse a received SSE event chunk into a constructed event object. - */ - this._parseEventChunk = function (chunk) { - if (!chunk || chunk.length === 0) { - return null; - } - - var e = { id: null, retry: null, data: '', event: 'message' }; - chunk.split(/\n|\r\n|\r/).forEach( - function (line) { - line = line.trimRight(); - var index = line.indexOf(this.FIELD_SEPARATOR); - if (index <= 0) { - // Line was either empty, or started with a separator and is a comment. - // Either way, ignore. - return; - } - - var field = line.substring(0, index); - if (!(field in e)) { - return; - } - - var value = line.substring(index + 1).trimLeft(); - if (field === 'data') { - e[field] += value; - } else { - e[field] = value; - } - }.bind(this), - ); - - var event = new CustomEvent(e.event); - event.data = e.data; - event.id = e.id; - return event; - }; - - this._checkStreamClosed = function () { - if (!this.xhr) { - return; - } - - if (this.xhr.readyState === XMLHttpRequest.DONE) { - this._setReadyState(this.CLOSED); - } - }; - - this.stream = function () { - this._setReadyState(this.CONNECTING); - - this.xhr = new XMLHttpRequest(); - this.xhr.addEventListener('progress', this._onStreamProgress.bind(this)); - this.xhr.addEventListener('load', this._onStreamLoaded.bind(this)); - this.xhr.addEventListener('readystatechange', this._checkStreamClosed.bind(this)); - this.xhr.addEventListener('error', this._onStreamFailure.bind(this)); - this.xhr.addEventListener('abort', this._onStreamAbort.bind(this)); - this.xhr.open(this.method, this.url); - for (var header in this.headers) { - this.xhr.setRequestHeader(header, this.headers[header]); - } - this.xhr.withCredentials = this.withCredentials; - this.xhr.send(this.payload); - }; - - this.close = function () { - if (this.readyState === this.CLOSED) { - return; - } - - this.xhr.abort(); - this.xhr = null; - this._setReadyState(this.CLOSED); - }; -}; - -export { SSE }; -// Export our SSE module for npm.js -// if (typeof exports !== 'undefined') { -// // exports.SSE = SSE; -// module.exports = { SSE }; -// }