🔧 fix: S3 Download Stream with Key Extraction and Blob Storage Encoding for Vision (#6557)

Danny Avila 2025-03-26 15:04:01 -04:00 committed by GitHub
parent 299cabd6ed
commit ea2cbc55a7
3 changed files with 79 additions and 9 deletions


@@ -135,20 +135,41 @@ async function uploadFileToS3({ req, file, file_id, basePath = defaultBasePath }
}
}
/**
* Extracts the S3 key from a full S3 URL.
*
* @param {string} s3Url - The full S3 URL
* @returns {string} The S3 key
*/
function extractKeyFromS3Url(s3Url) {
try {
// Parse the URL
const url = new URL(s3Url);
// Extract the path from the URL, removing the leading slash
let key = url.pathname.substring(1);
return key;
} catch (error) {
throw new Error(`Failed to extract key from S3 URL: ${error.message}`);
}
}
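For illustration (the bucket and object path below are made-up values, not from the commit), the helper simply returns `URL.pathname` minus the leading slash, which assumes virtual-hosted-style S3 URLs where the bucket name lives in the hostname:

```js
// Hypothetical values for illustration only.
extractKeyFromS3Url('https://my-bucket.s3.us-east-1.amazonaws.com/images/abc123/photo.png');
// => 'images/abc123/photo.png'
```

With a path-style URL (`https://s3.us-east-1.amazonaws.com/my-bucket/...`) the bucket name would end up inside the returned key, so the stored `filepath` values are assumed to be virtual-hosted-style.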
/**
* Retrieves a readable stream for a file stored in S3.
*
* @param {ServerRequest} req - Server request object.
* @param {string} filePath - The full S3 URL of the file; the object key is extracted from it before download.
* @returns {Promise<NodeJS.ReadableStream>}
*/
-async function getS3FileStream(filePath) {
-const params = { Bucket: bucketName, Key: filePath };
+async function getS3FileStream(_req, filePath) {
try {
const Key = extractKeyFromS3Url(filePath);
const params = { Bucket: bucketName, Key };
const s3 = initializeS3();
const data = await s3.send(new GetObjectCommand(params));
return data.Body; // Returns a Node.js ReadableStream.
} catch (error) {
-logger.error('[getS3FileStream] Error retrieving S3 file stream:', error.message);
+logger.error('[getS3FileStream] Error retrieving S3 file stream:', error);
throw error;
}
}
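As a rough usage sketch of the new signature (the helper name and the idea of writing to disk are assumptions for illustration; only the `getS3FileStream(req, file.filepath)` call mirrors how the encoding changes below invoke it):

```js
const fs = require('fs');
const { pipeline } = require('stream/promises');

// Illustrative helper: stream an S3-hosted file to disk.
// `file.filepath` is assumed to hold the full S3 URL; the request object is
// accepted for signature parity but unused by the S3 strategy (note `_req`).
async function saveS3FileLocally(req, file, destination) {
  const stream = await getS3FileStream(req, file.filepath);
  // pipeline handles backpressure and closes the write stream on error.
  await pipeline(stream, fs.createWriteStream(destination));
}
```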


@@ -37,17 +37,21 @@ const base64Only = new Set([
EModelEndpoint.bedrock,
]);
const blobStorageSources = new Set([FileSources.azure, FileSources.s3]);
/**
* Encodes and formats the given files.
* @param {Express.Request} req - The request object.
* @param {Array<MongoFile>} files - The array of files to encode and format.
* @param {EModelEndpoint} [endpoint] - Optional: The endpoint for the image.
* @param {string} [mode] - Optional: The endpoint mode for the image.
-* @returns {Promise<Object>} - A promise that resolves to the result object containing the encoded images and file details.
+* @returns {Promise<{ text: string; files: MongoFile[]; image_urls: MessageContentImageUrl[] }>} - A promise that resolves to the result object containing the encoded images and file details.
*/
async function encodeAndFormat(req, files, endpoint, mode) {
const promises = [];
/** @type {Record<FileSources, Pick<ReturnType<typeof getStrategyFunctions>, 'prepareImagePayload' | 'getDownloadStream'>>} */
const encodingMethods = {};
/** @type {{ text: string; files: MongoFile[]; image_urls: MessageContentImageUrl[] }} */
const result = {
text: '',
files: [],
@@ -59,6 +63,7 @@ async function encodeAndFormat(req, files, endpoint, mode) {
}
for (let file of files) {
/** @type {FileSources} */
const source = file.source ?? FileSources.local;
if (source === FileSources.text && file.text) {
result.text += `${!result.text ? 'Attached document(s):\n```md' : '\n\n---\n\n'}# "${file.filename}"\n${file.text}\n`;
@@ -70,18 +75,51 @@ async function encodeAndFormat(req, files, endpoint, mode) {
}
if (!encodingMethods[source]) {
-const { prepareImagePayload } = getStrategyFunctions(source);
+const { prepareImagePayload, getDownloadStream } = getStrategyFunctions(source);
if (!prepareImagePayload) {
throw new Error(`Encoding function not implemented for ${source}`);
}
-encodingMethods[source] = prepareImagePayload;
+encodingMethods[source] = { prepareImagePayload, getDownloadStream };
}
-const preparePayload = encodingMethods[source];
+const preparePayload = encodingMethods[source].prepareImagePayload;
/* We need to fetch the image and convert it to base64 if we are using S3/Azure Blob storage. */
if (blobStorageSources.has(source)) {
try {
const downloadStream = encodingMethods[source].getDownloadStream;
const stream = await downloadStream(req, file.filepath);
const streamPromise = new Promise((resolve, reject) => {
/** @type {Uint8Array[]} */
const chunks = [];
stream.on('readable', () => {
let chunk;
while (null !== (chunk = stream.read())) {
chunks.push(chunk);
}
});
stream.on('end', () => {
const buffer = Buffer.concat(chunks);
const base64Data = buffer.toString('base64');
resolve(base64Data);
});
stream.on('error', (error) => {
reject(error);
});
});
const base64Data = await streamPromise;
promises.push([file, base64Data]);
} catch (error) {
logger.error(
`Error processing blob storage file stream for ${file.filename} base64 payload:`,
error,
);
continue;
}
/* Google & Anthropic don't support passing URLs to payload */
-if (source !== FileSources.local && base64Only.has(endpoint)) {
+} else if (source !== FileSources.local && base64Only.has(endpoint)) {
const [_file, imageURL] = await preparePayload(req, file);
promises.push([_file, await fetchImageToBase64(imageURL)]);
continue;

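The blob-storage branch above buffers the download stream before base64-encoding it. A standalone sketch of that same pattern (the helper name is illustrative; it uses the 'data' event rather than the 'readable'/read() loop from the diff, which is equivalent here):

```js
/**
 * Collects a readable stream into a base64 string.
 * @param {NodeJS.ReadableStream} stream
 * @returns {Promise<string>}
 */
function streamToBase64(stream) {
  return new Promise((resolve, reject) => {
    /** @type {Uint8Array[]} */
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('end', () => resolve(Buffer.concat(chunks).toString('base64')));
    stream.on('error', reject);
  });
}
```

Each resulting string is queued on `promises` as a `[file, base64Data]` pair, matching the tuples produced by `prepareImagePayload` for the other sources.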

@@ -403,6 +403,12 @@
* @memberof typedefs
*/
/**
* @exports MessageContentImageUrl
* @typedef {import('librechat-data-provider').Agents.MessageContentImageUrl} MessageContentImageUrl
* @memberof typedefs
*/
/** Prompts */
/**
* @exports TPrompt
@@ -759,6 +765,11 @@
* @typedef {import('mongoose').Schema} MongooseSchema
* @memberof typedefs
*/
/**
* @exports MongoFile
* @typedef {import('@librechat/data-schemas').IMongoFile} MongoFile
* @memberof typedefs
*/
/**
* @exports ObjectId