Mirror of https://github.com/danny-avila/LibreChat.git, synced 2025-12-20 18:30:15 +01:00
refactor: Integrate streamId handling for resumable attachment support
- Added streamId parameter to various functions to support resumable mode in tool loading and memory processing.
- Updated related methods to ensure proper handling of attachments and responses based on the presence of streamId, enhancing the overall streaming experience.
- Improved logging and attachment management to accommodate both standard and resumable modes.
This commit is contained in: parent 98f2025f6f, commit 9fc225a347
6 changed files with 55 additions and 19 deletions
```diff
@@ -17,6 +17,7 @@ import type { TAttachment, MemoryArtifact } from 'librechat-data-provider';
 import type { ObjectId, MemoryMethods } from '@librechat/data-schemas';
 import type { BaseMessage, ToolMessage } from '@langchain/core/messages';
 import type { Response as ServerResponse } from 'express';
+import { GenerationJobManager } from '~/stream/GenerationJobManager';
 import { Tokenizer } from '~/utils';
 
 type RequiredMemoryMethods = Pick<
@@ -283,6 +284,7 @@ export async function processMemory({
   llmConfig,
   tokenLimit,
   totalTokens = 0,
+  streamId = null,
 }: {
   res: ServerResponse;
   setMemory: MemoryMethods['setMemory'];
@@ -297,6 +299,7 @@
   tokenLimit?: number;
   totalTokens?: number;
   llmConfig?: Partial<LLMConfig>;
+  streamId?: string | null;
 }): Promise<(TAttachment | null)[] | undefined> {
   try {
     const memoryTool = createMemoryTool({
@@ -364,7 +367,7 @@ ${memory ?? 'No existing memories'}`;
   }
 
   const artifactPromises: Promise<TAttachment | null>[] = [];
-  const memoryCallback = createMemoryCallback({ res, artifactPromises });
+  const memoryCallback = createMemoryCallback({ res, artifactPromises, streamId });
   const customHandlers = {
     [GraphEvents.TOOL_END]: new BasicToolEndHandler(memoryCallback),
   };
@@ -417,6 +420,7 @@ export async function createMemoryProcessor({
   memoryMethods,
   conversationId,
   config = {},
+  streamId = null,
 }: {
   res: ServerResponse;
   messageId: string;
@@ -424,6 +428,7 @@ export async function createMemoryProcessor({
   userId: string | ObjectId;
   memoryMethods: RequiredMemoryMethods;
   config?: MemoryConfig;
+  streamId?: string | null;
 }): Promise<[string, (messages: BaseMessage[]) => Promise<(TAttachment | null)[] | undefined>]> {
   const { validKeys, instructions, llmConfig, tokenLimit } = config;
   const finalInstructions = instructions || getDefaultInstructions(validKeys, tokenLimit);
@@ -444,6 +449,7 @@ export async function createMemoryProcessor({
     llmConfig,
     messageId,
     tokenLimit,
+    streamId,
     conversationId,
     memory: withKeys,
     totalTokens: totalTokens || 0,
@@ -462,10 +468,12 @@ async function handleMemoryArtifact({
   res,
   data,
   metadata,
+  streamId = null,
 }: {
   res: ServerResponse;
   data: ToolEndData;
   metadata?: ToolEndMetadata;
+  streamId?: string | null;
 }) {
   const output = data?.output as ToolMessage | undefined;
   if (!output) {
@@ -491,7 +499,11 @@ async function handleMemoryArtifact({
   if (!res.headersSent) {
     return attachment;
   }
-  res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
+  if (streamId) {
+    GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
+  } else {
+    res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
+  }
   return attachment;
 }
 
@@ -500,14 +512,17 @@ async function handleMemoryArtifact({
  * @param params - The parameters object
  * @param params.res - The server response object
  * @param params.artifactPromises - Array to collect artifact promises
+ * @param params.streamId - The stream ID for resumable mode, or null for standard mode
  * @returns The memory callback function
  */
 export function createMemoryCallback({
   res,
   artifactPromises,
+  streamId = null,
 }: {
   res: ServerResponse;
   artifactPromises: Promise<Partial<TAttachment> | null>[];
+  streamId?: string | null;
 }): ToolEndCallback {
   return async (data: ToolEndData, metadata?: Record<string, unknown>) => {
     const output = data?.output as ToolMessage | undefined;
@@ -516,7 +531,7 @@ export function createMemoryCallback({
       return;
     }
     artifactPromises.push(
-      handleMemoryArtifact({ res, data, metadata }).catch((error) => {
+      handleMemoryArtifact({ res, data, metadata, streamId }).catch((error) => {
         logger.error('Error processing memory artifact content:', error);
         return null;
       }),
```
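The core behavioral change is the branch added to handleMemoryArtifact above: when a streamId is present, the attachment event is routed through GenerationJobManager so a resumable client can receive it; otherwise it is written directly to the SSE response as before. The sketch below restates that branch in isolation; `Attachment` and `ChunkEmitter` are hypothetical local stand-ins for LibreChat's TAttachment type and GenerationJobManager, and only the `emitChunk(streamId, { event, data })` call shape is assumed, matching its use in the diff.

```typescript
import type { Response as ServerResponse } from 'express';

/** Hypothetical stand-in for LibreChat's TAttachment; the real type has more fields. */
type Attachment = Record<string, unknown>;

/** Assumed shape of the job manager, based only on the emitChunk call site in the diff. */
interface ChunkEmitter {
  emitChunk(streamId: string, chunk: { event: string; data: unknown }): void;
}

/**
 * Deliver an attachment either through the generation job (resumable mode)
 * or directly over SSE (standard mode), mirroring the branch added to
 * handleMemoryArtifact in this commit.
 */
export function emitAttachment(
  res: ServerResponse,
  jobManager: ChunkEmitter,
  attachment: Attachment,
  streamId: string | null = null,
): Attachment {
  if (streamId) {
    // Resumable mode: hand the chunk to the job manager keyed by streamId,
    // so the stream can be replayed for a client that reconnects.
    jobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
  } else {
    // Standard mode: write the SSE frame straight to the open response.
    res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
  }
  return attachment;
}
```

In the commit itself, streamId is threaded down from createMemoryProcessor through processMemory into createMemoryCallback, so the same tool-end callback works unchanged in both standard and resumable modes.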