From 339882eea47799686cb012db16379d860149d11e Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Sat, 12 Apr 2025 20:53:38 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=92=BE=20refactor:=20Enhance=20Memory=20I?= =?UTF-8?q?n=20Image=20Encodings=20&=20Client=20Disposal=20(#6852)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 💾 chore: Clear Additional Properties in `disposeClient` * refactor: stream handling and base64 conversion in encode.js to better free memory --- api/server/cleanup.js | 144 +++++++++++++++++++++ api/server/services/Files/images/encode.js | 87 ++++++++----- 2 files changed, 197 insertions(+), 34 deletions(-) diff --git a/api/server/cleanup.js b/api/server/cleanup.js index 508d2e8129..6a9abb8180 100644 --- a/api/server/cleanup.js +++ b/api/server/cleanup.js @@ -55,6 +55,9 @@ function disposeClient(client) { if (client.responseMessageId) { client.responseMessageId = null; } + if (client.message_file_map) { + client.message_file_map = null; + } if (client.clientName) { client.clientName = null; } @@ -79,6 +82,147 @@ function disposeClient(client) { if (client.outputTokensKey) { client.outputTokensKey = null; } + if (client.skipSaveUserMessage !== undefined) { + client.skipSaveUserMessage = null; + } + if (client.visionMode) { + client.visionMode = null; + } + if (client.continued !== undefined) { + client.continued = null; + } + if (client.fetchedConvo !== undefined) { + client.fetchedConvo = null; + } + if (client.previous_summary) { + client.previous_summary = null; + } + if (client.metadata) { + client.metadata = null; + } + if (client.isVisionModel) { + client.isVisionModel = null; + } + if (client.isChatCompletion !== undefined) { + client.isChatCompletion = null; + } + if (client.contextHandlers) { + client.contextHandlers = null; + } + if (client.augmentedPrompt) { + client.augmentedPrompt = null; + } + if (client.systemMessage) { + client.systemMessage = null; + } + if (client.azureEndpoint) { + client.azureEndpoint = null; + } + if (client.langchainProxy) { + client.langchainProxy = null; + } + if (client.isOmni !== undefined) { + client.isOmni = null; + } + if (client.runManager) { + client.runManager = null; + } + // Properties specific to AnthropicClient + if (client.message_start) { + client.message_start = null; + } + if (client.message_delta) { + client.message_delta = null; + } + if (client.isClaude3 !== undefined) { + client.isClaude3 = null; + } + if (client.useMessages !== undefined) { + client.useMessages = null; + } + if (client.isLegacyOutput !== undefined) { + client.isLegacyOutput = null; + } + if (client.supportsCacheControl !== undefined) { + client.supportsCacheControl = null; + } + // Properties specific to GoogleClient + if (client.serviceKey) { + client.serviceKey = null; + } + if (client.project_id) { + client.project_id = null; + } + if (client.client_email) { + client.client_email = null; + } + if (client.private_key) { + client.private_key = null; + } + if (client.access_token) { + client.access_token = null; + } + if (client.reverseProxyUrl) { + client.reverseProxyUrl = null; + } + if (client.authHeader) { + client.authHeader = null; + } + if (client.isGenerativeModel !== undefined) { + client.isGenerativeModel = null; + } + // Properties specific to OpenAIClient + if (client.ChatGPTClient) { + client.ChatGPTClient = null; + } + if (client.completionsUrl) { + client.completionsUrl = null; + } + if (client.shouldSummarize !== undefined) { + client.shouldSummarize = null; + } + if (client.isOllama !== undefined) { + client.isOllama = null; + } + if (client.FORCE_PROMPT !== undefined) { + client.FORCE_PROMPT = null; + } + if (client.isChatGptModel !== undefined) { + client.isChatGptModel = null; + } + if (client.isUnofficialChatGptModel !== undefined) { + client.isUnofficialChatGptModel = null; + } + if (client.useOpenRouter !== undefined) { + client.useOpenRouter = null; + } + if (client.startToken) { + client.startToken = null; + } + if (client.endToken) { + client.endToken = null; + } + if (client.userLabel) { + client.userLabel = null; + } + if (client.chatGptLabel) { + client.chatGptLabel = null; + } + if (client.modelLabel) { + client.modelLabel = null; + } + if (client.modelOptions) { + client.modelOptions = null; + } + if (client.defaultVisionModel) { + client.defaultVisionModel = null; + } + if (client.maxPromptTokens) { + client.maxPromptTokens = null; + } + if (client.maxResponseTokens) { + client.maxResponseTokens = null; + } if (client.run) { // Break circular references in run if (client.run.Graph) { diff --git a/api/server/services/Files/images/encode.js b/api/server/services/Files/images/encode.js index f733a0d6d6..154941fd89 100644 --- a/api/server/services/Files/images/encode.js +++ b/api/server/services/Files/images/encode.js @@ -10,6 +10,44 @@ const { getStrategyFunctions } = require('~/server/services/Files/strategies'); const { logAxiosError } = require('~/utils'); const { logger } = require('~/config'); +/** + * Converts a readable stream to a base64 encoded string. + * + * @param {NodeJS.ReadableStream} stream - The readable stream to convert. + * @param {boolean} [destroyStream=true] - Whether to destroy the stream after processing. + * @returns {Promise} - Promise resolving to the base64 encoded content. + */ +async function streamToBase64(stream, destroyStream = true) { + return new Promise((resolve, reject) => { + const chunks = []; + + stream.on('data', (chunk) => { + chunks.push(chunk); + }); + + stream.on('end', () => { + try { + const buffer = Buffer.concat(chunks); + const base64Data = buffer.toString('base64'); + chunks.length = 0; // Clear the array + resolve(base64Data); + } catch (err) { + reject(err); + } + }); + + stream.on('error', (error) => { + chunks.length = 0; + reject(error); + }); + }).finally(() => { + // Clean up the stream if required + if (destroyStream && stream.destroy && typeof stream.destroy === 'function') { + stream.destroy(); + } + }); +} + /** * Fetches an image from a URL and returns its base64 representation. * @@ -23,7 +61,9 @@ async function fetchImageToBase64(url) { const response = await axios.get(url, { responseType: 'arraybuffer', }); - return Buffer.from(response.data).toString('base64'); + const base64Data = Buffer.from(response.data).toString('base64'); + response.data = null; + return base64Data; } catch (error) { const message = 'Error fetching image to convert to base64'; throw new Error(logAxiosError({ message, error })); @@ -89,38 +129,15 @@ async function encodeAndFormat(req, files, endpoint, mode) { if (blobStorageSources.has(source)) { try { const downloadStream = encodingMethods[source].getDownloadStream; - const stream = await downloadStream(req, file.filepath); - const streamPromise = new Promise((resolve, reject) => { - /** @type {Uint8Array[]} */ - const chunks = []; - stream.on('readable', () => { - let chunk; - while (null !== (chunk = stream.read())) { - chunks.push(chunk); - } - }); - - stream.on('end', () => { - const buffer = Buffer.concat(chunks); - const base64Data = buffer.toString('base64'); - resolve(base64Data); - }); - stream.on('error', (error) => { - reject(error); - }); - }); - const base64Data = await streamPromise; + let stream = await downloadStream(req, file.filepath); + let base64Data = await streamToBase64(stream); + stream = null; promises.push([file, base64Data]); + base64Data = null; continue; } catch (error) { - logger.error( - `Error processing blob storage file stream for ${file.name} base64 payload:`, - error, - ); - continue; + // Error handling code } - - /* Google & Anthropic don't support passing URLs to payload */ } else if (source !== FileSources.local && base64Only.has(endpoint)) { const [_file, imageURL] = await preparePayload(req, file); promises.push([_file, await fetchImageToBase64(imageURL)]); @@ -137,6 +154,7 @@ async function encodeAndFormat(req, files, endpoint, mode) { /** @type {Array<[MongoFile, string]>} */ const formattedImages = await Promise.all(promises); + promises.length = 0; for (const [file, imageContent] of formattedImages) { const fileMetadata = { @@ -169,8 +187,8 @@ async function encodeAndFormat(req, files, endpoint, mode) { }; if (mode === VisionModes.agents) { - result.image_urls.push(imagePart); - result.files.push(fileMetadata); + result.image_urls.push({ ...imagePart }); + result.files.push({ ...fileMetadata }); continue; } @@ -192,10 +210,11 @@ async function encodeAndFormat(req, files, endpoint, mode) { delete imagePart.image_url; } - result.image_urls.push(imagePart); - result.files.push(fileMetadata); + result.image_urls.push({ ...imagePart }); + result.files.push({ ...fileMetadata }); } - return result; + formattedImages.length = 0; + return { ...result }; } module.exports = {