LibreChat/api/db/indexSync.js

372 lines
13 KiB
JavaScript
Raw Normal View History

🏗️ refactor: Extract DB layers to `data-schemas` for shared use (#7650) * refactor: move model definitions and database-related methods to packages/data-schemas * ci: update tests due to new DB structure fix: disable mocking `librechat-data-provider` feat: Add schema exports to data-schemas package - Introduced a new schema module that exports various schemas including action, agent, and user schemas. - Updated index.ts to include the new schema exports for better modularity and organization. ci: fix appleStrategy tests fix: Agent.spec.js ci: refactor handleTools tests to use MongoMemoryServer for in-memory database fix: getLogStores imports ci: update banViolation tests to use MongoMemoryServer and improve session mocking test: refactor samlStrategy tests to improve mock configurations and user handling ci: fix crypto mock in handleText tests for improved accuracy ci: refactor spendTokens tests to improve model imports and setup ci: refactor Message model tests to use MongoMemoryServer and improve database interactions * refactor: streamline IMessage interface and move feedback properties to types/message.ts * refactor: use exported initializeRoles from `data-schemas`, remove api workspace version (this serves as an example of future migrations that still need to happen) * refactor: update model imports to use destructuring from `~/db/models` for consistency and clarity * refactor: remove unused mongoose imports from model files for cleaner code * refactor: remove unused mongoose imports from Share, Prompt, and Transaction model files for cleaner code * refactor: remove unused import in Transaction model for cleaner code * ci: update deploy workflow to reference new Docker Dev Branch Images Build and add new workflow for building Docker images on dev branch * chore: cleanup imports
2025-05-30 22:18:13 -04:00
const mongoose = require('mongoose');
const { MeiliSearch } = require('meilisearch');
🏗️ refactor: Extract DB layers to `data-schemas` for shared use (#7650) * refactor: move model definitions and database-related methods to packages/data-schemas * ci: update tests due to new DB structure fix: disable mocking `librechat-data-provider` feat: Add schema exports to data-schemas package - Introduced a new schema module that exports various schemas including action, agent, and user schemas. - Updated index.ts to include the new schema exports for better modularity and organization. ci: fix appleStrategy tests fix: Agent.spec.js ci: refactor handleTools tests to use MongoMemoryServer for in-memory database fix: getLogStores imports ci: update banViolation tests to use MongoMemoryServer and improve session mocking test: refactor samlStrategy tests to improve mock configurations and user handling ci: fix crypto mock in handleText tests for improved accuracy ci: refactor spendTokens tests to improve model imports and setup ci: refactor Message model tests to use MongoMemoryServer and improve database interactions * refactor: streamline IMessage interface and move feedback properties to types/message.ts * refactor: use exported initializeRoles from `data-schemas`, remove api workspace version (this serves as an example of future migrations that still need to happen) * refactor: update model imports to use destructuring from `~/db/models` for consistency and clarity * refactor: remove unused mongoose imports from model files for cleaner code * refactor: remove unused mongoose imports from Share, Prompt, and Transaction model files for cleaner code * refactor: remove unused import in Transaction model for cleaner code * ci: update deploy workflow to reference new Docker Dev Branch Images Build and add new workflow for building Docker images on dev branch * chore: cleanup imports
2025-05-30 22:18:13 -04:00
const { logger } = require('@librechat/data-schemas');
const { CacheKeys } = require('librechat-data-provider');
const { isEnabled, FlowStateManager } = require('@librechat/api');
const { getLogStores } = require('~/cache');
const { batchResetMeiliFlags } = require('./utils');
🏗️ refactor: Extract DB layers to `data-schemas` for shared use (#7650) * refactor: move model definitions and database-related methods to packages/data-schemas * ci: update tests due to new DB structure fix: disable mocking `librechat-data-provider` feat: Add schema exports to data-schemas package - Introduced a new schema module that exports various schemas including action, agent, and user schemas. - Updated index.ts to include the new schema exports for better modularity and organization. ci: fix appleStrategy tests fix: Agent.spec.js ci: refactor handleTools tests to use MongoMemoryServer for in-memory database fix: getLogStores imports ci: update banViolation tests to use MongoMemoryServer and improve session mocking test: refactor samlStrategy tests to improve mock configurations and user handling ci: fix crypto mock in handleText tests for improved accuracy ci: refactor spendTokens tests to improve model imports and setup ci: refactor Message model tests to use MongoMemoryServer and improve database interactions * refactor: streamline IMessage interface and move feedback properties to types/message.ts * refactor: use exported initializeRoles from `data-schemas`, remove api workspace version (this serves as an example of future migrations that still need to happen) * refactor: update model imports to use destructuring from `~/db/models` for consistency and clarity * refactor: remove unused mongoose imports from model files for cleaner code * refactor: remove unused mongoose imports from Share, Prompt, and Transaction model files for cleaner code * refactor: remove unused import in Transaction model for cleaner code * ci: update deploy workflow to reference new Docker Dev Branch Images Build and add new workflow for building Docker images on dev branch * chore: cleanup imports
2025-05-30 22:18:13 -04:00
const Conversation = mongoose.models.Conversation;
const Message = mongoose.models.Message;
const searchEnabled = isEnabled(process.env.SEARCH);
const indexingDisabled = isEnabled(process.env.MEILI_NO_SYNC);
let currentTimeout = null;
const defaultSyncThreshold = 1000;
const syncThreshold = process.env.MEILI_SYNC_THRESHOLD
? parseInt(process.env.MEILI_SYNC_THRESHOLD, 10)
: defaultSyncThreshold;
📥 feat: Import Conversations from LibreChat, ChatGPT, Chatbot UI (#2355) * Basic implementation of ChatGPT conversation import * remove debug code * Handle citations * Fix updatedAt in import * update default model * Use job scheduler to handle import requests * import job status endpoint * Add wrapper around Agenda * Rate limits for import endpoint * rename import api path * Batch save import to mongo * Improve naming * Add documenting comments * Test for importers * Change button for importing conversations * Frontend changes * Import job status endpoint * Import endpoint response * Add translations to new phrases * Fix conversations refreshing * cleanup unused functions * set timeout for import job status polling * Add documentation * get extra spaces back * Improve error message * Fix translation files after merge * fix translation files 2 * Add zh translation for import functionality * Sync mailisearch index after import * chore: add dummy uri for jest tests, as MONGO_URI should only be real for E2E tests * docs: fix links * docs: fix conversationsImport section * fix: user role issue for librechat imports * refactor: import conversations from json - organize imports - add additional jsdocs - use multer with diskStorage to avoid loading file into memory outside of job - use filepath instead of loading data string for imports - replace console logs and some logger.info() with logger.debug - only use multer for import route * fix: undefined metadata edge case and replace ChatGtp -> ChatGpt * Refactor importChatGptConvo function to handle undefined metadata edge case and replace ChatGtp with ChatGpt * fix: chatgpt importer * feat: maintain tree relationship for librechat messages * chore: use enum * refactor: saveMessage to use single object arg, replace console logs, add userId to log message * chore: additional comment * chore: multer edge case * feat: first pass, maintain tree relationship * chore: organize * chore: remove log * ci: add heirarchy test for chatgpt * ci: test maintaining of heirarchy for librechat * wip: allow non-text content type messages * refactor: import content part object json string * refactor: more content types to format * chore: consolidate messageText formatting * docs: update on changes, bump data-provider/config versions, update readme * refactor(indexSync): singleton pattern for MeiliSearchClient * refactor: debug log after batch is done * chore: add back indexSync error handling --------- Co-authored-by: jakubmieszczak <jakub.mieszczak@zendesk.com> Co-authored-by: Danny Avila <danny@librechat.ai>
2024-05-02 08:48:26 +02:00
class MeiliSearchClient {
static instance = null;
static getInstance() {
if (!MeiliSearchClient.instance) {
if (!process.env.MEILI_HOST || !process.env.MEILI_MASTER_KEY) {
throw new Error('Meilisearch configuration is missing.');
}
MeiliSearchClient.instance = new MeiliSearch({
host: process.env.MEILI_HOST,
apiKey: process.env.MEILI_MASTER_KEY,
});
}
return MeiliSearchClient.instance;
}
}
/**
* Deletes documents from MeiliSearch index that are missing the user field
* @param {import('meilisearch').Index} index - MeiliSearch index instance
* @param {string} indexName - Name of the index for logging
* @returns {Promise<number>} - Number of documents deleted
*/
async function deleteDocumentsWithoutUserField(index, indexName) {
let deletedCount = 0;
let offset = 0;
const batchSize = 1000;
try {
while (true) {
const searchResult = await index.search('', {
limit: batchSize,
offset: offset,
});
if (searchResult.hits.length === 0) {
break;
}
const idsToDelete = searchResult.hits.filter((hit) => !hit.user).map((hit) => hit.id);
if (idsToDelete.length > 0) {
logger.info(
`[indexSync] Deleting ${idsToDelete.length} documents without user field from ${indexName} index`,
);
await index.deleteDocuments(idsToDelete);
deletedCount += idsToDelete.length;
}
if (searchResult.hits.length < batchSize) {
break;
}
offset += batchSize;
}
if (deletedCount > 0) {
logger.info(`[indexSync] Deleted ${deletedCount} orphaned documents from ${indexName} index`);
}
} catch (error) {
logger.error(`[indexSync] Error deleting documents from ${indexName}:`, error);
}
return deletedCount;
}
/**
* Ensures indexes have proper filterable attributes configured and checks if documents have user field
* @param {MeiliSearch} client - MeiliSearch client instance
* @returns {Promise<{settingsUpdated: boolean, orphanedDocsFound: boolean}>} - Status of what was done
*/
async function ensureFilterableAttributes(client) {
let settingsUpdated = false;
let hasOrphanedDocs = false;
try {
// Check and update messages index
try {
const messagesIndex = client.index('messages');
const settings = await messagesIndex.getSettings();
if (!settings.filterableAttributes || !settings.filterableAttributes.includes('user')) {
logger.info('[indexSync] Configuring messages index to filter by user...');
await messagesIndex.updateSettings({
filterableAttributes: ['user'],
});
logger.info('[indexSync] Messages index configured for user filtering');
settingsUpdated = true;
}
// Check if existing documents have user field indexed
try {
const searchResult = await messagesIndex.search('', { limit: 1 });
if (searchResult.hits.length > 0 && !searchResult.hits[0].user) {
logger.info(
'[indexSync] Existing messages missing user field, will clean up orphaned documents...',
);
hasOrphanedDocs = true;
}
} catch (searchError) {
logger.debug('[indexSync] Could not check message documents:', searchError.message);
}
} catch (error) {
if (error.code !== 'index_not_found') {
logger.warn('[indexSync] Could not check/update messages index settings:', error.message);
}
}
// Check and update conversations index
try {
const convosIndex = client.index('convos');
const settings = await convosIndex.getSettings();
if (!settings.filterableAttributes || !settings.filterableAttributes.includes('user')) {
logger.info('[indexSync] Configuring convos index to filter by user...');
await convosIndex.updateSettings({
filterableAttributes: ['user'],
});
logger.info('[indexSync] Convos index configured for user filtering');
settingsUpdated = true;
}
// Check if existing documents have user field indexed
try {
const searchResult = await convosIndex.search('', { limit: 1 });
if (searchResult.hits.length > 0 && !searchResult.hits[0].user) {
logger.info(
'[indexSync] Existing conversations missing user field, will clean up orphaned documents...',
);
hasOrphanedDocs = true;
}
} catch (searchError) {
logger.debug('[indexSync] Could not check conversation documents:', searchError.message);
}
} catch (error) {
if (error.code !== 'index_not_found') {
logger.warn('[indexSync] Could not check/update convos index settings:', error.message);
}
}
// If either index has orphaned documents, clean them up (but don't force resync)
if (hasOrphanedDocs) {
try {
const messagesIndex = client.index('messages');
await deleteDocumentsWithoutUserField(messagesIndex, 'messages');
} catch (error) {
logger.debug('[indexSync] Could not clean up messages:', error.message);
}
try {
const convosIndex = client.index('convos');
await deleteDocumentsWithoutUserField(convosIndex, 'convos');
} catch (error) {
logger.debug('[indexSync] Could not clean up convos:', error.message);
}
logger.info('[indexSync] Orphaned documents cleaned up without forcing resync.');
}
if (settingsUpdated) {
logger.info('[indexSync] Index settings updated. Full re-sync will be triggered.');
}
} catch (error) {
logger.error('[indexSync] Error ensuring filterable attributes:', error);
}
return { settingsUpdated, orphanedDocsFound: hasOrphanedDocs };
}
/**
* Performs the actual sync operations for messages and conversations
* @param {FlowStateManager} flowManager - Flow state manager instance
* @param {string} flowId - Flow identifier
* @param {string} flowType - Flow type
*/
async function performSync(flowManager, flowId, flowType) {
try {
if (indexingDisabled === true) {
logger.info('[indexSync] Indexing is disabled, skipping...');
return { messagesSync: false, convosSync: false };
}
const client = MeiliSearchClient.getInstance();
const { status } = await client.health();
if (status !== 'available') {
throw new Error('Meilisearch not available');
}
/** Ensures indexes have proper filterable attributes configured */
const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } =
await ensureFilterableAttributes(client);
let messagesSync = false;
let convosSync = false;
// Only reset flags if settings were actually updated (not just for orphaned doc cleanup)
if (settingsUpdated) {
logger.info(
'[indexSync] Settings updated. Forcing full re-sync to reindex with new configuration...',
);
// Reset sync flags to force full re-sync
await batchResetMeiliFlags(Message.collection);
await batchResetMeiliFlags(Conversation.collection);
}
// Check if we need to sync messages
logger.info('[indexSync] Requesting message sync progress...');
const messageProgress = await Message.getSyncProgress();
if (!messageProgress.isComplete || settingsUpdated) {
logger.info(
`[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`,
);
const messageCount = messageProgress.totalDocuments;
const messagesIndexed = messageProgress.totalProcessed;
const unindexedMessages = messageCount - messagesIndexed;
🫙 fix: Force MeiliSearch Full Sync on Empty Index State (#12202) * fix: meili index sync with unindexed documents - Updated `performSync` function to force a full sync when a fresh MeiliSearch index is detected, even if the number of unindexed messages or convos is below the sync threshold. - Added logging to indicate when a fresh index is detected and a full sync is initiated. - Introduced new tests to validate the behavior of the sync logic under various conditions, ensuring proper handling of fresh indexes and threshold scenarios. This change improves the reliability of the synchronization process, ensuring that all documents are indexed correctly when starting with a fresh index. * refactor: update sync logic for unindexed documents in MeiliSearch - Renamed variables in `performSync` to improve clarity, changing `freshIndex` to `noneIndexed` for better understanding of the sync condition. - Adjusted the logic to ensure a full sync is forced when no messages or conversations are marked as indexed, even if below the sync threshold. - Updated related tests to reflect the new logging messages and conditions, enhancing the accuracy of the sync threshold logic. This change improves the readability and reliability of the synchronization process, ensuring all documents are indexed correctly when starting with a fresh index. * fix: enhance MeiliSearch index creation error handling - Updated the `mongoMeili` function to improve logging and error handling during index creation in MeiliSearch. - Added handling for `MeiliSearchTimeOutError` to log a warning when index creation times out. - Enhanced logging to differentiate between successful index creation and specific failure reasons, including cases where the index already exists. - Improved debug logging for index creation tasks to provide clearer insights into the process. This change enhances the robustness of the index creation process and improves observability for troubleshooting. * fix: update MeiliSearch index creation error handling - Modified the `mongoMeili` function to check for any status other than 'succeeded' during index creation, enhancing error detection. - Improved logging to provide clearer insights when an index creation task fails, particularly for cases where the index already exists. This change strengthens the error handling mechanism for index creation in MeiliSearch, ensuring better observability and reliability.
2026-03-12 20:43:23 -04:00
const noneIndexed = messagesIndexed === 0 && unindexedMessages > 0;
🫙 fix: Force MeiliSearch Full Sync on Empty Index State (#12202) * fix: meili index sync with unindexed documents - Updated `performSync` function to force a full sync when a fresh MeiliSearch index is detected, even if the number of unindexed messages or convos is below the sync threshold. - Added logging to indicate when a fresh index is detected and a full sync is initiated. - Introduced new tests to validate the behavior of the sync logic under various conditions, ensuring proper handling of fresh indexes and threshold scenarios. This change improves the reliability of the synchronization process, ensuring that all documents are indexed correctly when starting with a fresh index. * refactor: update sync logic for unindexed documents in MeiliSearch - Renamed variables in `performSync` to improve clarity, changing `freshIndex` to `noneIndexed` for better understanding of the sync condition. - Adjusted the logic to ensure a full sync is forced when no messages or conversations are marked as indexed, even if below the sync threshold. - Updated related tests to reflect the new logging messages and conditions, enhancing the accuracy of the sync threshold logic. This change improves the readability and reliability of the synchronization process, ensuring all documents are indexed correctly when starting with a fresh index. * fix: enhance MeiliSearch index creation error handling - Updated the `mongoMeili` function to improve logging and error handling during index creation in MeiliSearch. - Added handling for `MeiliSearchTimeOutError` to log a warning when index creation times out. - Enhanced logging to differentiate between successful index creation and specific failure reasons, including cases where the index already exists. - Improved debug logging for index creation tasks to provide clearer insights into the process. This change enhances the robustness of the index creation process and improves observability for troubleshooting. * fix: update MeiliSearch index creation error handling - Modified the `mongoMeili` function to check for any status other than 'succeeded' during index creation, enhancing error detection. - Improved logging to provide clearer insights when an index creation task fails, particularly for cases where the index already exists. This change strengthens the error handling mechanism for index creation in MeiliSearch, ensuring better observability and reliability.
2026-03-12 20:43:23 -04:00
if (settingsUpdated || noneIndexed || unindexedMessages > syncThreshold) {
if (noneIndexed && !settingsUpdated) {
logger.info('[indexSync] No messages marked as indexed, forcing full sync');
}
logger.info(`[indexSync] Starting message sync (${unindexedMessages} unindexed)`);
await Message.syncWithMeili();
messagesSync = true;
} else if (unindexedMessages > 0) {
logger.info(
`[indexSync] ${unindexedMessages} messages unindexed (below threshold: ${syncThreshold}, skipping)`,
);
}
} else {
logger.info(
`[indexSync] Messages are fully synced: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments}`,
);
}
// Check if we need to sync conversations
const convoProgress = await Conversation.getSyncProgress();
if (!convoProgress.isComplete || settingsUpdated) {
logger.info(
`[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`,
);
const convoCount = convoProgress.totalDocuments;
const convosIndexed = convoProgress.totalProcessed;
const unindexedConvos = convoCount - convosIndexed;
🫙 fix: Force MeiliSearch Full Sync on Empty Index State (#12202) * fix: meili index sync with unindexed documents - Updated `performSync` function to force a full sync when a fresh MeiliSearch index is detected, even if the number of unindexed messages or convos is below the sync threshold. - Added logging to indicate when a fresh index is detected and a full sync is initiated. - Introduced new tests to validate the behavior of the sync logic under various conditions, ensuring proper handling of fresh indexes and threshold scenarios. This change improves the reliability of the synchronization process, ensuring that all documents are indexed correctly when starting with a fresh index. * refactor: update sync logic for unindexed documents in MeiliSearch - Renamed variables in `performSync` to improve clarity, changing `freshIndex` to `noneIndexed` for better understanding of the sync condition. - Adjusted the logic to ensure a full sync is forced when no messages or conversations are marked as indexed, even if below the sync threshold. - Updated related tests to reflect the new logging messages and conditions, enhancing the accuracy of the sync threshold logic. This change improves the readability and reliability of the synchronization process, ensuring all documents are indexed correctly when starting with a fresh index. * fix: enhance MeiliSearch index creation error handling - Updated the `mongoMeili` function to improve logging and error handling during index creation in MeiliSearch. - Added handling for `MeiliSearchTimeOutError` to log a warning when index creation times out. - Enhanced logging to differentiate between successful index creation and specific failure reasons, including cases where the index already exists. - Improved debug logging for index creation tasks to provide clearer insights into the process. This change enhances the robustness of the index creation process and improves observability for troubleshooting. * fix: update MeiliSearch index creation error handling - Modified the `mongoMeili` function to check for any status other than 'succeeded' during index creation, enhancing error detection. - Improved logging to provide clearer insights when an index creation task fails, particularly for cases where the index already exists. This change strengthens the error handling mechanism for index creation in MeiliSearch, ensuring better observability and reliability.
2026-03-12 20:43:23 -04:00
const noneConvosIndexed = convosIndexed === 0 && unindexedConvos > 0;
if (settingsUpdated || noneConvosIndexed || unindexedConvos > syncThreshold) {
if (noneConvosIndexed && !settingsUpdated) {
logger.info('[indexSync] No conversations marked as indexed, forcing full sync');
}
logger.info(`[indexSync] Starting convos sync (${unindexedConvos} unindexed)`);
await Conversation.syncWithMeili();
convosSync = true;
} else if (unindexedConvos > 0) {
logger.info(
`[indexSync] ${unindexedConvos} convos unindexed (below threshold: ${syncThreshold}, skipping)`,
);
}
} else {
logger.info(
`[indexSync] Conversations are fully synced: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments}`,
);
}
return { messagesSync, convosSync };
} finally {
if (indexingDisabled === true) {
logger.info('[indexSync] Indexing is disabled, skipping cleanup...');
} else if (flowManager && flowId && flowType) {
try {
await flowManager.deleteFlow(flowId, flowType);
logger.debug('[indexSync] Flow state cleaned up');
} catch (cleanupErr) {
logger.debug('[indexSync] Could not clean up flow state:', cleanupErr.message);
}
}
}
}
/**
* Main index sync function that uses FlowStateManager to prevent concurrent execution
*/
async function indexSync() {
if (!searchEnabled) {
return;
}
logger.info('[indexSync] Starting index synchronization check...');
// Get or create FlowStateManager instance
const flowsCache = getLogStores(CacheKeys.FLOWS);
if (!flowsCache) {
logger.warn('[indexSync] Flows cache not available, falling back to direct sync');
return await performSync(null, null, null);
}
const flowManager = new FlowStateManager(flowsCache, {
ttl: 60000 * 10, // 10 minutes TTL for sync operations
});
// Use a unique flow ID for the sync operation
const flowId = 'meili-index-sync';
const flowType = 'MEILI_SYNC';
try {
// This will only execute the handler if no other instance is running the sync
const result = await flowManager.createFlowWithHandler(flowId, flowType, () =>
performSync(flowManager, flowId, flowType),
);
if (result.messagesSync || result.convosSync) {
logger.info('[indexSync] Sync completed successfully');
} else {
logger.debug('[indexSync] No sync was needed');
}
return result;
} catch (err) {
if (err.message.includes('flow already exists')) {
logger.info('[indexSync] Sync already running on another instance');
return;
}
2023-03-22 17:15:32 -04:00
if (err.message.includes('not found')) {
logger.debug('[indexSync] Creating indices...');
2023-03-23 13:16:07 -04:00
currentTimeout = setTimeout(async () => {
2023-03-22 17:15:32 -04:00
try {
await Message.syncWithMeili();
await Conversation.syncWithMeili();
} catch (err) {
logger.error('[indexSync] Trouble creating indices, try restarting the server.', err);
2023-03-22 17:15:32 -04:00
}
}, 750);
} else if (err.message.includes('Meilisearch not configured')) {
logger.info('[indexSync] Meilisearch not configured, search will be disabled.');
2023-03-22 17:15:32 -04:00
} else {
logger.error('[indexSync] error', err);
2023-03-22 17:15:32 -04:00
}
}
}
2023-03-23 13:16:07 -04:00
process.on('exit', () => {
logger.debug('[indexSync] Clearing sync timeouts before exiting...');
2023-03-23 13:16:07 -04:00
clearTimeout(currentTimeout);
});
module.exports = indexSync;