🚀 feat: Enhance S3 URL Expiry with Refresh; fix: S3 File Deletion (#6647)

* refactor: Improve error logging when fetching images for base64 conversion

* fix: Add error handling for custom endpoint configuration retrieval

* fix: Update audio stream processing to parse text parts from complex message content

* chore: import order in streamAudio

* fix: S3 file deletion and optimize file upload

* feat: Implement S3 URL refresh mechanism and add cache for expiry check intervals

* feat: Add S3 URL refresh functionality for agent avatars

* chore: remove unnecessary console.log in MultiMessage component

* chore: update version of librechat-data-provider to 0.7.77
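
At a high level, the refresh mechanism works as sketched below. This is a simplified composite of the diffs that follow, using names from this commit; the helper `maybeRefreshS3Urls` is hypothetical and only illustrates how the pieces fit together.

```js
// Simplified composite of the changes below (not the literal route code).
const { Time, CacheKeys } = require('librechat-data-provider');
const { refreshS3FileUrls } = require('~/server/services/Files/S3/crud');
const { batchUpdateFiles } = require('~/models/File');
const { getLogStores } = require('~/cache');

const cache = getLogStores(CacheKeys.S3_EXPIRY_INTERVAL);

/** Refresh near-expiry S3 URLs at most once per user per 30 minutes. */
async function maybeRefreshS3Urls(userId, files) {
  if (await cache.get(userId)) {
    return files; // checked recently; skip the S3 round-trips
  }
  // Re-signs any URL expiring within the buffer window, then persists
  // the new `filepath` values in one MongoDB bulkWrite.
  await refreshS3FileUrls(files, batchUpdateFiles);
  await cache.set(userId, true, Time.THIRTY_MINUTES);
  return files;
}
```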
Danny Avila 2025-03-31 18:40:06 -04:00 committed by GitHub
parent bc039cea29
commit 3c91f7b0b7
12 changed files with 358 additions and 42 deletions


@@ -49,6 +49,10 @@ const genTitle = isRedisEnabled
? new Keyv({ store: keyvRedis, ttl: Time.TWO_MINUTES })
: new Keyv({ namespace: CacheKeys.GEN_TITLE, ttl: Time.TWO_MINUTES });
const s3ExpiryInterval = isRedisEnabled
? new Keyv({ store: keyvRedis, ttl: Time.THIRTY_MINUTES })
: new Keyv({ namespace: CacheKeys.S3_EXPIRY_INTERVAL, ttl: Time.THIRTY_MINUTES });
const modelQueries = isEnabled(process.env.USE_REDIS)
? new Keyv({ store: keyvRedis })
: new Keyv({ namespace: CacheKeys.MODEL_QUERIES });
@@ -89,6 +93,7 @@ const namespaces = {
[CacheKeys.ABORT_KEYS]: abortKeys,
[CacheKeys.TOKEN_CONFIG]: tokenConfig,
[CacheKeys.GEN_TITLE]: genTitle,
[CacheKeys.S3_EXPIRY_INTERVAL]: s3ExpiryInterval,
[CacheKeys.MODEL_QUERIES]: modelQueries,
[CacheKeys.AUDIO_RUNS]: audioRuns,
[CacheKeys.MESSAGES]: messages,


@@ -134,6 +134,28 @@ const deleteFiles = async (file_ids, user) => {
return await File.deleteMany(deleteQuery);
};
/**
* Batch updates files with new signed URLs in MongoDB
*
* @param {MongoFile[]} updates - Array of updates in the format { file_id, filepath }
* @returns {Promise<void>}
*/
async function batchUpdateFiles(updates) {
if (!updates || updates.length === 0) {
return;
}
const bulkOperations = updates.map((update) => ({
updateOne: {
filter: { file_id: update.file_id },
update: { $set: { filepath: update.filepath } },
},
}));
const result = await File.bulkWrite(bulkOperations);
logger.info(`Updated ${result.modifiedCount} files with new S3 URLs`);
}
module.exports = {
File,
findFileById,
@@ -145,4 +167,5 @@ module.exports = {
deleteFile,
deleteFiles,
deleteFileByFilter,
batchUpdateFiles,
};
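
For reference, `bulkWrite` executes all of the mapped `updateOne` operations in a single database round-trip. A hypothetical call (IDs and URLs invented) looks like:

```js
// Hypothetical input: one entry per file whose signed URL was re-issued.
await batchUpdateFiles([
  { file_id: 'a1b2c3', filepath: 'https://bucket.s3.amazonaws.com/images/u1/a1b2c3__doc.pdf?X-Amz-Signature=...' },
  { file_id: 'd4e5f6', filepath: 'https://bucket.s3.amazonaws.com/images/u1/d4e5f6__img.png?X-Amz-Signature=...' },
]);
// => logs "Updated 2 files with new S3 URLs" (modifiedCount from bulkWrite)
```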


@@ -932,7 +932,14 @@ class AgentClient extends BaseClient {
};
let endpointConfig = this.options.req.app.locals[this.options.agent.endpoint];
if (!endpointConfig) {
endpointConfig = await getCustomEndpointConfig(this.options.agent.endpoint);
try {
endpointConfig = await getCustomEndpointConfig(this.options.agent.endpoint);
} catch (err) {
logger.error(
'[api/server/controllers/agents/client.js #titleConvo] Error getting custom endpoint config',
err,
);
}
}
if (
endpointConfig &&


@@ -4,6 +4,7 @@ const {
Tools,
Constants,
FileContext,
FileSources,
SystemRoles,
EToolResources,
actionDelimiter,
@@ -17,9 +18,10 @@
} = require('~/models/Agent');
const { uploadImageBuffer, filterFile } = require('~/server/services/Files/process');
const { getStrategyFunctions } = require('~/server/services/Files/strategies');
const { refreshS3Url } = require('~/server/services/Files/S3/crud');
const { updateAction, getActions } = require('~/models/Action');
const { getProjectByName } = require('~/models/Project');
const { updateAgentProjects } = require('~/models/Agent');
const { getProjectByName } = require('~/models/Project');
const { deleteFileByFilter } = require('~/models/File');
const { logger } = require('~/config');
@@ -102,6 +104,14 @@ const getAgentHandler = async (req, res) => {
return res.status(404).json({ error: 'Agent not found' });
}
if (agent.avatar && agent.avatar?.source === FileSources.s3) {
const originalUrl = agent.avatar.filepath;
agent.avatar.filepath = await refreshS3Url(agent.avatar);
if (originalUrl !== agent.avatar.filepath) {
await updateAgent({ id }, { avatar: agent.avatar });
}
}
agent.author = agent.author.toString();
agent.isCollaborative = !!agent.isCollaborative;
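
Since refreshS3Url returns the original filepath when the URL is still fresh, the updateAgent write only happens when a new signed URL was actually issued.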


@@ -2,7 +2,9 @@ const fs = require('fs').promises;
const express = require('express');
const { EnvVar } = require('@librechat/agents');
const {
Time,
isUUID,
CacheKeys,
FileSources,
EModelEndpoint,
isAgentsEndpoint,
@@ -17,8 +19,10 @@ const {
const { getStrategyFunctions } = require('~/server/services/Files/strategies');
const { getOpenAIClient } = require('~/server/controllers/assistants/helpers');
const { loadAuthValues } = require('~/server/services/Tools/credentials');
const { refreshS3FileUrls } = require('~/server/services/Files/S3/crud');
const { getFiles, batchUpdateFiles } = require('~/models/File');
const { getAgent } = require('~/models/Agent');
const { getFiles } = require('~/models/File');
const { getLogStores } = require('~/cache');
const { logger } = require('~/config');
const router = express.Router();
@@ -26,6 +30,18 @@ const router = express.Router();
router.get('/', async (req, res) => {
try {
const files = await getFiles({ user: req.user.id });
if (req.app.locals.fileStrategy === FileSources.s3) {
try {
const cache = getLogStores(CacheKeys.S3_EXPIRY_INTERVAL);
const alreadyChecked = await cache.get(req.user.id);
if (!alreadyChecked) {
await refreshS3FileUrls(files, batchUpdateFiles);
await cache.set(req.user.id, true, Time.THIRTY_MINUTES);
}
} catch (error) {
logger.warn('[/files] Error refreshing S3 file URLs:', error);
}
}
res.status(200).send(files);
} catch (error) {
logger.error('[/files] Error getting files:', error);
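
Note that the 30-minute window is enforced twice: the `s3ExpiryInterval` Keyv store is constructed with `ttl: Time.THIRTY_MINUTES`, and the `cache.set` call passes the same TTL explicitly, so the per-user "already checked" flag expires on schedule through either path.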


@@ -1,4 +1,10 @@
const { CacheKeys, findLastSeparatorIndex, SEPARATORS, Time } = require('librechat-data-provider');
const {
Time,
CacheKeys,
SEPARATORS,
parseTextParts,
findLastSeparatorIndex,
} = require('librechat-data-provider');
const { getMessage } = require('~/models/Message');
const { getLogStores } = require('~/cache');
@@ -84,10 +90,11 @@ function createChunkProcessor(user, messageId) {
notFoundCount++;
return [];
} else {
const text = message.content?.length > 0 ? parseTextParts(message.content) : message.text;
messageCache.set(
messageId,
{
text: message.text,
text,
complete: true,
},
Time.FIVE_MINUTES,
@@ -95,7 +102,7 @@
}
const text = typeof message === 'string' ? message : message.text;
const complete = typeof message === 'string' ? false : message.complete ?? true;
const complete = typeof message === 'string' ? false : (message.complete ?? true);
if (text === processedText) {
noChangeCount++;
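
The practical effect: messages whose `content` is an array of typed parts (rather than a flat `text` string) can now feed the audio stream. A rough illustration, with an assumed, simplified part shape:

```js
// Part shape below is assumed for illustration; the real content parts
// come from librechat-data-provider.
const message = {
  text: '',
  content: [
    { type: 'text', text: 'Here is the summary you asked for.' },
    { type: 'tool_call', tool_call: {} },
  ],
};

// Before: the cache stored message.text, which is empty for complex content.
// After: parseTextParts(message.content) extracts just the text parts.
const text = message.content?.length > 0 ? parseTextParts(message.content) : message.text;
// text === 'Here is the summary you asked for.' (assuming the shape above)
```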


@@ -1,7 +1,13 @@
const fs = require('fs');
const path = require('path');
const fetch = require('node-fetch');
const { PutObjectCommand, GetObjectCommand, DeleteObjectCommand } = require('@aws-sdk/client-s3');
const { FileSources } = require('librechat-data-provider');
const {
PutObjectCommand,
GetObjectCommand,
HeadObjectCommand,
DeleteObjectCommand,
} = require('@aws-sdk/client-s3');
const { getSignedUrl } = require('@aws-sdk/s3-request-presigner');
const { initializeS3 } = require('./initialize');
const { logger } = require('~/config');
@@ -9,6 +15,20 @@ const { logger } = require('~/config');
const bucketName = process.env.AWS_BUCKET_NAME;
const defaultBasePath = 'images';
let s3UrlExpirySeconds = 7 * 24 * 60 * 60;
if (process.env.S3_URL_EXPIRY_SECONDS !== undefined) {
const parsed = parseInt(process.env.S3_URL_EXPIRY_SECONDS, 10);
if (!isNaN(parsed) && parsed > 0) {
s3UrlExpirySeconds = Math.min(parsed, 7 * 24 * 60 * 60);
} else {
logger.warn(
`[S3] Invalid S3_URL_EXPIRY_SECONDS value: "${process.env.S3_URL_EXPIRY_SECONDS}". Using 7-day expiry.`,
);
}
}
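
Concretely, since SigV4 presigned URLs cannot be valid for longer than 7 days (604,800 seconds), larger configured values are clamped rather than passed through (values below are examples):

```js
// S3_URL_EXPIRY_SECONDS=86400    -> 86400 (1 day)
// S3_URL_EXPIRY_SECONDS=31536000 -> 604800 (clamped to the SigV4 7-day maximum)
// S3_URL_EXPIRY_SECONDS=abc      -> 604800 (warning logged, 7-day default kept)
// unset                          -> 604800 (7-day default)
```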
/**
* Constructs the S3 key based on the base path, user ID, and file name.
*/
@@ -39,13 +59,14 @@ async function saveBufferToS3({ userId, buffer, fileName, basePath = defaultBase
}
/**
* Retrieves a signed URL for a file stored in S3.
* Retrieves a URL for a file stored in S3.
* Returns a signed URL with expiration time or a proxy URL based on config
*
* @param {Object} params
* @param {string} params.userId - The user's unique identifier.
* @param {string} params.fileName - The file name in S3.
* @param {string} [params.basePath='images'] - The base path in the bucket.
* @returns {Promise<string>} A signed URL valid for 24 hours.
* @returns {Promise<string>} A URL to access the S3 object
*/
async function getS3URL({ userId, fileName, basePath = defaultBasePath }) {
const key = getS3Key(basePath, userId, fileName);
@@ -53,7 +74,7 @@
try {
const s3 = initializeS3();
return await getSignedUrl(s3, new GetObjectCommand(params), { expiresIn: 86400 });
return await getSignedUrl(s3, new GetObjectCommand(params), { expiresIn: s3UrlExpirySeconds });
} catch (error) {
logger.error('[getS3URL] Error getting signed URL from S3:', error.message);
throw error;
@@ -86,21 +107,51 @@ async function saveURLToS3({ userId, URL, fileName, basePath = defaultBasePath }
* Deletes a file from S3.
*
* @param {Object} params
* @param {string} params.userId - The user's unique identifier.
* @param {string} params.fileName - The file name in S3.
* @param {string} [params.basePath='images'] - The base path in the bucket.
* @param {ServerRequest} params.req
* @param {MongoFile} params.file - The file object to delete.
* @returns {Promise<void>}
*/
async function deleteFileFromS3({ userId, fileName, basePath = defaultBasePath }) {
const key = getS3Key(basePath, userId, fileName);
async function deleteFileFromS3(req, file) {
const key = extractKeyFromS3Url(file.filepath);
const params = { Bucket: bucketName, Key: key };
if (!key.includes(req.user.id)) {
const message = `[deleteFileFromS3] User ID mismatch: ${req.user.id} vs ${key}`;
logger.error(message);
throw new Error(message);
}
try {
const s3 = initializeS3();
await s3.send(new DeleteObjectCommand(params));
logger.debug('[deleteFileFromS3] File deleted successfully from S3');
try {
const headCommand = new HeadObjectCommand(params);
await s3.send(headCommand);
logger.debug('[deleteFileFromS3] File exists, proceeding with deletion');
} catch (headErr) {
if (headErr.name === 'NotFound') {
logger.warn(`[deleteFileFromS3] File does not exist: ${key}`);
return;
}
}
const deleteResult = await s3.send(new DeleteObjectCommand(params));
logger.debug('[deleteFileFromS3] Delete command response:', JSON.stringify(deleteResult));
try {
await s3.send(new HeadObjectCommand(params));
logger.error('[deleteFileFromS3] File still exists after deletion!');
} catch (verifyErr) {
if (verifyErr.name === 'NotFound') {
logger.debug(`[deleteFileFromS3] Verified file is deleted: ${key}`);
} else {
logger.error('[deleteFileFromS3] Error verifying deletion:', verifyErr);
}
}
logger.debug('[deleteFileFromS3] S3 File deletion completed');
} catch (error) {
logger.error('[deleteFileFromS3] Error deleting file from S3:', error.message);
logger.error(`[deleteFileFromS3] Error deleting file from S3: ${error.message}`);
logger.error(error.stack);
// If the file is not found, we can safely return.
if (error.code === 'NoSuchKey') {
return;
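
Context for the extra round-trips: S3's DeleteObject reports success whether or not the key exists, so the HeadObject pre-check and the post-delete verification are there to surface missing or lingering objects in the logs rather than failing silently.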
@@ -110,7 +161,7 @@ async function deleteFileFromS3({ userId, fileName, basePath = defaultBasePath }
}
/**
* Uploads a local file to S3.
* Uploads a local file to S3 by streaming it directly without loading into memory.
*
* @param {Object} params
* @param {import('express').Request} params.req - The Express request (must include user).
@@ -122,35 +173,62 @@ async function deleteFileFromS3({ userId, fileName, basePath = defaultBasePath }
async function uploadFileToS3({ req, file, file_id, basePath = defaultBasePath }) {
try {
const inputFilePath = file.path;
const inputBuffer = await fs.promises.readFile(inputFilePath);
const bytes = Buffer.byteLength(inputBuffer);
const userId = req.user.id;
const fileName = `${file_id}__${path.basename(inputFilePath)}`;
const fileURL = await saveBufferToS3({ userId, buffer: inputBuffer, fileName, basePath });
await fs.promises.unlink(inputFilePath);
const key = getS3Key(basePath, userId, fileName);
const stats = await fs.promises.stat(inputFilePath);
const bytes = stats.size;
const fileStream = fs.createReadStream(inputFilePath);
const s3 = initializeS3();
const uploadParams = {
Bucket: bucketName,
Key: key,
Body: fileStream,
};
await s3.send(new PutObjectCommand(uploadParams));
const fileURL = await getS3URL({ userId, fileName, basePath });
return { filepath: fileURL, bytes };
} catch (error) {
logger.error('[uploadFileToS3] Error uploading file to S3:', error.message);
logger.error('[uploadFileToS3] Error streaming file to S3:', error);
try {
if (file && file.path) {
await fs.promises.unlink(file.path);
}
} catch (unlinkError) {
logger.error(
'[uploadFileToS3] Error deleting temporary file, likely already deleted:',
unlinkError.message,
);
}
throw error;
}
}
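
The design change here: fs.promises.stat supplies the byte count for the response metadata while fs.createReadStream hands the SDK a stream instead of a fully buffered file, keeping memory flat for large uploads; the temporary file is now also cleaned up on the failure path.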
/**
* Extracts the S3 key from a full S3 URL.
* Extracts the S3 key from a URL or returns the key if already properly formatted
*
* @param {string} s3Url - The full S3 URL
* @param {string} fileUrlOrKey - The file URL or key
* @returns {string} The S3 key
*/
function extractKeyFromS3Url(s3Url) {
try {
// Parse the URL
const url = new URL(s3Url);
// Extract the path from the URL, removing the leading slash
let key = url.pathname.substring(1);
function extractKeyFromS3Url(fileUrlOrKey) {
if (!fileUrlOrKey) {
throw new Error('Invalid input: URL or key is empty');
}
return key;
try {
const url = new URL(fileUrlOrKey);
return url.pathname.substring(1);
} catch (error) {
throw new Error(`Failed to extract key from S3 URL: ${error.message}`);
const parts = fileUrlOrKey.split('/');
if (parts.length >= 3 && !fileUrlOrKey.startsWith('http') && !fileUrlOrKey.startsWith('/')) {
return fileUrlOrKey;
}
return fileUrlOrKey.startsWith('/') ? fileUrlOrKey.substring(1) : fileUrlOrKey;
}
}
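
For example (bucket and IDs invented):

```js
extractKeyFromS3Url('https://my-bucket.s3.amazonaws.com/images/user-1/abc__file.png?X-Amz-Signature=...');
// => 'images/user-1/abc__file.png' (pathname of the URL, minus the leading slash)

extractKeyFromS3Url('images/user-1/abc__file.png'); // already a key
// => 'images/user-1/abc__file.png'

extractKeyFromS3Url('/images/user-1/abc__file.png');
// => 'images/user-1/abc__file.png' (leading slash stripped)
```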
@@ -174,6 +252,170 @@ async function getS3FileStream(_req, filePath) {
}
}
/**
* Determines if a signed S3 URL is close to expiration
*
* @param {string} signedUrl - The signed S3 URL
* @param {number} bufferSeconds - Buffer time in seconds
* @returns {boolean} True if the URL needs refreshing
*/
function needsRefresh(signedUrl, bufferSeconds) {
try {
// Parse the URL
const url = new URL(signedUrl);
// Check if it has the signature parameters that indicate it's a signed URL
// X-Amz-Signature is the most reliable indicator for AWS signed URLs
if (!url.searchParams.has('X-Amz-Signature')) {
// Not a signed URL, so no expiration to check (or it's already a proxy URL)
return false;
}
// Extract the expiration time from the URL
const expiresParam = url.searchParams.get('X-Amz-Expires');
const dateParam = url.searchParams.get('X-Amz-Date');
if (!expiresParam || !dateParam) {
// Missing expiration information, assume it needs refresh to be safe
return true;
}
// Parse the AWS date format (YYYYMMDDTHHMMSSZ)
const year = dateParam.substring(0, 4);
const month = dateParam.substring(4, 6);
const day = dateParam.substring(6, 8);
const hour = dateParam.substring(9, 11);
const minute = dateParam.substring(11, 13);
const second = dateParam.substring(13, 15);
const dateObj = new Date(`${year}-${month}-${day}T${hour}:${minute}:${second}Z`);
const expiresAtDate = new Date(dateObj.getTime() + parseInt(expiresParam) * 1000);
// Check if it's close to expiration
const now = new Date();
const bufferTime = new Date(now.getTime() + bufferSeconds * 1000);
return expiresAtDate <= bufferTime;
} catch (error) {
logger.error('Error checking URL expiration:', error);
// If we can't determine, assume it needs refresh to be safe
return true;
}
}
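
A worked example of the expiry math (timestamps invented):

```js
// Query string: X-Amz-Date=20250331T120000Z & X-Amz-Expires=604800
// Signing time parsed:   2025-03-31T12:00:00Z
// Expires at:            signing time + 604800s = 2025-04-07T12:00:00Z
// With bufferSeconds = 3600, needsRefresh() flips to true once the
// current time reaches 2025-04-07T11:00:00Z.
```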
/**
* Refreshes S3 URLs for an array of files if they're expired or close to expiring
*
* @param {IMongoFile[]} files - Array of file documents
* @param {(files: MongoFile[]) => Promise<void>} batchUpdateFiles - Function to update files in the database
* @param {number} [bufferSeconds=3600] - Buffer time in seconds to check for expiration
* @returns {Promise<IMongoFile[]>} The files with refreshed URLs if needed
*/
async function refreshS3FileUrls(files, batchUpdateFiles, bufferSeconds = 3600) {
if (!files || !Array.isArray(files) || files.length === 0) {
return files;
}
const filesToUpdate = [];
for (let i = 0; i < files.length; i++) {
const file = files[i];
if (!file?.file_id) {
continue;
}
if (file.source !== FileSources.s3) {
continue;
}
if (!file.filepath) {
continue;
}
if (!needsRefresh(file.filepath, bufferSeconds)) {
continue;
}
try {
const s3Key = extractKeyFromS3Url(file.filepath);
if (!s3Key) {
continue;
}
const keyParts = s3Key.split('/');
if (keyParts.length < 3) {
continue;
}
const basePath = keyParts[0];
const userId = keyParts[1];
const fileName = keyParts.slice(2).join('/');
const newUrl = await getS3URL({
userId,
fileName,
basePath,
});
filesToUpdate.push({
file_id: file.file_id,
filepath: newUrl,
});
files[i].filepath = newUrl;
} catch (error) {
logger.error(`Error refreshing S3 URL for file ${file.file_id}:`, error);
}
}
if (filesToUpdate.length > 0) {
await batchUpdateFiles(filesToUpdate);
}
return files;
}
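
Both refresh helpers decompose the key the same way (sample key invented):

```js
const s3Key = 'images/67ab0c1de2f3a4b5c6d7e8f9/9f3e__report.pdf';
const keyParts = s3Key.split('/');
const basePath = keyParts[0];                 // 'images'
const userId = keyParts[1];                   // '67ab0c1de2f3a4b5c6d7e8f9'
const fileName = keyParts.slice(2).join('/'); // '9f3e__report.pdf'
// getS3URL({ userId, fileName, basePath }) then re-signs the same object.
```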
/**
* Refreshes a single S3 URL if it's expired or close to expiring
*
* @param {{ filepath: string, source: string }} fileObj - Simple file object containing filepath and source
* @param {number} [bufferSeconds=3600] - Buffer time in seconds to check for expiration
* @returns {Promise<string>} The refreshed URL or the original URL if no refresh needed
*/
async function refreshS3Url(fileObj, bufferSeconds = 3600) {
if (!fileObj || fileObj.source !== FileSources.s3 || !fileObj.filepath) {
return fileObj?.filepath || '';
}
if (!needsRefresh(fileObj.filepath, bufferSeconds)) {
return fileObj.filepath;
}
try {
const s3Key = extractKeyFromS3Url(fileObj.filepath);
if (!s3Key) {
logger.warn(`Unable to extract S3 key from URL: ${fileObj.filepath}`);
return fileObj.filepath;
}
const keyParts = s3Key.split('/');
if (keyParts.length < 3) {
logger.warn(`Invalid S3 key format: ${s3Key}`);
return fileObj.filepath;
}
const basePath = keyParts[0];
const userId = keyParts[1];
const fileName = keyParts.slice(2).join('/');
const newUrl = await getS3URL({
userId,
fileName,
basePath,
});
logger.debug(`Refreshed S3 URL for key: ${s3Key}`);
return newUrl;
} catch (error) {
logger.error(`Error refreshing S3 URL: ${error.message}`);
return fileObj.filepath;
}
}
module.exports = {
saveBufferToS3,
saveURLToS3,
@@ -181,4 +423,6 @@ module.exports = {
deleteFileFromS3,
uploadFileToS3,
getS3FileStream,
refreshS3FileUrls,
refreshS3Url,
};


@@ -7,6 +7,7 @@ const {
EModelEndpoint,
} = require('librechat-data-provider');
const { getStrategyFunctions } = require('~/server/services/Files/strategies');
const { logAxiosError } = require('~/utils');
const { logger } = require('~/config');
/**
@@ -24,8 +25,8 @@ async function fetchImageToBase64(url) {
});
return Buffer.from(response.data).toString('base64');
} catch (error) {
logger.error('Error fetching image to convert to base64', error);
throw error;
const message = 'Error fetching image to convert to base64';
throw new Error(logAxiosError({ message, error }));
}
}


@@ -3,11 +3,11 @@ import { useEffect, useCallback } from 'react';
import { isAssistantsEndpoint } from 'librechat-data-provider';
import type { TMessage } from 'librechat-data-provider';
import type { TMessageProps } from '~/common';
// eslint-disable-next-line import/no-cycle
import MessageContent from '~/components/Messages/MessageContent';
// eslint-disable-next-line import/no-cycle
import MessageParts from './MessageParts';
// eslint-disable-next-line import/no-cycle
import Message from './Message';
import store from '~/store';
@@ -61,7 +61,6 @@ export default function MultiMessage({
/>
);
} else if (message.content) {
console.log('message.id with content', message.messageId);
return (
<MessageContent
key={message.messageId}

package-lock.json (generated)

@@ -43639,7 +43639,7 @@
},
"packages/data-provider": {
"name": "librechat-data-provider",
"version": "0.7.76",
"version": "0.7.77",
"license": "ISC",
"dependencies": {
"axios": "^1.8.2",


@@ -1,6 +1,6 @@
{
"name": "librechat-data-provider",
"version": "0.7.76",
"version": "0.7.77",
"description": "data services for librechat apps",
"main": "dist/index.js",
"module": "dist/index.es.js",


@@ -1006,6 +1006,10 @@ export enum CacheKeys {
* Key for in-progress flow states.
*/
FLOWS = 'flows',
/**
* Key for s3 check intervals per user
*/
S3_EXPIRY_INTERVAL = 'S3_EXPIRY_INTERVAL',
}
/**