mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-16 16:30:15 +01:00
🔀 refactor: Only Cleanup Meili Sync if actually Synced
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
This commit is contained in:
parent
31a283a4fe
commit
9c77f53454
1 changed files with 103 additions and 97 deletions
|
|
@ -183,95 +183,111 @@ async function ensureFilterableAttributes(client) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Performs the actual sync operations for messages and conversations
|
* Performs the actual sync operations for messages and conversations
|
||||||
|
* @param {FlowStateManager} flowManager - Flow state manager instance
|
||||||
|
* @param {string} flowId - Flow identifier
|
||||||
|
* @param {string} flowType - Flow type
|
||||||
*/
|
*/
|
||||||
async function performSync() {
|
async function performSync(flowManager, flowId, flowType) {
|
||||||
const client = MeiliSearchClient.getInstance();
|
try {
|
||||||
|
const client = MeiliSearchClient.getInstance();
|
||||||
|
|
||||||
const { status } = await client.health();
|
const { status } = await client.health();
|
||||||
if (status !== 'available') {
|
if (status !== 'available') {
|
||||||
throw new Error('Meilisearch not available');
|
throw new Error('Meilisearch not available');
|
||||||
}
|
|
||||||
|
|
||||||
if (indexingDisabled === true) {
|
|
||||||
logger.info('[indexSync] Indexing is disabled, skipping...');
|
|
||||||
return { messagesSync: false, convosSync: false };
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Ensures indexes have proper filterable attributes configured */
|
|
||||||
const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } =
|
|
||||||
await ensureFilterableAttributes(client);
|
|
||||||
|
|
||||||
let messagesSync = false;
|
|
||||||
let convosSync = false;
|
|
||||||
|
|
||||||
// Only reset flags if settings were actually updated (not just for orphaned doc cleanup)
|
|
||||||
if (settingsUpdated) {
|
|
||||||
logger.info(
|
|
||||||
'[indexSync] Settings updated. Forcing full re-sync to reindex with new configuration...',
|
|
||||||
);
|
|
||||||
|
|
||||||
// Reset sync flags to force full re-sync
|
|
||||||
await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
|
|
||||||
await Conversation.collection.updateMany(
|
|
||||||
{ _meiliIndex: true },
|
|
||||||
{ $set: { _meiliIndex: false } },
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we need to sync messages
|
|
||||||
const messageProgress = await Message.getSyncProgress();
|
|
||||||
if (!messageProgress.isComplete || settingsUpdated) {
|
|
||||||
logger.info(
|
|
||||||
`[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Check if we should do a full sync or incremental
|
|
||||||
const messageCount = await Message.countDocuments();
|
|
||||||
const messagesIndexed = messageProgress.totalProcessed;
|
|
||||||
const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10);
|
|
||||||
|
|
||||||
if (messageCount - messagesIndexed > syncThreshold) {
|
|
||||||
logger.info('[indexSync] Starting full message sync due to large difference');
|
|
||||||
await Message.syncWithMeili();
|
|
||||||
messagesSync = true;
|
|
||||||
} else if (messageCount !== messagesIndexed) {
|
|
||||||
logger.warn('[indexSync] Messages out of sync, performing incremental sync');
|
|
||||||
await Message.syncWithMeili();
|
|
||||||
messagesSync = true;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
logger.info(
|
|
||||||
`[indexSync] Messages are fully synced: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we need to sync conversations
|
if (indexingDisabled === true) {
|
||||||
const convoProgress = await Conversation.getSyncProgress();
|
logger.info('[indexSync] Indexing is disabled, skipping...');
|
||||||
if (!convoProgress.isComplete || settingsUpdated) {
|
return { messagesSync: false, convosSync: false };
|
||||||
logger.info(
|
|
||||||
`[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`,
|
|
||||||
);
|
|
||||||
|
|
||||||
const convoCount = await Conversation.countDocuments();
|
|
||||||
const convosIndexed = convoProgress.totalProcessed;
|
|
||||||
const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10);
|
|
||||||
|
|
||||||
if (convoCount - convosIndexed > syncThreshold) {
|
|
||||||
logger.info('[indexSync] Starting full conversation sync due to large difference');
|
|
||||||
await Conversation.syncWithMeili();
|
|
||||||
convosSync = true;
|
|
||||||
} else if (convoCount !== convosIndexed) {
|
|
||||||
logger.warn('[indexSync] Convos out of sync, performing incremental sync');
|
|
||||||
await Conversation.syncWithMeili();
|
|
||||||
convosSync = true;
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
logger.info(
|
|
||||||
`[indexSync] Conversations are fully synced: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
return { messagesSync, convosSync };
|
/** Ensures indexes have proper filterable attributes configured */
|
||||||
|
const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } =
|
||||||
|
await ensureFilterableAttributes(client);
|
||||||
|
|
||||||
|
let messagesSync = false;
|
||||||
|
let convosSync = false;
|
||||||
|
|
||||||
|
// Only reset flags if settings were actually updated (not just for orphaned doc cleanup)
|
||||||
|
if (settingsUpdated) {
|
||||||
|
logger.info(
|
||||||
|
'[indexSync] Settings updated. Forcing full re-sync to reindex with new configuration...',
|
||||||
|
);
|
||||||
|
|
||||||
|
// Reset sync flags to force full re-sync
|
||||||
|
await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
|
||||||
|
await Conversation.collection.updateMany(
|
||||||
|
{ _meiliIndex: true },
|
||||||
|
{ $set: { _meiliIndex: false } },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we need to sync messages
|
||||||
|
const messageProgress = await Message.getSyncProgress();
|
||||||
|
if (!messageProgress.isComplete || settingsUpdated) {
|
||||||
|
logger.info(
|
||||||
|
`[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Check if we should do a full sync or incremental
|
||||||
|
const messageCount = await Message.countDocuments();
|
||||||
|
const messagesIndexed = messageProgress.totalProcessed;
|
||||||
|
const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10);
|
||||||
|
|
||||||
|
if (messageCount - messagesIndexed > syncThreshold) {
|
||||||
|
logger.info('[indexSync] Starting full message sync due to large difference');
|
||||||
|
await Message.syncWithMeili();
|
||||||
|
messagesSync = true;
|
||||||
|
} else if (messageCount !== messagesIndexed) {
|
||||||
|
logger.warn('[indexSync] Messages out of sync, performing incremental sync');
|
||||||
|
await Message.syncWithMeili();
|
||||||
|
messagesSync = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.info(
|
||||||
|
`[indexSync] Messages are fully synced: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we need to sync conversations
|
||||||
|
const convoProgress = await Conversation.getSyncProgress();
|
||||||
|
if (!convoProgress.isComplete || settingsUpdated) {
|
||||||
|
logger.info(
|
||||||
|
`[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`,
|
||||||
|
);
|
||||||
|
|
||||||
|
const convoCount = await Conversation.countDocuments();
|
||||||
|
const convosIndexed = convoProgress.totalProcessed;
|
||||||
|
const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10);
|
||||||
|
|
||||||
|
if (convoCount - convosIndexed > syncThreshold) {
|
||||||
|
logger.info('[indexSync] Starting full conversation sync due to large difference');
|
||||||
|
await Conversation.syncWithMeili();
|
||||||
|
convosSync = true;
|
||||||
|
} else if (convoCount !== convosIndexed) {
|
||||||
|
logger.warn('[indexSync] Convos out of sync, performing incremental sync');
|
||||||
|
await Conversation.syncWithMeili();
|
||||||
|
convosSync = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.info(
|
||||||
|
`[indexSync] Conversations are fully synced: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return { messagesSync, convosSync };
|
||||||
|
} finally {
|
||||||
|
if (indexingDisabled === true) {
|
||||||
|
logger.info('[indexSync] Indexing is disabled, skipping cleanup...');
|
||||||
|
} else if (flowManager && flowId && flowType) {
|
||||||
|
try {
|
||||||
|
await flowManager.deleteFlow(flowId, flowType);
|
||||||
|
logger.debug('[indexSync] Flow state cleaned up');
|
||||||
|
} catch (cleanupErr) {
|
||||||
|
logger.debug('[indexSync] Could not clean up flow state:', cleanupErr.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -288,7 +304,7 @@ async function indexSync() {
|
||||||
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
||||||
if (!flowsCache) {
|
if (!flowsCache) {
|
||||||
logger.warn('[indexSync] Flows cache not available, falling back to direct sync');
|
logger.warn('[indexSync] Flows cache not available, falling back to direct sync');
|
||||||
return await performSync();
|
return await performSync(null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
const flowManager = new FlowStateManager(flowsCache, {
|
const flowManager = new FlowStateManager(flowsCache, {
|
||||||
|
|
@ -301,7 +317,9 @@ async function indexSync() {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// This will only execute the handler if no other instance is running the sync
|
// This will only execute the handler if no other instance is running the sync
|
||||||
const result = await flowManager.createFlowWithHandler(flowId, flowType, performSync);
|
const result = await flowManager.createFlowWithHandler(flowId, flowType, () =>
|
||||||
|
performSync(flowManager, flowId, flowType),
|
||||||
|
);
|
||||||
|
|
||||||
if (result.messagesSync || result.convosSync) {
|
if (result.messagesSync || result.convosSync) {
|
||||||
logger.info('[indexSync] Sync completed successfully');
|
logger.info('[indexSync] Sync completed successfully');
|
||||||
|
|
@ -331,18 +349,6 @@ async function indexSync() {
|
||||||
} else {
|
} else {
|
||||||
logger.error('[indexSync] error', err);
|
logger.error('[indexSync] error', err);
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
// Always clean up flow state after completion or error
|
|
||||||
// Skip cleanup only if flow already exists (another instance is handling it)
|
|
||||||
try {
|
|
||||||
const flowState = await flowManager.getFlowState(flowId, flowType);
|
|
||||||
if (flowState) {
|
|
||||||
await flowManager.deleteFlow(flowId, flowType);
|
|
||||||
logger.debug('[indexSync] Flow state cleaned up');
|
|
||||||
}
|
|
||||||
} catch (cleanupErr) {
|
|
||||||
logger.debug('[indexSync] Could not clean up flow state:', cleanupErr.message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue