🏄‍♂️ refactor: Optimize Reasoning UI & Token Streaming (#5546)

* feat: Implement Show Thinking feature; refactor: test Thinking render optimizations

* feat: Refactor Thinking component styles and enhance Markdown rendering

* chore: add back removed code, revert type changes

* chore: Add back resetCounter effect to Markdown component for improved code block indexing

* chore: bump @librechat/agents and google langchain packages

* WIP: reasoning type updates

* WIP: first pass, reasoning content blocks

* chore: revert code

* chore: bump @librechat/agents

* refactor: optimize reasoning tag handling

* style: ul indent padding

* feat: add Reasoning component to handle reasoning display

* feat: first pass, content reasoning part styling

* refactor: add content placeholder for endpoints using new stream handler

* refactor: only cache messages when requesting stream audio

* fix: circular dependency

* fix: add default param

* refactor: TTS, only request audio after the message stream completes; fix Chrome autoplay

* style: update label for submitting state and add localization for 'Thinking...'

* fix: improve global audio pause logic and reset active run ID

* fix: handle artifact edge cases

* fix: remove unnecessary console log from artifact update test

* feat: add support for continued message handling with new streaming method

---------

Co-authored-by: Marco Beretta <81851188+berry-13@users.noreply.github.com>
Authored by Danny Avila on 2025-01-29 19:46:58 -05:00, committed by GitHub
parent d60a149ad9
commit 591a019766
GPG key ID: B5690EEEBB952194
48 changed files with 1791 additions and 726 deletions


@@ -1,8 +1,6 @@
const throttle = require('lodash/throttle');
const { getResponseSender, Constants, CacheKeys, Time } = require('librechat-data-provider');
const { getResponseSender, Constants } = require('librechat-data-provider');
const { createAbortController, handleAbortError } = require('~/server/middleware');
const { sendMessage, createOnProgress } = require('~/server/utils');
const { getLogStores } = require('~/cache');
const { saveMessage } = require('~/models');
const { logger } = require('~/config');
@@ -57,33 +55,9 @@ const AskController = async (req, res, next, initializeClient, addTitle) => {
try {
const { client } = await initializeClient({ req, res, endpointOption });
const messageCache = getLogStores(CacheKeys.MESSAGES);
const { onProgress: progressCallback, getPartialText } = createOnProgress({
onProgress: throttle(
({ text: partialText }) => {
/*
const unfinished = endpointOption.endpoint === EModelEndpoint.google ? false : true;
messageCache.set(responseMessageId, {
messageId: responseMessageId,
sender,
conversationId,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: partialText,
model: client.modelOptions.model,
unfinished,
error: false,
user,
}, Time.FIVE_MINUTES);
*/
const { onProgress: progressCallback, getPartialText } = createOnProgress();
messageCache.set(responseMessageId, partialText, Time.FIVE_MINUTES);
},
3000,
{ trailing: false },
),
});
getText = getPartialText;
getText = client.getStreamText != null ? client.getStreamText.bind(client) : getPartialText;
const getAbortData = () => ({
sender,
@@ -91,7 +65,7 @@ const AskController = async (req, res, next, initializeClient, addTitle) => {
userMessagePromise,
messageId: responseMessageId,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: getPartialText(),
text: getText(),
userMessage,
promptTokens,
});
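The net effect of these hunks: the controller no longer throttles partial text into the message cache; it calls `createOnProgress()` bare and reads the response text from the client whenever the new stream handler is available. A condensed sketch of that fallback, with only the surrounding wiring implied:

```js
// Sketch only: `client` is the initialized endpoint client from initializeClient().
const { onProgress: progressCallback, getPartialText } = createOnProgress();

// Prefer the client's own stream text when it implements the new handler,
// otherwise fall back to the accumulator returned by createOnProgress().
const getText =
  client.getStreamText != null ? client.getStreamText.bind(client) : getPartialText;

// Abort data and error paths read from the same getter:
const getAbortData = () => ({ text: getText() });
```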


@@ -1,8 +1,6 @@
const throttle = require('lodash/throttle');
const { getResponseSender, CacheKeys, Time } = require('librechat-data-provider');
const { getResponseSender } = require('librechat-data-provider');
const { createAbortController, handleAbortError } = require('~/server/middleware');
const { sendMessage, createOnProgress } = require('~/server/utils');
const { getLogStores } = require('~/cache');
const { saveMessage } = require('~/models');
const { logger } = require('~/config');
@@ -53,62 +51,44 @@ const EditController = async (req, res, next, initializeClient) => {
}
};
const messageCache = getLogStores(CacheKeys.MESSAGES);
const { onProgress: progressCallback, getPartialText } = createOnProgress({
generation,
onProgress: throttle(
({ text: partialText }) => {
/*
const unfinished = endpointOption.endpoint === EModelEndpoint.google ? false : true;
{
messageId: responseMessageId,
sender,
conversationId,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: partialText,
model: endpointOption.modelOptions.model,
unfinished,
isEdited: true,
error: false,
user,
} */
messageCache.set(responseMessageId, partialText, Time.FIVE_MINUTES);
},
3000,
{ trailing: false },
),
});
const getAbortData = () => ({
conversationId,
userMessagePromise,
messageId: responseMessageId,
sender,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: getPartialText(),
userMessage,
promptTokens,
});
const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);
res.on('close', () => {
logger.debug('[EditController] Request closed');
if (!abortController) {
return;
} else if (abortController.signal.aborted) {
return;
} else if (abortController.requestCompleted) {
return;
}
abortController.abort();
logger.debug('[EditController] Request aborted on close');
});
let getText;
try {
const { client } = await initializeClient({ req, res, endpointOption });
getText = client.getStreamText != null ? client.getStreamText.bind(client) : getPartialText;
const getAbortData = () => ({
conversationId,
userMessagePromise,
messageId: responseMessageId,
sender,
parentMessageId: overrideParentMessageId ?? userMessageId,
text: getText(),
userMessage,
promptTokens,
});
const { abortController, onStart } = createAbortController(req, res, getAbortData, getReqData);
res.on('close', () => {
logger.debug('[EditController] Request closed');
if (!abortController) {
return;
} else if (abortController.signal.aborted) {
return;
} else if (abortController.requestCompleted) {
return;
}
abortController.abort();
logger.debug('[EditController] Request aborted on close');
});
let response = await client.sendMessage(text, {
user,
generation,
@@ -153,7 +133,7 @@ const EditController = async (req, res, next, initializeClient) => {
);
}
} catch (error) {
const partialText = getPartialText();
const partialText = getText();
handleAbortError(res, req, error, {
partialText,
conversationId,


@@ -10,7 +10,7 @@ const {
const { processCodeOutput } = require('~/server/services/Files/Code/process');
const { saveBase64Image } = require('~/server/services/Files/process');
const { loadAuthValues } = require('~/app/clients/tools/util');
const { logger } = require('~/config');
const { logger, sendEvent } = require('~/config');
/** @typedef {import('@librechat/agents').Graph} Graph */
/** @typedef {import('@librechat/agents').EventHandler} EventHandler */
@@ -21,20 +21,6 @@ const { logger } = require('~/config');
/** @typedef {import('@librechat/agents').ContentAggregatorResult['aggregateContent']} ContentAggregator */
/** @typedef {import('@librechat/agents').GraphEvents} GraphEvents */
/**
* Sends message data in Server Sent Events format.
* @param {ServerResponse} res - The server response.
* @param {{ data: string | Record<string, unknown>, event?: string }} event - The message event.
* @param {string} event.event - The type of event.
* @param {string} event.data - The message to be sent.
*/
const sendEvent = (res, event) => {
if (typeof event.data === 'string' && event.data.length === 0) {
return;
}
res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
};
class ModelEndHandler {
/**
* @param {Array<UsageMetadata>} collectedUsage
@@ -322,7 +308,6 @@ function createToolEndCallback({ req, res, artifactPromises }) {
}
module.exports = {
sendEvent,
getDefaultHandlers,
createToolEndCallback,
};
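With `sendEvent` now imported from `~/config` (and re-exported above for existing callers), stream handlers can share one SSE writer. A rough usage sketch; the event name here is made up for illustration and is not defined by `@librechat/agents`:

```js
const { sendEvent } = require('~/config');

// Sketch only: forward a streamed delta to the client as an SSE message.
// Per the removed JSDoc, the payload shape is { data: string | object, event?: string },
// and empty-string data is silently skipped.
function forwardDelta(res, delta) {
  sendEvent(res, { event: 'on_reasoning_delta', data: delta });
}
```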


@@ -397,18 +397,6 @@ const chatV2 = async (req, res) => {
response = streamRunManager;
response.text = streamRunManager.intermediateText;
if (response.text) {
const messageCache = getLogStores(CacheKeys.MESSAGES);
messageCache.set(
responseMessageId,
{
complete: true,
text: response.text,
},
Time.FIVE_MINUTES,
);
}
};
await processRun();


@@ -1,11 +1,9 @@
const express = require('express');
const throttle = require('lodash/throttle');
const { getResponseSender, Constants, CacheKeys, Time } = require('librechat-data-provider');
const { getResponseSender, Constants } = require('librechat-data-provider');
const { initializeClient } = require('~/server/services/Endpoints/gptPlugins');
const { sendMessage, createOnProgress } = require('~/server/utils');
const { addTitle } = require('~/server/services/Endpoints/openAI');
const { saveMessage, updateMessage } = require('~/models');
const { getLogStores } = require('~/cache');
const {
handleAbort,
createAbortController,
@@ -72,15 +70,6 @@ router.post(
}
};
const messageCache = getLogStores(CacheKeys.MESSAGES);
const throttledCacheSet = throttle(
(text) => {
messageCache.set(responseMessageId, text, Time.FIVE_MINUTES);
},
3000,
{ trailing: false },
);
let streaming = null;
let timer = null;
@@ -89,13 +78,11 @@
sendIntermediateMessage,
getPartialText,
} = createOnProgress({
onProgress: ({ text: partialText }) => {
onProgress: () => {
if (timer) {
clearTimeout(timer);
}
throttledCacheSet(partialText);
streaming = new Promise((resolve) => {
timer = setTimeout(() => {
resolve();


@@ -1,6 +1,5 @@
const express = require('express');
const throttle = require('lodash/throttle');
const { getResponseSender, CacheKeys, Time } = require('librechat-data-provider');
const { getResponseSender } = require('librechat-data-provider');
const {
setHeaders,
handleAbort,
@@ -14,7 +13,6 @@ const {
const { sendMessage, createOnProgress, formatSteps, formatAction } = require('~/server/utils');
const { initializeClient } = require('~/server/services/Endpoints/gptPlugins');
const { saveMessage, updateMessage } = require('~/models');
const { getLogStores } = require('~/cache');
const { validateTools } = require('~/app');
const { logger } = require('~/config');
@@ -80,26 +78,16 @@ router.post(
}
};
const messageCache = getLogStores(CacheKeys.MESSAGES);
const throttledCacheSet = throttle(
(text) => {
messageCache.set(responseMessageId, text, Time.FIVE_MINUTES);
},
3000,
{ trailing: false },
);
const {
onProgress: progressCallback,
sendIntermediateMessage,
getPartialText,
} = createOnProgress({
generation,
onProgress: ({ text: partialText }) => {
onProgress: () => {
if (plugin.loading === true) {
plugin.loading = false;
}
throttledCacheSet(partialText);
},
});


@@ -21,7 +21,7 @@ router.post('/artifact/:messageId', async (req, res) => {
const { messageId } = req.params;
const { index, original, updated } = req.body;
if (typeof index !== 'number' || index < 0 || !original || !updated) {
if (typeof index !== 'number' || index < 0 || original == null || updated == null) {
return res.status(400).json({ error: 'Invalid request parameters' });
}


@@ -57,14 +57,42 @@ const findAllArtifacts = (message) => {
const replaceArtifactContent = (originalText, artifact, original, updated) => {
const artifactContent = artifact.text.substring(artifact.start, artifact.end);
const relativeIndex = artifactContent.indexOf(original);
// Find boundaries between ARTIFACT_START and ARTIFACT_END
const contentStart = artifactContent.indexOf('\n', artifactContent.indexOf(ARTIFACT_START)) + 1;
const contentEnd = artifactContent.lastIndexOf(ARTIFACT_END);
if (contentStart === -1 || contentEnd === -1) {
return null;
}
// Check if there are code blocks
const codeBlockStart = artifactContent.indexOf('```\n', contentStart);
const codeBlockEnd = artifactContent.lastIndexOf('\n```', contentEnd);
// Determine where to look for the original content
let searchStart, searchEnd;
if (codeBlockStart !== -1 && codeBlockEnd !== -1) {
// If code blocks exist, search between them
searchStart = codeBlockStart + 4; // after ```\n
searchEnd = codeBlockEnd;
} else {
// Otherwise search in the whole artifact content
searchStart = contentStart;
searchEnd = contentEnd;
}
const innerContent = artifactContent.substring(searchStart, searchEnd);
// Remove trailing newline from original for comparison
const originalTrimmed = original.replace(/\n$/, '');
const relativeIndex = innerContent.indexOf(originalTrimmed);
if (relativeIndex === -1) {
return null;
}
const absoluteIndex = artifact.start + relativeIndex;
const endText = originalText.substring(absoluteIndex + original.length);
const absoluteIndex = artifact.start + searchStart + relativeIndex;
const endText = originalText.substring(absoluteIndex + originalTrimmed.length);
const hasTrailingNewline = endText.startsWith('\n');
const updatedText =


@@ -260,8 +260,61 @@ console.log(greeting);`;
codeExample,
'updated content',
);
console.log(result);
expect(result).toMatch(/id="2".*updated content/s);
expect(result).toMatch(new RegExp(`${ARTIFACT_START}.*updated content.*${ARTIFACT_END}`, 's'));
});
test('should handle empty content in artifact without code blocks', () => {
const artifactText = `${ARTIFACT_START}\n\n${ARTIFACT_END}`;
const artifact = {
start: 0,
end: artifactText.length,
text: artifactText,
source: 'text',
};
const result = replaceArtifactContent(artifactText, artifact, '', 'new content');
expect(result).toBe(`${ARTIFACT_START}\nnew content\n${ARTIFACT_END}`);
});
test('should handle empty content in artifact with code blocks', () => {
const artifactText = createArtifactText({ content: '' });
const artifact = {
start: 0,
end: artifactText.length,
text: artifactText,
source: 'text',
};
const result = replaceArtifactContent(artifactText, artifact, '', 'new content');
expect(result).toMatch(/```\nnew content\n```/);
});
test('should handle content with trailing newline in code blocks', () => {
const contentWithNewline = 'console.log("test")\n';
const message = {
text: `Some prefix text\n${createArtifactText({
content: contentWithNewline,
})}\nSome suffix text`,
};
const artifacts = findAllArtifacts(message);
expect(artifacts).toHaveLength(1);
const result = replaceArtifactContent(
message.text,
artifacts[0],
contentWithNewline,
'updated content',
);
// Should update the content and preserve artifact structure
expect(result).toContain('```\nupdated content\n```');
// Should preserve surrounding text
expect(result).toMatch(/^Some prefix text\n/);
expect(result).toMatch(/\nSome suffix text$/);
// Should not have extra newlines
expect(result).not.toContain('\n\n```');
expect(result).not.toContain('```\n\n');
});
});


@@ -364,7 +364,7 @@ class TTSService {
shouldContinue = false;
});
const processChunks = createChunkProcessor(req.body.messageId);
const processChunks = createChunkProcessor(req.user.id, req.body.messageId);
try {
while (shouldContinue) {


@@ -1,4 +1,5 @@
const { CacheKeys, findLastSeparatorIndex, SEPARATORS } = require('librechat-data-provider');
const { CacheKeys, findLastSeparatorIndex, SEPARATORS, Time } = require('librechat-data-provider');
const { getMessage } = require('~/models/Message');
const { getLogStores } = require('~/cache');
/**
@@ -47,10 +48,11 @@ const MAX_NOT_FOUND_COUNT = 6;
const MAX_NO_CHANGE_COUNT = 10;
/**
* @param {string} user
* @param {string} messageId
* @returns {() => Promise<{ text: string, isFinished: boolean }[]>}
*/
function createChunkProcessor(messageId) {
function createChunkProcessor(user, messageId) {
let notFoundCount = 0;
let noChangeCount = 0;
let processedText = '';
@@ -73,15 +75,27 @@ function createChunkProcessor(messageId) {
}
/** @type { string | { text: string; complete: boolean } } */
const message = await messageCache.get(messageId);
let message = await messageCache.get(messageId);
if (!message) {
message = await getMessage({ user, messageId });
}
if (!message) {
notFoundCount++;
return [];
} else {
messageCache.set(
messageId,
{
text: message.text,
complete: true,
},
Time.FIVE_MINUTES,
);
}
const text = typeof message === 'string' ? message : message.text;
const complete = typeof message === 'string' ? false : message.complete;
const complete = typeof message === 'string' ? false : message.complete ?? true;
if (text === processedText) {
noChangeCount++;
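The chunk processor now takes the user ID so that, on a cache miss, it can fall back to the database via `getMessage({ user, messageId })` and then re-prime the cache with a completed entry. A rough sketch of how a caller might drive it; the require path, loop bound, and 500 ms delay are assumptions for illustration, not taken from this diff:

```js
const { createChunkProcessor } = require('~/server/services/Files/Audio/streamAudio');

async function collectSpeechChunks(userId, messageId) {
  const processChunks = createChunkProcessor(userId, messageId);
  const texts = [];
  // Poll until a chunk is marked finished (bounded here so the sketch terminates).
  for (let attempt = 0; attempt < 20; attempt++) {
    const chunks = await processChunks();
    for (const { text, isFinished } of chunks) {
      texts.push(text);
      if (isFinished) {
        return texts;
      }
    }
    await new Promise((resolve) => setTimeout(resolve, 500));
  }
  return texts;
}
```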


@@ -3,6 +3,13 @@ const { createChunkProcessor, splitTextIntoChunks } = require('./streamAudio');
jest.mock('keyv');
const globalCache = {};
jest.mock('~/models/Message', () => {
return {
getMessage: jest.fn().mockImplementation((messageId) => {
return globalCache[messageId] || null;
}),
};
});
jest.mock('~/cache/getLogStores', () => {
return jest.fn().mockImplementation(() => {
const EventEmitter = require('events');
@@ -56,9 +63,10 @@ describe('processChunks', () => {
jest.resetAllMocks();
mockMessageCache = {
get: jest.fn(),
set: jest.fn(),
};
require('~/cache/getLogStores').mockReturnValue(mockMessageCache);
processChunks = createChunkProcessor('message-id');
processChunks = createChunkProcessor('userId', 'message-id');
});
it('should return an empty array when the message is not found', async () => {


@@ -1,19 +1,15 @@
const throttle = require('lodash/throttle');
const {
Time,
CacheKeys,
Constants,
StepTypes,
ContentTypes,
ToolCallTypes,
MessageContentTypes,
AssistantStreamEvents,
Constants,
} = require('librechat-data-provider');
const { retrieveAndProcessFile } = require('~/server/services/Files/process');
const { processRequiredActions } = require('~/server/services/ToolService');
const { createOnProgress, sendMessage, sleep } = require('~/server/utils');
const { processMessages } = require('~/server/services/Threads');
const { getLogStores } = require('~/cache');
const { logger } = require('~/config');
/**
@@ -611,20 +607,8 @@ class StreamRunManager {
const index = this.getStepIndex(stepKey);
this.orderedRunSteps.set(index, message_creation);
const messageCache = getLogStores(CacheKeys.MESSAGES);
// Create the Factory Function to stream the message
const { onProgress: progressCallback } = createOnProgress({
onProgress: throttle(
() => {
messageCache.set(this.finalMessage.messageId, this.getText(), Time.FIVE_MINUTES);
},
3000,
{ trailing: false },
),
});
const { onProgress: progressCallback } = createOnProgress();
// This creates a function that attaches all of the parameters
// specified here to each SSE message generated by the TextStream
const onProgress = progressCallback({
index,
res: this.res,


@@ -18,7 +18,12 @@ const citationRegex = /\[\^\d+?\^]/g;
const addSpaceIfNeeded = (text) => (text.length > 0 && !text.endsWith(' ') ? text + ' ' : text);
const base = { message: true, initial: true };
const createOnProgress = ({ generation = '', onProgress: _onProgress }) => {
const createOnProgress = (
{ generation = '', onProgress: _onProgress } = {
generation: '',
onProgress: null,
},
) => {
let i = 0;
let tokens = addSpaceIfNeeded(generation);
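Making the options bag optional is what allows the bare `createOnProgress()` calls introduced in the controllers and `StreamRunManager` above, while the existing call style keeps working. Both forms, sketched (the callback body is illustrative only):

```js
// New-style call: no throttled cache writes, just the progress/partial-text helpers.
const { onProgress, getPartialText } = createOnProgress();

// Existing style, e.g. when continuing from a prior generation:
const handlers = createOnProgress({
  generation: 'previously streamed text ',
  onProgress: ({ text }) => {
    // Illustrative only: inspect the accumulated partial text.
    console.log(text.length);
  },
});
```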