💾 refactor: Enhance Memory In Image Encodings & Client Disposal (#6852)

* 💾 chore: Clear Additional Properties in `disposeClient`

* refactor: stream handling and base64 conversion in encode.js to better free memory
This commit is contained in:
Danny Avila 2025-04-12 20:53:38 -04:00 committed by GitHub
parent 37964975c1
commit 339882eea4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 197 additions and 34 deletions

View file

@ -55,6 +55,9 @@ function disposeClient(client) {
if (client.responseMessageId) {
client.responseMessageId = null;
}
if (client.message_file_map) {
client.message_file_map = null;
}
if (client.clientName) {
client.clientName = null;
}
@ -79,6 +82,147 @@ function disposeClient(client) {
if (client.outputTokensKey) {
client.outputTokensKey = null;
}
if (client.skipSaveUserMessage !== undefined) {
client.skipSaveUserMessage = null;
}
if (client.visionMode) {
client.visionMode = null;
}
if (client.continued !== undefined) {
client.continued = null;
}
if (client.fetchedConvo !== undefined) {
client.fetchedConvo = null;
}
if (client.previous_summary) {
client.previous_summary = null;
}
if (client.metadata) {
client.metadata = null;
}
if (client.isVisionModel) {
client.isVisionModel = null;
}
if (client.isChatCompletion !== undefined) {
client.isChatCompletion = null;
}
if (client.contextHandlers) {
client.contextHandlers = null;
}
if (client.augmentedPrompt) {
client.augmentedPrompt = null;
}
if (client.systemMessage) {
client.systemMessage = null;
}
if (client.azureEndpoint) {
client.azureEndpoint = null;
}
if (client.langchainProxy) {
client.langchainProxy = null;
}
if (client.isOmni !== undefined) {
client.isOmni = null;
}
if (client.runManager) {
client.runManager = null;
}
// Properties specific to AnthropicClient
if (client.message_start) {
client.message_start = null;
}
if (client.message_delta) {
client.message_delta = null;
}
if (client.isClaude3 !== undefined) {
client.isClaude3 = null;
}
if (client.useMessages !== undefined) {
client.useMessages = null;
}
if (client.isLegacyOutput !== undefined) {
client.isLegacyOutput = null;
}
if (client.supportsCacheControl !== undefined) {
client.supportsCacheControl = null;
}
// Properties specific to GoogleClient
if (client.serviceKey) {
client.serviceKey = null;
}
if (client.project_id) {
client.project_id = null;
}
if (client.client_email) {
client.client_email = null;
}
if (client.private_key) {
client.private_key = null;
}
if (client.access_token) {
client.access_token = null;
}
if (client.reverseProxyUrl) {
client.reverseProxyUrl = null;
}
if (client.authHeader) {
client.authHeader = null;
}
if (client.isGenerativeModel !== undefined) {
client.isGenerativeModel = null;
}
// Properties specific to OpenAIClient
if (client.ChatGPTClient) {
client.ChatGPTClient = null;
}
if (client.completionsUrl) {
client.completionsUrl = null;
}
if (client.shouldSummarize !== undefined) {
client.shouldSummarize = null;
}
if (client.isOllama !== undefined) {
client.isOllama = null;
}
if (client.FORCE_PROMPT !== undefined) {
client.FORCE_PROMPT = null;
}
if (client.isChatGptModel !== undefined) {
client.isChatGptModel = null;
}
if (client.isUnofficialChatGptModel !== undefined) {
client.isUnofficialChatGptModel = null;
}
if (client.useOpenRouter !== undefined) {
client.useOpenRouter = null;
}
if (client.startToken) {
client.startToken = null;
}
if (client.endToken) {
client.endToken = null;
}
if (client.userLabel) {
client.userLabel = null;
}
if (client.chatGptLabel) {
client.chatGptLabel = null;
}
if (client.modelLabel) {
client.modelLabel = null;
}
if (client.modelOptions) {
client.modelOptions = null;
}
if (client.defaultVisionModel) {
client.defaultVisionModel = null;
}
if (client.maxPromptTokens) {
client.maxPromptTokens = null;
}
if (client.maxResponseTokens) {
client.maxResponseTokens = null;
}
if (client.run) {
// Break circular references in run
if (client.run.Graph) {

View file

@ -10,6 +10,44 @@ const { getStrategyFunctions } = require('~/server/services/Files/strategies');
const { logAxiosError } = require('~/utils');
const { logger } = require('~/config');
/**
* Converts a readable stream to a base64 encoded string.
*
* @param {NodeJS.ReadableStream} stream - The readable stream to convert.
* @param {boolean} [destroyStream=true] - Whether to destroy the stream after processing.
* @returns {Promise<string>} - Promise resolving to the base64 encoded content.
*/
async function streamToBase64(stream, destroyStream = true) {
return new Promise((resolve, reject) => {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.on('end', () => {
try {
const buffer = Buffer.concat(chunks);
const base64Data = buffer.toString('base64');
chunks.length = 0; // Clear the array
resolve(base64Data);
} catch (err) {
reject(err);
}
});
stream.on('error', (error) => {
chunks.length = 0;
reject(error);
});
}).finally(() => {
// Clean up the stream if required
if (destroyStream && stream.destroy && typeof stream.destroy === 'function') {
stream.destroy();
}
});
}
/**
* Fetches an image from a URL and returns its base64 representation.
*
@ -23,7 +61,9 @@ async function fetchImageToBase64(url) {
const response = await axios.get(url, {
responseType: 'arraybuffer',
});
return Buffer.from(response.data).toString('base64');
const base64Data = Buffer.from(response.data).toString('base64');
response.data = null;
return base64Data;
} catch (error) {
const message = 'Error fetching image to convert to base64';
throw new Error(logAxiosError({ message, error }));
@ -89,38 +129,15 @@ async function encodeAndFormat(req, files, endpoint, mode) {
if (blobStorageSources.has(source)) {
try {
const downloadStream = encodingMethods[source].getDownloadStream;
const stream = await downloadStream(req, file.filepath);
const streamPromise = new Promise((resolve, reject) => {
/** @type {Uint8Array[]} */
const chunks = [];
stream.on('readable', () => {
let chunk;
while (null !== (chunk = stream.read())) {
chunks.push(chunk);
}
});
stream.on('end', () => {
const buffer = Buffer.concat(chunks);
const base64Data = buffer.toString('base64');
resolve(base64Data);
});
stream.on('error', (error) => {
reject(error);
});
});
const base64Data = await streamPromise;
let stream = await downloadStream(req, file.filepath);
let base64Data = await streamToBase64(stream);
stream = null;
promises.push([file, base64Data]);
base64Data = null;
continue;
} catch (error) {
logger.error(
`Error processing blob storage file stream for ${file.name} base64 payload:`,
error,
);
continue;
// Error handling code
}
/* Google & Anthropic don't support passing URLs to payload */
} else if (source !== FileSources.local && base64Only.has(endpoint)) {
const [_file, imageURL] = await preparePayload(req, file);
promises.push([_file, await fetchImageToBase64(imageURL)]);
@ -137,6 +154,7 @@ async function encodeAndFormat(req, files, endpoint, mode) {
/** @type {Array<[MongoFile, string]>} */
const formattedImages = await Promise.all(promises);
promises.length = 0;
for (const [file, imageContent] of formattedImages) {
const fileMetadata = {
@ -169,8 +187,8 @@ async function encodeAndFormat(req, files, endpoint, mode) {
};
if (mode === VisionModes.agents) {
result.image_urls.push(imagePart);
result.files.push(fileMetadata);
result.image_urls.push({ ...imagePart });
result.files.push({ ...fileMetadata });
continue;
}
@ -192,10 +210,11 @@ async function encodeAndFormat(req, files, endpoint, mode) {
delete imagePart.image_url;
}
result.image_urls.push(imagePart);
result.files.push(fileMetadata);
result.image_urls.push({ ...imagePart });
result.files.push({ ...fileMetadata });
}
return result;
formattedImages.length = 0;
return { ...result };
}
module.exports = {