mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-01-23 02:36:12 +01:00
🔧 fix: Keyv and Proxy Issues, and More Memory Optimizations (#6867)
* chore: update @librechat/agents dependency to version 2.4.15
* refactor: Prevent memory leaks by nullifying boundModel.client in disposeClient function
* fix: use of proxy, use undici
* chore: update @librechat/agents dependency to version 2.4.16
* Revert "fix: use of proxy, use undici"
This reverts commit 83153cd582.
* fix: ensure fetch is imported for HTTP requests
* fix: replace direct OpenAI import with CustomOpenAIClient from @librechat/agents
* fix: update keyv peer dependency to version 5.3.2
* fix: update keyv dependency to version 5.3.2
* refactor: replace KeyvMongo with custom implementation and update flow state manager usage
* fix: update @librechat/agents dependency to version 2.4.17
* ci: update OpenAIClient tests to use CustomOpenAIClient from @librechat/agents
* refactor: remove KeyvMongo mock and related dependencies
This commit is contained in:
parent
339882eea4
commit
64bd373bc8
18 changed files with 375 additions and 743 deletions
|
|
@ -1,7 +1,6 @@
|
|||
const OpenAI = require('openai');
|
||||
const { OllamaClient } = require('./OllamaClient');
|
||||
const { HttpsProxyAgent } = require('https-proxy-agent');
|
||||
const { SplitStreamHandler } = require('@librechat/agents');
|
||||
const { SplitStreamHandler, CustomOpenAIClient: OpenAI } = require('@librechat/agents');
|
||||
const {
|
||||
Constants,
|
||||
ImageDetail,
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
const fetch = require('node-fetch');
|
||||
const { GraphEvents } = require('@librechat/agents');
|
||||
const { logger, sendEvent } = require('~/config');
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ jest.mock('~/models', () => ({
|
|||
|
||||
const { getConvo, saveConvo } = require('~/models');
|
||||
|
||||
jest.mock('@langchain/openai', () => {
|
||||
jest.mock('@librechat/agents', () => {
|
||||
return {
|
||||
ChatOpenAI: jest.fn().mockImplementation(() => {
|
||||
return {};
|
||||
|
|
|
|||
|
|
@ -1,9 +1,7 @@
|
|||
jest.mock('~/cache/getLogStores');
|
||||
require('dotenv').config();
|
||||
const OpenAI = require('openai');
|
||||
const getLogStores = require('~/cache/getLogStores');
|
||||
const { fetchEventSource } = require('@waylaidwanderer/fetch-event-source');
|
||||
const { genAzureChatCompletion } = require('~/utils/azureUtils');
|
||||
const getLogStores = require('~/cache/getLogStores');
|
||||
const OpenAIClient = require('../OpenAIClient');
|
||||
jest.mock('meilisearch');
|
||||
|
||||
|
|
@ -36,19 +34,21 @@ jest.mock('~/models', () => ({
|
|||
updateFileUsage: jest.fn(),
|
||||
}));
|
||||
|
||||
jest.mock('@langchain/openai', () => {
|
||||
return {
|
||||
ChatOpenAI: jest.fn().mockImplementation(() => {
|
||||
return {};
|
||||
}),
|
||||
};
|
||||
// Import the actual module but mock specific parts
|
||||
const agents = jest.requireActual('@librechat/agents');
|
||||
const { CustomOpenAIClient } = agents;
|
||||
|
||||
// Also mock ChatOpenAI to prevent real API calls
|
||||
agents.ChatOpenAI = jest.fn().mockImplementation(() => {
|
||||
return {};
|
||||
});
|
||||
agents.AzureChatOpenAI = jest.fn().mockImplementation(() => {
|
||||
return {};
|
||||
});
|
||||
|
||||
jest.mock('openai');
|
||||
|
||||
jest.spyOn(OpenAI, 'constructor').mockImplementation(function (...options) {
|
||||
// We can add additional logic here if needed
|
||||
return new OpenAI(...options);
|
||||
// Mock only the CustomOpenAIClient constructor
|
||||
jest.spyOn(CustomOpenAIClient, 'constructor').mockImplementation(function (...options) {
|
||||
return new CustomOpenAIClient(...options);
|
||||
});
|
||||
|
||||
const finalChatCompletion = jest.fn().mockResolvedValue({
|
||||
|
|
@ -120,7 +120,13 @@ const create = jest.fn().mockResolvedValue({
|
|||
],
|
||||
});
|
||||
|
||||
OpenAI.mockImplementation(() => ({
|
||||
// Mock the implementation of CustomOpenAIClient instances
|
||||
jest.spyOn(CustomOpenAIClient.prototype, 'constructor').mockImplementation(function () {
|
||||
return this;
|
||||
});
|
||||
|
||||
// Create a mock for the CustomOpenAIClient class
|
||||
const mockCustomOpenAIClient = jest.fn().mockImplementation(() => ({
|
||||
beta: {
|
||||
chat: {
|
||||
completions: {
|
||||
|
|
@ -135,6 +141,8 @@ OpenAI.mockImplementation(() => ({
|
|||
},
|
||||
}));
|
||||
|
||||
CustomOpenAIClient.mockImplementation = mockCustomOpenAIClient;
|
||||
|
||||
describe('OpenAIClient', () => {
|
||||
beforeEach(() => {
|
||||
const mockCache = {
|
||||
|
|
@ -559,41 +567,6 @@ describe('OpenAIClient', () => {
|
|||
expect(requestBody).toHaveProperty('model');
|
||||
expect(requestBody.model).toBe(model);
|
||||
});
|
||||
|
||||
it('[Azure OpenAI] should call chatCompletion and OpenAI.stream with correct args', async () => {
|
||||
// Set a default model
|
||||
process.env.AZURE_OPENAI_DEFAULT_MODEL = 'gpt4-turbo';
|
||||
|
||||
const onProgress = jest.fn().mockImplementation(() => ({}));
|
||||
client.azure = defaultAzureOptions;
|
||||
const chatCompletion = jest.spyOn(client, 'chatCompletion');
|
||||
await client.sendMessage('Hi mom!', {
|
||||
replaceOptions: true,
|
||||
...defaultOptions,
|
||||
modelOptions: { model: 'gpt4-turbo', stream: true },
|
||||
onProgress,
|
||||
azure: defaultAzureOptions,
|
||||
});
|
||||
|
||||
expect(chatCompletion).toHaveBeenCalled();
|
||||
expect(chatCompletion.mock.calls.length).toBe(1);
|
||||
|
||||
const chatCompletionArgs = chatCompletion.mock.calls[0][0];
|
||||
const { payload } = chatCompletionArgs;
|
||||
|
||||
expect(payload[0].role).toBe('user');
|
||||
expect(payload[0].content).toBe('Hi mom!');
|
||||
|
||||
// Azure OpenAI does not use the model property, and will error if it's passed
|
||||
// This check ensures the model property is not present
|
||||
const streamArgs = stream.mock.calls[0][0];
|
||||
expect(streamArgs).not.toHaveProperty('model');
|
||||
|
||||
// Check if the baseURL is correct
|
||||
const constructorArgs = OpenAI.mock.calls[0][0];
|
||||
const expectedURL = genAzureChatCompletion(defaultAzureOptions).split('/chat')[0];
|
||||
expect(constructorArgs.baseURL).toBe(expectedURL);
|
||||
});
|
||||
});
|
||||
|
||||
describe('checkVisionRequest functionality', () => {
|
||||
|
|
|
|||
269
api/cache/keyvMongo.js
vendored
269
api/cache/keyvMongo.js
vendored
|
|
@ -1,9 +1,272 @@
|
|||
const { KeyvMongo } = require('@keyv/mongo');
|
||||
// api/cache/keyvMongo.js
|
||||
const mongoose = require('mongoose');
|
||||
const EventEmitter = require('events');
|
||||
const { GridFSBucket } = require('mongodb');
|
||||
const { logger } = require('~/config');
|
||||
|
||||
const { MONGO_URI } = process.env ?? {};
|
||||
const storeMap = new Map();
|
||||
|
||||
class KeyvMongoCustom extends EventEmitter {
|
||||
constructor(url, options = {}) {
|
||||
super();
|
||||
|
||||
url = url || {};
|
||||
if (typeof url === 'string') {
|
||||
url = { url };
|
||||
}
|
||||
if (url.uri) {
|
||||
url = { url: url.uri, ...url };
|
||||
}
|
||||
|
||||
this.opts = {
|
||||
url: 'mongodb://127.0.0.1:27017',
|
||||
collection: 'keyv',
|
||||
...url,
|
||||
...options,
|
||||
};
|
||||
|
||||
this.ttlSupport = false;
|
||||
|
||||
// Filter valid options
|
||||
const keyvMongoKeys = new Set([
|
||||
'url',
|
||||
'collection',
|
||||
'namespace',
|
||||
'serialize',
|
||||
'deserialize',
|
||||
'uri',
|
||||
'useGridFS',
|
||||
'dialect',
|
||||
]);
|
||||
this.opts = Object.fromEntries(Object.entries(this.opts).filter(([k]) => keyvMongoKeys.has(k)));
|
||||
}
|
||||
|
||||
// Helper to access the store WITHOUT storing a promise on the instance
|
||||
_getClient() {
|
||||
const storeKey = `${this.opts.collection}:${this.opts.useGridFS ? 'gridfs' : 'collection'}`;
|
||||
|
||||
// If we already have the store initialized, return it directly
|
||||
if (storeMap.has(storeKey)) {
|
||||
return Promise.resolve(storeMap.get(storeKey));
|
||||
}
|
||||
|
||||
// Check mongoose connection state
|
||||
if (mongoose.connection.readyState !== 1) {
|
||||
return Promise.reject(
|
||||
new Error('Mongoose connection not ready. Ensure connectDb() is called first.'),
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
const db = mongoose.connection.db;
|
||||
let client;
|
||||
|
||||
if (this.opts.useGridFS) {
|
||||
const bucket = new GridFSBucket(db, {
|
||||
readPreference: this.opts.readPreference,
|
||||
bucketName: this.opts.collection,
|
||||
});
|
||||
const store = db.collection(`${this.opts.collection}.files`);
|
||||
client = { bucket, store, db };
|
||||
} else {
|
||||
const collection = this.opts.collection || 'keyv';
|
||||
const store = db.collection(collection);
|
||||
client = { store, db };
|
||||
}
|
||||
|
||||
storeMap.set(storeKey, client);
|
||||
return Promise.resolve(client);
|
||||
} catch (error) {
|
||||
this.emit('error', error);
|
||||
return Promise.reject(error);
|
||||
}
|
||||
}
|
||||
|
||||
async get(key) {
|
||||
const client = await this._getClient();
|
||||
|
||||
if (this.opts.useGridFS) {
|
||||
await client.store.updateOne(
|
||||
{
|
||||
filename: key,
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
'metadata.lastAccessed': new Date(),
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
const stream = client.bucket.openDownloadStreamByName(key);
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const resp = [];
|
||||
stream.on('error', () => {
|
||||
resolve(undefined);
|
||||
});
|
||||
|
||||
stream.on('end', () => {
|
||||
const data = Buffer.concat(resp).toString('utf8');
|
||||
resolve(data);
|
||||
});
|
||||
|
||||
stream.on('data', (chunk) => {
|
||||
resp.push(chunk);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const document = await client.store.findOne({ key: { $eq: key } });
|
||||
|
||||
if (!document) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return document.value;
|
||||
}
|
||||
|
||||
async getMany(keys) {
|
||||
const client = await this._getClient();
|
||||
|
||||
if (this.opts.useGridFS) {
|
||||
const promises = [];
|
||||
for (const key of keys) {
|
||||
promises.push(this.get(key));
|
||||
}
|
||||
|
||||
const values = await Promise.allSettled(promises);
|
||||
const data = [];
|
||||
for (const value of values) {
|
||||
data.push(value.value);
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
const values = await client.store
|
||||
.find({ key: { $in: keys } })
|
||||
.project({ _id: 0, value: 1, key: 1 })
|
||||
.toArray();
|
||||
|
||||
const results = [...keys];
|
||||
let i = 0;
|
||||
for (const key of keys) {
|
||||
const rowIndex = values.findIndex((row) => row.key === key);
|
||||
results[i] = rowIndex > -1 ? values[rowIndex].value : undefined;
|
||||
i++;
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
async set(key, value, ttl) {
|
||||
const client = await this._getClient();
|
||||
const expiresAt = typeof ttl === 'number' ? new Date(Date.now() + ttl) : null;
|
||||
|
||||
if (this.opts.useGridFS) {
|
||||
const stream = client.bucket.openUploadStream(key, {
|
||||
metadata: {
|
||||
expiresAt,
|
||||
lastAccessed: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
return new Promise((resolve) => {
|
||||
stream.on('finish', () => {
|
||||
resolve(stream);
|
||||
});
|
||||
stream.end(value);
|
||||
});
|
||||
}
|
||||
|
||||
await client.store.updateOne(
|
||||
{ key: { $eq: key } },
|
||||
{ $set: { key, value, expiresAt } },
|
||||
{ upsert: true },
|
||||
);
|
||||
}
|
||||
|
||||
async delete(key) {
|
||||
if (typeof key !== 'string') {
|
||||
return false;
|
||||
}
|
||||
|
||||
const client = await this._getClient();
|
||||
|
||||
if (this.opts.useGridFS) {
|
||||
try {
|
||||
const bucket = new GridFSBucket(client.db, {
|
||||
bucketName: this.opts.collection,
|
||||
});
|
||||
const files = await bucket.find({ filename: key }).toArray();
|
||||
await client.bucket.delete(files[0]._id);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
const object = await client.store.deleteOne({ key: { $eq: key } });
|
||||
return object.deletedCount > 0;
|
||||
}
|
||||
|
||||
async deleteMany(keys) {
|
||||
const client = await this._getClient();
|
||||
|
||||
if (this.opts.useGridFS) {
|
||||
const bucket = new GridFSBucket(client.db, {
|
||||
bucketName: this.opts.collection,
|
||||
});
|
||||
const files = await bucket.find({ filename: { $in: keys } }).toArray();
|
||||
if (files.length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
await Promise.all(files.map(async (file) => client.bucket.delete(file._id)));
|
||||
return true;
|
||||
}
|
||||
|
||||
const object = await client.store.deleteMany({ key: { $in: keys } });
|
||||
return object.deletedCount > 0;
|
||||
}
|
||||
|
||||
async clear() {
|
||||
const client = await this._getClient();
|
||||
|
||||
if (this.opts.useGridFS) {
|
||||
try {
|
||||
await client.bucket.drop();
|
||||
} catch (error) {
|
||||
// Throw error if not "namespace not found" error
|
||||
if (!(error.code === 26)) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await client.store.deleteMany({
|
||||
key: { $regex: this.namespace ? `^${this.namespace}:*` : '' },
|
||||
});
|
||||
}
|
||||
|
||||
async has(key) {
|
||||
const client = await this._getClient();
|
||||
const filter = { [this.opts.useGridFS ? 'filename' : 'key']: { $eq: key } };
|
||||
const document = await client.store.countDocuments(filter, { limit: 1 });
|
||||
return document !== 0;
|
||||
}
|
||||
|
||||
// No-op disconnect
|
||||
async disconnect() {
|
||||
// This is a no-op since we don't want to close the shared mongoose connection
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
const keyvMongo = new KeyvMongoCustom({
|
||||
collection: 'logs',
|
||||
});
|
||||
|
||||
const keyvMongo = new KeyvMongo(MONGO_URI, { collection: 'logs' });
|
||||
keyvMongo.on('error', (err) => logger.error('KeyvMongo connection error:', err));
|
||||
|
||||
module.exports = keyvMongo;
|
||||
|
|
|
|||
|
|
@ -24,12 +24,12 @@ function getMCPManager(userId) {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param {(key: string) => Keyv} getLogStores
|
||||
* @param {Keyv} flowsCache
|
||||
* @returns {FlowStateManager}
|
||||
*/
|
||||
function getFlowStateManager(getLogStores) {
|
||||
function getFlowStateManager(flowsCache) {
|
||||
if (!flowManager) {
|
||||
flowManager = new FlowStateManager(getLogStores(CacheKeys.FLOWS), {
|
||||
flowManager = new FlowStateManager(flowsCache, {
|
||||
ttl: Time.ONE_MINUTE * 3,
|
||||
logger,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ module.exports = {
|
|||
coverageDirectory: 'coverage',
|
||||
setupFiles: [
|
||||
'./test/jestSetup.js',
|
||||
'./test/__mocks__/KeyvMongo.js',
|
||||
'./test/__mocks__/logger.js',
|
||||
'./test/__mocks__/fetchEventSource.js',
|
||||
],
|
||||
|
|
|
|||
|
|
@ -42,14 +42,13 @@
|
|||
"@azure/storage-blob": "^12.26.0",
|
||||
"@google/generative-ai": "^0.23.0",
|
||||
"@googleapis/youtube": "^20.0.0",
|
||||
"@keyv/mongo": "^3.0.1",
|
||||
"@keyv/redis": "^4.3.3",
|
||||
"@langchain/community": "^0.3.39",
|
||||
"@langchain/core": "^0.3.43",
|
||||
"@langchain/google-genai": "^0.2.2",
|
||||
"@langchain/google-vertexai": "^0.2.3",
|
||||
"@langchain/textsplitters": "^0.1.0",
|
||||
"@librechat/agents": "^2.4.14",
|
||||
"@librechat/agents": "^2.4.17",
|
||||
"@librechat/data-schemas": "*",
|
||||
"@waylaidwanderer/fetch-event-source": "^3.0.1",
|
||||
"axios": "^1.8.2",
|
||||
|
|
|
|||
|
|
@ -238,6 +238,9 @@ function disposeClient(client) {
|
|||
client.run.Graph.streamBuffer = null;
|
||||
client.run.Graph.clientOptions = null;
|
||||
client.run.Graph.graphState = null;
|
||||
if (client.run.Graph.boundModel?.client) {
|
||||
client.run.Graph.boundModel.client = null;
|
||||
}
|
||||
client.run.Graph.boundModel = null;
|
||||
client.run.Graph.systemMessage = null;
|
||||
client.run.Graph.reasoningKey = null;
|
||||
|
|
|
|||
|
|
@ -787,6 +787,8 @@ class AgentClient extends BaseClient {
|
|||
[Callback.TOOL_ERROR]: logToolError,
|
||||
},
|
||||
});
|
||||
|
||||
config.signal = null;
|
||||
};
|
||||
|
||||
await runAgent(this.options.agent, initialMessages);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
const express = require('express');
|
||||
const jwt = require('jsonwebtoken');
|
||||
const { CacheKeys } = require('librechat-data-provider');
|
||||
const { getAccessToken } = require('~/server/services/TokenService');
|
||||
const { logger, getFlowStateManager } = require('~/config');
|
||||
const { getLogStores } = require('~/cache');
|
||||
|
|
@ -19,8 +20,8 @@ const JWT_SECRET = process.env.JWT_SECRET;
|
|||
router.get('/:action_id/oauth/callback', async (req, res) => {
|
||||
const { action_id } = req.params;
|
||||
const { code, state } = req.query;
|
||||
|
||||
const flowManager = getFlowStateManager(getLogStores);
|
||||
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
||||
const flowManager = getFlowStateManager(flowsCache);
|
||||
let identifier = action_id;
|
||||
try {
|
||||
let decodedState;
|
||||
|
|
|
|||
|
|
@ -74,7 +74,6 @@ async function domainParser(domain, inverse = false) {
|
|||
if (!domain) {
|
||||
return;
|
||||
}
|
||||
|
||||
const domainsCache = getLogStores(CacheKeys.ENCODED_DOMAINS);
|
||||
const cachedDomain = await domainsCache.get(domain);
|
||||
if (inverse && cachedDomain) {
|
||||
|
|
@ -188,7 +187,8 @@ async function createActionTool({
|
|||
expires_at: Date.now() + Time.TWO_MINUTES,
|
||||
},
|
||||
};
|
||||
const flowManager = getFlowStateManager(getLogStores);
|
||||
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
||||
const flowManager = getFlowStateManager(flowsCache);
|
||||
await flowManager.createFlowWithHandler(
|
||||
`${identifier}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`,
|
||||
'oauth_login',
|
||||
|
|
@ -264,7 +264,8 @@ async function createActionTool({
|
|||
encrypted_oauth_client_id: encrypted.oauth_client_id,
|
||||
encrypted_oauth_client_secret: encrypted.oauth_client_secret,
|
||||
});
|
||||
const flowManager = getFlowStateManager(getLogStores);
|
||||
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
||||
const flowManager = getFlowStateManager(flowsCache);
|
||||
const refreshData = await flowManager.createFlowWithHandler(
|
||||
`${identifier}:refresh`,
|
||||
'oauth_refresh',
|
||||
|
|
|
|||
|
|
@ -1,48 +0,0 @@
|
|||
jest.mock('@keyv/mongo', () => {
|
||||
const EventEmitter = require('events');
|
||||
class KeyvMongo extends EventEmitter {
|
||||
constructor(url = 'mongodb://127.0.0.1:27017', options) {
|
||||
super();
|
||||
this.ttlSupport = false;
|
||||
url = url ?? {};
|
||||
if (typeof url === 'string') {
|
||||
url = { url };
|
||||
}
|
||||
if (url.uri) {
|
||||
url = { url: url.uri, ...url };
|
||||
}
|
||||
this.opts = {
|
||||
url,
|
||||
collection: 'keyv',
|
||||
...url,
|
||||
...options,
|
||||
};
|
||||
|
||||
// In-memory store for tests
|
||||
this.store = new Map();
|
||||
}
|
||||
|
||||
async get(key) {
|
||||
return this.store.get(key);
|
||||
}
|
||||
|
||||
async set(key, value, ttl) {
|
||||
this.store.set(key, value);
|
||||
return true;
|
||||
}
|
||||
|
||||
async delete(key) {
|
||||
return this.store.delete(key);
|
||||
}
|
||||
|
||||
async clear() {
|
||||
this.store.clear();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Create a store factory function for the test suite
|
||||
const store = () => new KeyvMongo();
|
||||
|
||||
return { KeyvMongo };
|
||||
});
|
||||
Loading…
Add table
Add a link
Reference in a new issue