From 3c91f7b0b71b024fbd540802afc3c7fdf8a3d757 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Mon, 31 Mar 2025 18:40:06 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=80=20feat:=20Enhance=20S3=20URL=20Exp?= =?UTF-8?q?iry=20with=20Refresh;=20fix:=20S3=20File=20Deletion=20(#6647)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor: Improve error logging in image fetching to base64 conversion * fix: Add error handling for custom endpoint configuration retrieval * fix: Update audio stream processing to parse text parts from complex message content * chore: import order in streamAudio * fix: S3 file deletion and optimize file upload * feat: Implement S3 URL refresh mechanism and add cache for expiry check intervals * feat: Add S3 URL refresh functionality for agent avatars * chore: remove unnecessary console.log in MultiMessage component * chore: update version of librechat-data-provider to 0.7.77 --- api/cache/getLogStores.js | 5 + api/models/File.js | 23 ++ api/server/controllers/agents/client.js | 9 +- api/server/controllers/agents/v1.js | 12 +- api/server/routes/files/files.js | 18 +- .../services/Files/Audio/streamAudio.js | 13 +- api/server/services/Files/S3/crud.js | 300 ++++++++++++++++-- api/server/services/Files/images/encode.js | 5 +- .../components/Chat/Messages/MultiMessage.tsx | 7 +- package-lock.json | 2 +- packages/data-provider/package.json | 2 +- packages/data-provider/src/config.ts | 4 + 12 files changed, 358 insertions(+), 42 deletions(-) diff --git a/api/cache/getLogStores.js b/api/cache/getLogStores.js index 6592371f02..6d5ea15a7b 100644 --- a/api/cache/getLogStores.js +++ b/api/cache/getLogStores.js @@ -49,6 +49,10 @@ const genTitle = isRedisEnabled ? new Keyv({ store: keyvRedis, ttl: Time.TWO_MINUTES }) : new Keyv({ namespace: CacheKeys.GEN_TITLE, ttl: Time.TWO_MINUTES }); +const s3ExpiryInterval = isRedisEnabled + ? new Keyv({ store: keyvRedis, ttl: Time.THIRTY_MINUTES }) + : new Keyv({ namespace: CacheKeys.S3_EXPIRY_INTERVAL, ttl: Time.THIRTY_MINUTES }); + const modelQueries = isEnabled(process.env.USE_REDIS) ? new Keyv({ store: keyvRedis }) : new Keyv({ namespace: CacheKeys.MODEL_QUERIES }); @@ -89,6 +93,7 @@ const namespaces = { [CacheKeys.ABORT_KEYS]: abortKeys, [CacheKeys.TOKEN_CONFIG]: tokenConfig, [CacheKeys.GEN_TITLE]: genTitle, + [CacheKeys.S3_EXPIRY_INTERVAL]: s3ExpiryInterval, [CacheKeys.MODEL_QUERIES]: modelQueries, [CacheKeys.AUDIO_RUNS]: audioRuns, [CacheKeys.MESSAGES]: messages, diff --git a/api/models/File.js b/api/models/File.js index 0bde258a54..87c91003e2 100644 --- a/api/models/File.js +++ b/api/models/File.js @@ -134,6 +134,28 @@ const deleteFiles = async (file_ids, user) => { return await File.deleteMany(deleteQuery); }; +/** + * Batch updates files with new signed URLs in MongoDB + * + * @param {MongoFile[]} updates - Array of updates in the format { file_id, filepath } + * @returns {Promise} + */ +async function batchUpdateFiles(updates) { + if (!updates || updates.length === 0) { + return; + } + + const bulkOperations = updates.map((update) => ({ + updateOne: { + filter: { file_id: update.file_id }, + update: { $set: { filepath: update.filepath } }, + }, + })); + + const result = await File.bulkWrite(bulkOperations); + logger.info(`Updated ${result.modifiedCount} files with new S3 URLs`); +} + module.exports = { File, findFileById, @@ -145,4 +167,5 @@ module.exports = { deleteFile, deleteFiles, deleteFileByFilter, + batchUpdateFiles, }; diff --git a/api/server/controllers/agents/client.js b/api/server/controllers/agents/client.js index 0473ab8747..ee23ee1db6 100644 --- a/api/server/controllers/agents/client.js +++ b/api/server/controllers/agents/client.js @@ -932,7 +932,14 @@ class AgentClient extends BaseClient { }; let endpointConfig = this.options.req.app.locals[this.options.agent.endpoint]; if (!endpointConfig) { - endpointConfig = await getCustomEndpointConfig(this.options.agent.endpoint); + try { + endpointConfig = await getCustomEndpointConfig(this.options.agent.endpoint); + } catch (err) { + logger.error( + '[api/server/controllers/agents/client.js #titleConvo] Error getting custom endpoint config', + err, + ); + } } if ( endpointConfig && diff --git a/api/server/controllers/agents/v1.js b/api/server/controllers/agents/v1.js index 52e6ed2fc9..e0f27a13fc 100644 --- a/api/server/controllers/agents/v1.js +++ b/api/server/controllers/agents/v1.js @@ -4,6 +4,7 @@ const { Tools, Constants, FileContext, + FileSources, SystemRoles, EToolResources, actionDelimiter, @@ -17,9 +18,10 @@ const { } = require('~/models/Agent'); const { uploadImageBuffer, filterFile } = require('~/server/services/Files/process'); const { getStrategyFunctions } = require('~/server/services/Files/strategies'); +const { refreshS3Url } = require('~/server/services/Files/S3/crud'); const { updateAction, getActions } = require('~/models/Action'); -const { getProjectByName } = require('~/models/Project'); const { updateAgentProjects } = require('~/models/Agent'); +const { getProjectByName } = require('~/models/Project'); const { deleteFileByFilter } = require('~/models/File'); const { logger } = require('~/config'); @@ -102,6 +104,14 @@ const getAgentHandler = async (req, res) => { return res.status(404).json({ error: 'Agent not found' }); } + if (agent.avatar && agent.avatar?.source === FileSources.s3) { + const originalUrl = agent.avatar.filepath; + agent.avatar.filepath = await refreshS3Url(agent.avatar); + if (originalUrl !== agent.avatar.filepath) { + await updateAgent({ id }, { avatar: agent.avatar }); + } + } + agent.author = agent.author.toString(); agent.isCollaborative = !!agent.isCollaborative; diff --git a/api/server/routes/files/files.js b/api/server/routes/files/files.js index c371b8e28e..9040c2824c 100644 --- a/api/server/routes/files/files.js +++ b/api/server/routes/files/files.js @@ -2,7 +2,9 @@ const fs = require('fs').promises; const express = require('express'); const { EnvVar } = require('@librechat/agents'); const { + Time, isUUID, + CacheKeys, FileSources, EModelEndpoint, isAgentsEndpoint, @@ -17,8 +19,10 @@ const { const { getStrategyFunctions } = require('~/server/services/Files/strategies'); const { getOpenAIClient } = require('~/server/controllers/assistants/helpers'); const { loadAuthValues } = require('~/server/services/Tools/credentials'); +const { refreshS3FileUrls } = require('~/server/services/Files/S3/crud'); +const { getFiles, batchUpdateFiles } = require('~/models/File'); const { getAgent } = require('~/models/Agent'); -const { getFiles } = require('~/models/File'); +const { getLogStores } = require('~/cache'); const { logger } = require('~/config'); const router = express.Router(); @@ -26,6 +30,18 @@ const router = express.Router(); router.get('/', async (req, res) => { try { const files = await getFiles({ user: req.user.id }); + if (req.app.locals.fileStrategy === FileSources.s3) { + try { + const cache = getLogStores(CacheKeys.S3_EXPIRY_INTERVAL); + const alreadyChecked = await cache.get(req.user.id); + if (!alreadyChecked) { + await refreshS3FileUrls(files, batchUpdateFiles); + await cache.set(req.user.id, true, Time.THIRTY_MINUTES); + } + } catch (error) { + logger.warn('[/files] Error refreshing S3 file URLs:', error); + } + } res.status(200).send(files); } catch (error) { logger.error('[/files] Error getting files:', error); diff --git a/api/server/services/Files/Audio/streamAudio.js b/api/server/services/Files/Audio/streamAudio.js index ac046e68a6..a1d7c7a649 100644 --- a/api/server/services/Files/Audio/streamAudio.js +++ b/api/server/services/Files/Audio/streamAudio.js @@ -1,4 +1,10 @@ -const { CacheKeys, findLastSeparatorIndex, SEPARATORS, Time } = require('librechat-data-provider'); +const { + Time, + CacheKeys, + SEPARATORS, + parseTextParts, + findLastSeparatorIndex, +} = require('librechat-data-provider'); const { getMessage } = require('~/models/Message'); const { getLogStores } = require('~/cache'); @@ -84,10 +90,11 @@ function createChunkProcessor(user, messageId) { notFoundCount++; return []; } else { + const text = message.content?.length > 0 ? parseTextParts(message.content) : message.text; messageCache.set( messageId, { - text: message.text, + text, complete: true, }, Time.FIVE_MINUTES, @@ -95,7 +102,7 @@ function createChunkProcessor(user, messageId) { } const text = typeof message === 'string' ? message : message.text; - const complete = typeof message === 'string' ? false : message.complete ?? true; + const complete = typeof message === 'string' ? false : (message.complete ?? true); if (text === processedText) { noChangeCount++; diff --git a/api/server/services/Files/S3/crud.js b/api/server/services/Files/S3/crud.js index a9ddcf455e..8696645204 100644 --- a/api/server/services/Files/S3/crud.js +++ b/api/server/services/Files/S3/crud.js @@ -1,7 +1,13 @@ const fs = require('fs'); const path = require('path'); const fetch = require('node-fetch'); -const { PutObjectCommand, GetObjectCommand, DeleteObjectCommand } = require('@aws-sdk/client-s3'); +const { FileSources } = require('librechat-data-provider'); +const { + PutObjectCommand, + GetObjectCommand, + HeadObjectCommand, + DeleteObjectCommand, +} = require('@aws-sdk/client-s3'); const { getSignedUrl } = require('@aws-sdk/s3-request-presigner'); const { initializeS3 } = require('./initialize'); const { logger } = require('~/config'); @@ -9,6 +15,20 @@ const { logger } = require('~/config'); const bucketName = process.env.AWS_BUCKET_NAME; const defaultBasePath = 'images'; +let s3UrlExpirySeconds = 7 * 24 * 60 * 60; + +if (process.env.S3_URL_EXPIRY_SECONDS !== undefined) { + const parsed = parseInt(process.env.S3_URL_EXPIRY_SECONDS, 10); + + if (!isNaN(parsed) && parsed > 0) { + s3UrlExpirySeconds = Math.min(parsed, 7 * 24 * 60 * 60); + } else { + logger.warn( + `[S3] Invalid S3_URL_EXPIRY_SECONDS value: "${process.env.S3_URL_EXPIRY_SECONDS}". Using 7-day expiry.`, + ); + } +} + /** * Constructs the S3 key based on the base path, user ID, and file name. */ @@ -39,13 +59,14 @@ async function saveBufferToS3({ userId, buffer, fileName, basePath = defaultBase } /** - * Retrieves a signed URL for a file stored in S3. + * Retrieves a URL for a file stored in S3. + * Returns a signed URL with expiration time or a proxy URL based on config * * @param {Object} params * @param {string} params.userId - The user's unique identifier. * @param {string} params.fileName - The file name in S3. * @param {string} [params.basePath='images'] - The base path in the bucket. - * @returns {Promise} A signed URL valid for 24 hours. + * @returns {Promise} A URL to access the S3 object */ async function getS3URL({ userId, fileName, basePath = defaultBasePath }) { const key = getS3Key(basePath, userId, fileName); @@ -53,7 +74,7 @@ async function getS3URL({ userId, fileName, basePath = defaultBasePath }) { try { const s3 = initializeS3(); - return await getSignedUrl(s3, new GetObjectCommand(params), { expiresIn: 86400 }); + return await getSignedUrl(s3, new GetObjectCommand(params), { expiresIn: s3UrlExpirySeconds }); } catch (error) { logger.error('[getS3URL] Error getting signed URL from S3:', error.message); throw error; @@ -86,21 +107,51 @@ async function saveURLToS3({ userId, URL, fileName, basePath = defaultBasePath } * Deletes a file from S3. * * @param {Object} params - * @param {string} params.userId - The user's unique identifier. - * @param {string} params.fileName - The file name in S3. - * @param {string} [params.basePath='images'] - The base path in the bucket. + * @param {ServerRequest} params.req + * @param {MongoFile} params.file - The file object to delete. * @returns {Promise} */ -async function deleteFileFromS3({ userId, fileName, basePath = defaultBasePath }) { - const key = getS3Key(basePath, userId, fileName); +async function deleteFileFromS3(req, file) { + const key = extractKeyFromS3Url(file.filepath); const params = { Bucket: bucketName, Key: key }; + if (!key.includes(req.user.id)) { + const message = `[deleteFileFromS3] User ID mismatch: ${req.user.id} vs ${key}`; + logger.error(message); + throw new Error(message); + } try { const s3 = initializeS3(); - await s3.send(new DeleteObjectCommand(params)); - logger.debug('[deleteFileFromS3] File deleted successfully from S3'); + + try { + const headCommand = new HeadObjectCommand(params); + await s3.send(headCommand); + logger.debug('[deleteFileFromS3] File exists, proceeding with deletion'); + } catch (headErr) { + if (headErr.name === 'NotFound') { + logger.warn(`[deleteFileFromS3] File does not exist: ${key}`); + return; + } + } + + const deleteResult = await s3.send(new DeleteObjectCommand(params)); + logger.debug('[deleteFileFromS3] Delete command response:', JSON.stringify(deleteResult)); + try { + await s3.send(new HeadObjectCommand(params)); + logger.error('[deleteFileFromS3] File still exists after deletion!'); + } catch (verifyErr) { + if (verifyErr.name === 'NotFound') { + logger.debug(`[deleteFileFromS3] Verified file is deleted: ${key}`); + } else { + logger.error('[deleteFileFromS3] Error verifying deletion:', verifyErr); + } + } + + logger.debug('[deleteFileFromS3] S3 File deletion completed'); } catch (error) { - logger.error('[deleteFileFromS3] Error deleting file from S3:', error.message); + logger.error(`[deleteFileFromS3] Error deleting file from S3: ${error.message}`); + logger.error(error.stack); + // If the file is not found, we can safely return. if (error.code === 'NoSuchKey') { return; @@ -110,7 +161,7 @@ async function deleteFileFromS3({ userId, fileName, basePath = defaultBasePath } } /** - * Uploads a local file to S3. + * Uploads a local file to S3 by streaming it directly without loading into memory. * * @param {Object} params * @param {import('express').Request} params.req - The Express request (must include user). @@ -122,35 +173,62 @@ async function deleteFileFromS3({ userId, fileName, basePath = defaultBasePath } async function uploadFileToS3({ req, file, file_id, basePath = defaultBasePath }) { try { const inputFilePath = file.path; - const inputBuffer = await fs.promises.readFile(inputFilePath); - const bytes = Buffer.byteLength(inputBuffer); const userId = req.user.id; const fileName = `${file_id}__${path.basename(inputFilePath)}`; - const fileURL = await saveBufferToS3({ userId, buffer: inputBuffer, fileName, basePath }); - await fs.promises.unlink(inputFilePath); + const key = getS3Key(basePath, userId, fileName); + + const stats = await fs.promises.stat(inputFilePath); + const bytes = stats.size; + const fileStream = fs.createReadStream(inputFilePath); + + const s3 = initializeS3(); + const uploadParams = { + Bucket: bucketName, + Key: key, + Body: fileStream, + }; + + await s3.send(new PutObjectCommand(uploadParams)); + const fileURL = await getS3URL({ userId, fileName, basePath }); return { filepath: fileURL, bytes }; } catch (error) { - logger.error('[uploadFileToS3] Error uploading file to S3:', error.message); + logger.error('[uploadFileToS3] Error streaming file to S3:', error); + try { + if (file && file.path) { + await fs.promises.unlink(file.path); + } + } catch (unlinkError) { + logger.error( + '[uploadFileToS3] Error deleting temporary file, likely already deleted:', + unlinkError.message, + ); + } throw error; } } /** - * Extracts the S3 key from a full S3 URL. + * Extracts the S3 key from a URL or returns the key if already properly formatted * - * @param {string} s3Url - The full S3 URL + * @param {string} fileUrlOrKey - The file URL or key * @returns {string} The S3 key */ -function extractKeyFromS3Url(s3Url) { - try { - // Parse the URL - const url = new URL(s3Url); - // Extract the path from the URL, removing the leading slash - let key = url.pathname.substring(1); +function extractKeyFromS3Url(fileUrlOrKey) { + if (!fileUrlOrKey) { + throw new Error('Invalid input: URL or key is empty'); + } - return key; + try { + const url = new URL(fileUrlOrKey); + return url.pathname.substring(1); } catch (error) { - throw new Error(`Failed to extract key from S3 URL: ${error.message}`); + const parts = fileUrlOrKey.split('/'); + + if (parts.length >= 3 && !fileUrlOrKey.startsWith('http') && !fileUrlOrKey.startsWith('/')) { + return fileUrlOrKey; + } + + return fileUrlOrKey.startsWith('/') ? fileUrlOrKey.substring(1) : fileUrlOrKey; } } @@ -174,6 +252,170 @@ async function getS3FileStream(_req, filePath) { } } +/** + * Determines if a signed S3 URL is close to expiration + * + * @param {string} signedUrl - The signed S3 URL + * @param {number} bufferSeconds - Buffer time in seconds + * @returns {boolean} True if the URL needs refreshing + */ +function needsRefresh(signedUrl, bufferSeconds) { + try { + // Parse the URL + const url = new URL(signedUrl); + + // Check if it has the signature parameters that indicate it's a signed URL + // X-Amz-Signature is the most reliable indicator for AWS signed URLs + if (!url.searchParams.has('X-Amz-Signature')) { + // Not a signed URL, so no expiration to check (or it's already a proxy URL) + return false; + } + + // Extract the expiration time from the URL + const expiresParam = url.searchParams.get('X-Amz-Expires'); + const dateParam = url.searchParams.get('X-Amz-Date'); + + if (!expiresParam || !dateParam) { + // Missing expiration information, assume it needs refresh to be safe + return true; + } + + // Parse the AWS date format (YYYYMMDDTHHMMSSZ) + const year = dateParam.substring(0, 4); + const month = dateParam.substring(4, 6); + const day = dateParam.substring(6, 8); + const hour = dateParam.substring(9, 11); + const minute = dateParam.substring(11, 13); + const second = dateParam.substring(13, 15); + + const dateObj = new Date(`${year}-${month}-${day}T${hour}:${minute}:${second}Z`); + const expiresAtDate = new Date(dateObj.getTime() + parseInt(expiresParam) * 1000); + + // Check if it's close to expiration + const now = new Date(); + const bufferTime = new Date(now.getTime() + bufferSeconds * 1000); + + return expiresAtDate <= bufferTime; + } catch (error) { + logger.error('Error checking URL expiration:', error); + // If we can't determine, assume it needs refresh to be safe + return true; + } +} + +/** + * Refreshes S3 URLs for an array of files if they're expired or close to expiring + * + * @param {IMongoFile[]} files - Array of file documents + * @param {(files: MongoFile[]) => Promise} batchUpdateFiles - Function to update files in the database + * @param {number} [bufferSeconds=3600] - Buffer time in seconds to check for expiration + * @returns {Promise} The files with refreshed URLs if needed + */ +async function refreshS3FileUrls(files, batchUpdateFiles, bufferSeconds = 3600) { + if (!files || !Array.isArray(files) || files.length === 0) { + return files; + } + + const filesToUpdate = []; + + for (let i = 0; i < files.length; i++) { + const file = files[i]; + if (!file?.file_id) { + continue; + } + if (file.source !== FileSources.s3) { + continue; + } + if (!file.filepath) { + continue; + } + if (!needsRefresh(file.filepath, bufferSeconds)) { + continue; + } + try { + const s3Key = extractKeyFromS3Url(file.filepath); + if (!s3Key) { + continue; + } + const keyParts = s3Key.split('/'); + if (keyParts.length < 3) { + continue; + } + + const basePath = keyParts[0]; + const userId = keyParts[1]; + const fileName = keyParts.slice(2).join('/'); + + const newUrl = await getS3URL({ + userId, + fileName, + basePath, + }); + + filesToUpdate.push({ + file_id: file.file_id, + filepath: newUrl, + }); + files[i].filepath = newUrl; + } catch (error) { + logger.error(`Error refreshing S3 URL for file ${file.file_id}:`, error); + } + } + + if (filesToUpdate.length > 0) { + await batchUpdateFiles(filesToUpdate); + } + + return files; +} + +/** + * Refreshes a single S3 URL if it's expired or close to expiring + * + * @param {{ filepath: string, source: string }} fileObj - Simple file object containing filepath and source + * @param {number} [bufferSeconds=3600] - Buffer time in seconds to check for expiration + * @returns {Promise} The refreshed URL or the original URL if no refresh needed + */ +async function refreshS3Url(fileObj, bufferSeconds = 3600) { + if (!fileObj || fileObj.source !== FileSources.s3 || !fileObj.filepath) { + return fileObj?.filepath || ''; + } + + if (!needsRefresh(fileObj.filepath, bufferSeconds)) { + return fileObj.filepath; + } + + try { + const s3Key = extractKeyFromS3Url(fileObj.filepath); + if (!s3Key) { + logger.warn(`Unable to extract S3 key from URL: ${fileObj.filepath}`); + return fileObj.filepath; + } + + const keyParts = s3Key.split('/'); + if (keyParts.length < 3) { + logger.warn(`Invalid S3 key format: ${s3Key}`); + return fileObj.filepath; + } + + const basePath = keyParts[0]; + const userId = keyParts[1]; + const fileName = keyParts.slice(2).join('/'); + + const newUrl = await getS3URL({ + userId, + fileName, + basePath, + }); + + logger.debug(`Refreshed S3 URL for key: ${s3Key}`); + return newUrl; + } catch (error) { + logger.error(`Error refreshing S3 URL: ${error.message}`); + return fileObj.filepath; + } +} + module.exports = { saveBufferToS3, saveURLToS3, @@ -181,4 +423,6 @@ module.exports = { deleteFileFromS3, uploadFileToS3, getS3FileStream, + refreshS3FileUrls, + refreshS3Url, }; diff --git a/api/server/services/Files/images/encode.js b/api/server/services/Files/images/encode.js index 85d6513977..f733a0d6d6 100644 --- a/api/server/services/Files/images/encode.js +++ b/api/server/services/Files/images/encode.js @@ -7,6 +7,7 @@ const { EModelEndpoint, } = require('librechat-data-provider'); const { getStrategyFunctions } = require('~/server/services/Files/strategies'); +const { logAxiosError } = require('~/utils'); const { logger } = require('~/config'); /** @@ -24,8 +25,8 @@ async function fetchImageToBase64(url) { }); return Buffer.from(response.data).toString('base64'); } catch (error) { - logger.error('Error fetching image to convert to base64', error); - throw error; + const message = 'Error fetching image to convert to base64'; + throw new Error(logAxiosError({ message, error })); } } diff --git a/client/src/components/Chat/Messages/MultiMessage.tsx b/client/src/components/Chat/Messages/MultiMessage.tsx index 3cca4ff135..9050a6149f 100644 --- a/client/src/components/Chat/Messages/MultiMessage.tsx +++ b/client/src/components/Chat/Messages/MultiMessage.tsx @@ -3,11 +3,11 @@ import { useEffect, useCallback } from 'react'; import { isAssistantsEndpoint } from 'librechat-data-provider'; import type { TMessage } from 'librechat-data-provider'; import type { TMessageProps } from '~/common'; -// eslint-disable-next-line import/no-cycle + import MessageContent from '~/components/Messages/MessageContent'; -// eslint-disable-next-line import/no-cycle + import MessageParts from './MessageParts'; -// eslint-disable-next-line import/no-cycle + import Message from './Message'; import store from '~/store'; @@ -61,7 +61,6 @@ export default function MultiMessage({ /> ); } else if (message.content) { - console.log('message.id with content', message.messageId); return (