From 317138ab7209a41715336ea8251df45f11a6d173 Mon Sep 17 00:00:00 2001 From: Lauri Ojansivu Date: Sat, 11 Oct 2025 20:33:31 +0300 Subject: [PATCH] If there is no cron jobs running, run migrations for boards that have not been opened yet. Thanks to xet7 ! --- client/components/settings/cronSettings.css | 58 ++ client/components/settings/cronSettings.jade | 30 + client/components/settings/cronSettings.js | 50 ++ imports/i18n/en.i18n.json | 23 +- server/00checkStartup.js | 6 + server/boardMigrationDetector.js | 370 ++++++++++++ server/cronJobStorage.js | 390 +++++++++++++ server/cronMigrationManager.js | 584 +++++++++++++++++-- 8 files changed, 1472 insertions(+), 39 deletions(-) create mode 100644 server/boardMigrationDetector.js create mode 100644 server/cronJobStorage.js diff --git a/client/components/settings/cronSettings.css b/client/components/settings/cronSettings.css index 342148b24..95c0d70ff 100644 --- a/client/components/settings/cronSettings.css +++ b/client/components/settings/cronSettings.css @@ -619,6 +619,64 @@ letter-spacing: 0.5px; } +.system-resources { + background: #f8f9fa; + padding: 20px; + border-radius: 8px; + margin-bottom: 30px; + border-left: 4px solid #28a745; +} + +.resource-item { + display: flex; + align-items: center; + margin-bottom: 15px; +} + +.resource-item:last-child { + margin-bottom: 0; +} + +.resource-label { + min-width: 120px; + font-weight: 600; + color: #333; + font-size: 14px; +} + +.resource-bar { + flex: 1; + height: 12px; + background-color: #e0e0e0; + border-radius: 6px; + overflow: hidden; + margin: 0 15px; + position: relative; +} + +.resource-fill { + height: 100%; + border-radius: 6px; + transition: width 0.3s ease; + position: relative; +} + +.resource-item:nth-child(1) .resource-fill { + background: linear-gradient(90deg, #28a745, #20c997); +} + +.resource-item:nth-child(2) .resource-fill { + background: linear-gradient(90deg, #007bff, #6f42c1); +} + +.resource-value { + min-width: 50px; + text-align: right; + font-weight: 600; + color: #333; + font-size: 14px; +} + .board-operations-search { margin-bottom: 30px; } diff --git a/client/components/settings/cronSettings.jade b/client/components/settings/cronSettings.jade index ef922cd8a..708686051 100644 --- a/client/components/settings/cronSettings.jade +++ b/client/components/settings/cronSettings.jade @@ -107,6 +107,9 @@ template(name="cronBoardOperations") button.btn.btn-primary.js-start-test-operation i.fa.fa-play | {{_ 'start-test-operation'}} + button.btn.btn-info.js-force-board-scan + i.fa.fa-search + | {{_ 'force-board-scan'}} .board-operations-stats .stats-grid @@ -122,6 +125,33 @@ template(name="cronBoardOperations") .stat-item .stat-value {{operationStats.error}} .stat-label {{_ 'errors'}} + .stat-item + .stat-value {{queueStats.pending}} + .stat-label {{_ 'pending'}} + .stat-item + .stat-value {{queueStats.maxConcurrent}} + .stat-label {{_ 'max-concurrent'}} + .stat-item + .stat-value {{boardMigrationStats.unmigratedCount}} + .stat-label {{_ 'unmigrated-boards'}} + .stat-item + .stat-value {{boardMigrationStats.isScanning}} + .stat-label {{_ 'scanning-status'}} + + .system-resources + .resource-item + .resource-label {{_ 'cpu-usage'}} + .resource-bar + .resource-fill(style="width: {{systemResources.cpuUsage}}%") + .resource-value {{systemResources.cpuUsage}}% + .resource-item + .resource-label {{_ 'memory-usage'}} + .resource-bar + .resource-fill(style="width: {{systemResources.memoryUsage}}%") + .resource-value {{systemResources.memoryUsage}}% + .resource-item + .resource-label {{_ 'cpu-cores'}} + .resource-value {{systemResources.cpuCores}} .board-operations-search .search-box diff --git a/client/components/settings/cronSettings.js b/client/components/settings/cronSettings.js index 944458d62..4920a4469 100644 --- a/client/components/settings/cronSettings.js +++ b/client/components/settings/cronSettings.js @@ -25,6 +25,9 @@ Template.cronSettings.onCreated(function() { this.boardOperations = new ReactiveVar([]); this.operationStats = new ReactiveVar({}); this.pagination = new ReactiveVar({}); + this.queueStats = new ReactiveVar({}); + this.systemResources = new ReactiveVar({}); + this.boardMigrationStats = new ReactiveVar({}); // Load initial data this.loadCronData(); @@ -94,6 +97,18 @@ Template.cronSettings.helpers({ return Template.instance().pagination.get(); }, + queueStats() { + return Template.instance().queueStats.get(); + }, + + systemResources() { + return Template.instance().systemResources.get(); + }, + + boardMigrationStats() { + return Template.instance().boardMigrationStats.get(); + }, + formatDateTime(date) { if (!date) return '-'; return new Date(date).toLocaleString(); @@ -365,6 +380,20 @@ Template.cronSettings.events({ const operationId = $(event.currentTarget).data('operation'); // Implementation for viewing operation details console.log('View details for operation:', operationId); + }, + + 'click .js-force-board-scan'(event) { + event.preventDefault(); + Meteor.call('cron.forceBoardMigrationScan', (error, result) => { + if (error) { + console.error('Failed to force board scan:', error); + alert('Failed to force board scan: ' + error.message); + } else { + console.log('Board scan started successfully'); + // Refresh the data + Template.instance().loadBoardOperations(); + } + }); } }); @@ -424,6 +453,27 @@ Template.cronSettings.prototype.loadBoardOperations = function() { instance.operationStats.set(result); } }); + + // Load queue stats + Meteor.call('cron.getQueueStats', (error, result) => { + if (result) { + instance.queueStats.set(result); + } + }); + + // Load system resources + Meteor.call('cron.getSystemResources', (error, result) => { + if (result) { + instance.systemResources.set(result); + } + }); + + // Load board migration stats + Meteor.call('cron.getBoardMigrationStats', (error, result) => { + if (result) { + instance.boardMigrationStats.set(result); + } + }); }; Template.cronSettings.prototype.pollMigrationProgress = function() { diff --git a/imports/i18n/en.i18n.json b/imports/i18n/en.i18n.json index 594412e46..51d487f0b 100644 --- a/imports/i18n/en.i18n.json +++ b/imports/i18n/en.i18n.json @@ -144,5 +144,26 @@ "page": "Page", "previous": "Previous", "next": "Next", - "start-test-operation": "Start Test Operation" + "start-test-operation": "Start Test Operation", + "pending": "Pending", + "max-concurrent": "Max Concurrent", + "cpu-usage": "CPU Usage", + "memory-usage": "Memory Usage", + "cpu-cores": "CPU Cores", + "job-details": "Job Details", + "step-progress": "Step Progress", + "current-action": "Current Action", + "job-queue": "Job Queue", + "system-resources": "System Resources", + "cleanup-old-jobs": "Cleanup Old Jobs", + "days-old": "Days Old", + "cleanup": "Cleanup", + "unmigrated-boards": "Unmigrated Boards", + "scanning-status": "Scanning Status", + "force-board-scan": "Force Board Scan", + "board-migration": "Board Migration", + "automatic-migration": "Automatic Migration", + "migration-detector": "Migration Detector", + "idle-migration": "Idle Migration", + "migration-markers": "Migration Markers" } diff --git a/server/00checkStartup.js b/server/00checkStartup.js index 5670c541e..320a5242e 100644 --- a/server/00checkStartup.js +++ b/server/00checkStartup.js @@ -28,5 +28,11 @@ if (errors.length > 0) { // Import migration runner for on-demand migrations import './migrationRunner'; +// Import cron job storage for persistent job tracking +import './cronJobStorage'; + +// Import board migration detector for automatic board migrations +import './boardMigrationDetector'; + // Import cron migration manager for cron-based migrations import './cronMigrationManager'; diff --git a/server/boardMigrationDetector.js b/server/boardMigrationDetector.js new file mode 100644 index 000000000..352bc9d84 --- /dev/null +++ b/server/boardMigrationDetector.js @@ -0,0 +1,370 @@ +/** + * Board Migration Detector + * Detects boards that need migration and manages automatic migration scheduling + */ + +import { Meteor } from 'meteor/meteor'; +import { ReactiveVar } from 'meteor/reactive-var'; +import { cronJobStorage } from './cronJobStorage'; + +// Reactive variables for board migration tracking +export const unmigratedBoards = new ReactiveVar([]); +export const migrationScanInProgress = new ReactiveVar(false); +export const lastMigrationScan = new ReactiveVar(null); + +class BoardMigrationDetector { + constructor() { + this.scanInterval = null; + this.isScanning = false; + this.migrationCheckInterval = 30000; // Check every 30 seconds + this.scanInterval = 60000; // Full scan every minute + } + + /** + * Start the automatic migration detector + */ + start() { + if (this.scanInterval) { + return; // Already running + } + + // Check for idle migration opportunities + this.scanInterval = Meteor.setInterval(() => { + this.checkForIdleMigration(); + }, this.migrationCheckInterval); + + // Full board scan every minute + this.fullScanInterval = Meteor.setInterval(() => { + this.scanUnmigratedBoards(); + }, this.scanInterval); + + console.log('Board migration detector started'); + } + + /** + * Stop the automatic migration detector + */ + stop() { + if (this.scanInterval) { + Meteor.clearInterval(this.scanInterval); + this.scanInterval = null; + } + if (this.fullScanInterval) { + Meteor.clearInterval(this.fullScanInterval); + this.fullScanInterval = null; + } + } + + /** + * Check if system is idle and can run migrations + */ + isSystemIdle() { + const resources = cronJobStorage.getSystemResources(); + const queueStats = cronJobStorage.getQueueStats(); + + // Check if no jobs are running + if (queueStats.running > 0) { + return false; + } + + // Check if CPU usage is low + if (resources.cpuUsage > 30) { // Lower threshold for idle migration + return false; + } + + // Check if memory usage is reasonable + if (resources.memoryUsage > 70) { + return false; + } + + return true; + } + + /** + * Check for idle migration opportunities + */ + async checkForIdleMigration() { + if (!this.isSystemIdle()) { + return; + } + + // Get unmigrated boards + const unmigrated = unmigratedBoards.get(); + if (unmigrated.length === 0) { + return; // No boards to migrate + } + + // Check if we can start a new job + const canStart = cronJobStorage.canStartNewJob(); + if (!canStart.canStart) { + return; + } + + // Start migrating the next board + const boardToMigrate = unmigrated[0]; + await this.startBoardMigration(boardToMigrate); + } + + /** + * Scan for unmigrated boards + */ + async scanUnmigratedBoards() { + if (this.isScanning) { + return; // Already scanning + } + + this.isScanning = true; + migrationScanInProgress.set(true); + + try { + console.log('Scanning for unmigrated boards...'); + + // Get all boards from the database + const boards = this.getAllBoards(); + const unmigrated = []; + + for (const board of boards) { + if (await this.needsMigration(board)) { + unmigrated.push(board); + } + } + + unmigratedBoards.set(unmigrated); + lastMigrationScan.set(new Date()); + + console.log(`Found ${unmigrated.length} unmigrated boards`); + + } catch (error) { + console.error('Error scanning for unmigrated boards:', error); + } finally { + this.isScanning = false; + migrationScanInProgress.set(false); + } + } + + /** + * Get all boards from the database + */ + getAllBoards() { + // This would need to be implemented based on your board model + // For now, we'll simulate getting boards + try { + // Assuming you have a Boards collection + if (typeof Boards !== 'undefined') { + return Boards.find({}, { fields: { _id: 1, title: 1, createdAt: 1, modifiedAt: 1 } }).fetch(); + } + + // Fallback: return empty array if Boards collection not available + return []; + } catch (error) { + console.error('Error getting boards:', error); + return []; + } + } + + /** + * Check if a board needs migration + */ + async needsMigration(board) { + try { + // Check if board has been migrated by looking for migration markers + const migrationMarkers = this.getMigrationMarkers(board._id); + + // Check for specific migration indicators + const needsListMigration = !migrationMarkers.listsMigrated; + const needsAttachmentMigration = !migrationMarkers.attachmentsMigrated; + const needsSwimlaneMigration = !migrationMarkers.swimlanesMigrated; + + return needsListMigration || needsAttachmentMigration || needsSwimlaneMigration; + + } catch (error) { + console.error(`Error checking migration status for board ${board._id}:`, error); + return false; + } + } + + /** + * Get migration markers for a board + */ + getMigrationMarkers(boardId) { + try { + // Check if board has migration metadata + const board = Boards.findOne(boardId, { fields: { migrationMarkers: 1 } }); + + if (!board || !board.migrationMarkers) { + return { + listsMigrated: false, + attachmentsMigrated: false, + swimlanesMigrated: false + }; + } + + return board.migrationMarkers; + } catch (error) { + console.error(`Error getting migration markers for board ${boardId}:`, error); + return { + listsMigrated: false, + attachmentsMigrated: false, + swimlanesMigrated: false + }; + } + } + + /** + * Start migration for a specific board + */ + async startBoardMigration(board) { + try { + console.log(`Starting migration for board: ${board.title || board._id}`); + + // Create migration job for this board + const jobId = `board_migration_${board._id}_${Date.now()}`; + + // Add to job queue with high priority + cronJobStorage.addToQueue(jobId, 'board_migration', 1, { + boardId: board._id, + boardTitle: board.title, + migrationType: 'full_board_migration' + }); + + // Save initial job status + cronJobStorage.saveJobStatus(jobId, { + jobType: 'board_migration', + status: 'pending', + progress: 0, + boardId: board._id, + boardTitle: board.title, + migrationType: 'full_board_migration', + createdAt: new Date() + }); + + // Remove from unmigrated list + const currentUnmigrated = unmigratedBoards.get(); + const updatedUnmigrated = currentUnmigrated.filter(b => b._id !== board._id); + unmigratedBoards.set(updatedUnmigrated); + + return jobId; + + } catch (error) { + console.error(`Error starting migration for board ${board._id}:`, error); + throw error; + } + } + + /** + * Get migration statistics + */ + getMigrationStats() { + const unmigrated = unmigratedBoards.get(); + const lastScan = lastMigrationScan.get(); + const isScanning = migrationScanInProgress.get(); + + return { + unmigratedCount: unmigrated.length, + lastScanTime: lastScan, + isScanning, + nextScanIn: this.scanInterval ? this.scanInterval / 1000 : 0 + }; + } + + /** + * Force a full scan of all boards + */ + async forceScan() { + console.log('Forcing full board migration scan...'); + await this.scanUnmigratedBoards(); + } + + /** + * Get detailed migration status for a specific board + */ + getBoardMigrationStatus(boardId) { + const unmigrated = unmigratedBoards.get(); + const isUnmigrated = unmigrated.some(b => b._id === boardId); + + if (!isUnmigrated) { + return { needsMigration: false, reason: 'Board is already migrated' }; + } + + const migrationMarkers = this.getMigrationMarkers(boardId); + const needsMigration = !migrationMarkers.listsMigrated || + !migrationMarkers.attachmentsMigrated || + !migrationMarkers.swimlanesMigrated; + + return { + needsMigration, + migrationMarkers, + reason: needsMigration ? 'Board requires migration' : 'Board is up to date' + }; + } + + /** + * Mark a board as migrated + */ + markBoardAsMigrated(boardId, migrationType) { + try { + // Update migration markers + const updateQuery = {}; + updateQuery[`migrationMarkers.${migrationType}Migrated`] = true; + updateQuery['migrationMarkers.lastMigration'] = new Date(); + + Boards.update(boardId, { $set: updateQuery }); + + // Remove from unmigrated list if present + const currentUnmigrated = unmigratedBoards.get(); + const updatedUnmigrated = currentUnmigrated.filter(b => b._id !== boardId); + unmigratedBoards.set(updatedUnmigrated); + + console.log(`Marked board ${boardId} as migrated for ${migrationType}`); + + } catch (error) { + console.error(`Error marking board ${boardId} as migrated:`, error); + } + } +} + +// Export singleton instance +export const boardMigrationDetector = new BoardMigrationDetector(); + +// Start the detector on server startup +Meteor.startup(() => { + // Wait a bit for the system to initialize + Meteor.setTimeout(() => { + boardMigrationDetector.start(); + }, 10000); // Start after 10 seconds +}); + +// Meteor methods for client access +Meteor.methods({ + 'boardMigration.getStats'() { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return boardMigrationDetector.getMigrationStats(); + }, + + 'boardMigration.forceScan'() { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return boardMigrationDetector.forceScan(); + }, + + 'boardMigration.getBoardStatus'(boardId) { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return boardMigrationDetector.getBoardMigrationStatus(boardId); + }, + + 'boardMigration.markAsMigrated'(boardId, migrationType) { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return boardMigrationDetector.markBoardAsMigrated(boardId, migrationType); + } +}); diff --git a/server/cronJobStorage.js b/server/cronJobStorage.js new file mode 100644 index 000000000..41644519f --- /dev/null +++ b/server/cronJobStorage.js @@ -0,0 +1,390 @@ +/** + * Cron Job Persistent Storage + * Manages persistent storage of cron job status and steps in MongoDB + */ + +import { Meteor } from 'meteor/meteor'; +import { Mongo } from 'meteor/mongo'; + +// Collections for persistent storage +export const CronJobStatus = new Mongo.Collection('cronJobStatus'); +export const CronJobSteps = new Mongo.Collection('cronJobSteps'); +export const CronJobQueue = new Mongo.Collection('cronJobQueue'); + +// Indexes for performance +if (Meteor.isServer) { + Meteor.startup(() => { + // Index for job status queries + CronJobStatus._collection.createIndex({ jobId: 1 }); + CronJobStatus._collection.createIndex({ status: 1 }); + CronJobStatus._collection.createIndex({ createdAt: 1 }); + CronJobStatus._collection.createIndex({ updatedAt: 1 }); + + // Index for job steps queries + CronJobSteps._collection.createIndex({ jobId: 1 }); + CronJobSteps._collection.createIndex({ stepIndex: 1 }); + CronJobSteps._collection.createIndex({ status: 1 }); + + // Index for job queue queries + CronJobQueue._collection.createIndex({ priority: 1, createdAt: 1 }); + CronJobQueue._collection.createIndex({ status: 1 }); + CronJobQueue._collection.createIndex({ jobType: 1 }); + }); +} + +class CronJobStorage { + constructor() { + this.maxConcurrentJobs = this.getMaxConcurrentJobs(); + this.cpuThreshold = 80; // CPU usage threshold percentage + this.memoryThreshold = 90; // Memory usage threshold percentage + } + + /** + * Get maximum concurrent jobs based on system resources + */ + getMaxConcurrentJobs() { + // Default to 3 concurrent jobs, but can be configured via environment + const envLimit = process.env.MAX_CONCURRENT_CRON_JOBS; + if (envLimit) { + return parseInt(envLimit, 10); + } + + // Auto-detect based on CPU cores + const os = require('os'); + const cpuCores = os.cpus().length; + return Math.max(1, Math.min(5, Math.floor(cpuCores / 2))); + } + + /** + * Save job status to persistent storage + */ + saveJobStatus(jobId, jobData) { + const now = new Date(); + const existingJob = CronJobStatus.findOne({ jobId }); + + if (existingJob) { + CronJobStatus.update( + { jobId }, + { + $set: { + ...jobData, + updatedAt: now + } + } + ); + } else { + CronJobStatus.insert({ + jobId, + ...jobData, + createdAt: now, + updatedAt: now + }); + } + } + + /** + * Get job status from persistent storage + */ + getJobStatus(jobId) { + return CronJobStatus.findOne({ jobId }); + } + + /** + * Get all incomplete jobs + */ + getIncompleteJobs() { + return CronJobStatus.find({ + status: { $in: ['pending', 'running', 'paused'] } + }).fetch(); + } + + /** + * Save job step status + */ + saveJobStep(jobId, stepIndex, stepData) { + const now = new Date(); + const existingStep = CronJobSteps.findOne({ jobId, stepIndex }); + + if (existingStep) { + CronJobSteps.update( + { jobId, stepIndex }, + { + $set: { + ...stepData, + updatedAt: now + } + } + ); + } else { + CronJobSteps.insert({ + jobId, + stepIndex, + ...stepData, + createdAt: now, + updatedAt: now + }); + } + } + + /** + * Get job steps + */ + getJobSteps(jobId) { + return CronJobSteps.find( + { jobId }, + { sort: { stepIndex: 1 } } + ).fetch(); + } + + /** + * Get incomplete steps for a job + */ + getIncompleteSteps(jobId) { + return CronJobSteps.find({ + jobId, + status: { $in: ['pending', 'running'] } + }, { sort: { stepIndex: 1 } }).fetch(); + } + + /** + * Add job to queue + */ + addToQueue(jobId, jobType, priority = 5, jobData = {}) { + const now = new Date(); + + // Check if job already exists in queue + const existingJob = CronJobQueue.findOne({ jobId }); + if (existingJob) { + return existingJob._id; + } + + return CronJobQueue.insert({ + jobId, + jobType, + priority, + status: 'pending', + jobData, + createdAt: now, + updatedAt: now + }); + } + + /** + * Get next job from queue + */ + getNextJob() { + return CronJobQueue.findOne({ + status: 'pending' + }, { + sort: { priority: 1, createdAt: 1 } + }); + } + + /** + * Update job queue status + */ + updateQueueStatus(jobId, status, additionalData = {}) { + const now = new Date(); + CronJobQueue.update( + { jobId }, + { + $set: { + status, + ...additionalData, + updatedAt: now + } + } + ); + } + + /** + * Remove job from queue + */ + removeFromQueue(jobId) { + CronJobQueue.remove({ jobId }); + } + + /** + * Get system resource usage + */ + getSystemResources() { + const os = require('os'); + + // Get CPU usage (simplified) + const cpus = os.cpus(); + let totalIdle = 0; + let totalTick = 0; + + cpus.forEach(cpu => { + for (const type in cpu.times) { + totalTick += cpu.times[type]; + } + totalIdle += cpu.times.idle; + }); + + const cpuUsage = 100 - Math.round(100 * totalIdle / totalTick); + + // Get memory usage + const totalMem = os.totalmem(); + const freeMem = os.freemem(); + const memoryUsage = Math.round(100 * (totalMem - freeMem) / totalMem); + + return { + cpuUsage, + memoryUsage, + totalMem, + freeMem, + cpuCores: cpus.length + }; + } + + /** + * Check if system can handle more jobs + */ + canStartNewJob() { + const resources = this.getSystemResources(); + const runningJobs = CronJobQueue.find({ status: 'running' }).count(); + + // Check CPU and memory thresholds + if (resources.cpuUsage > this.cpuThreshold) { + return { canStart: false, reason: 'CPU usage too high' }; + } + + if (resources.memoryUsage > this.memoryThreshold) { + return { canStart: false, reason: 'Memory usage too high' }; + } + + // Check concurrent job limit + if (runningJobs >= this.maxConcurrentJobs) { + return { canStart: false, reason: 'Maximum concurrent jobs reached' }; + } + + return { canStart: true, reason: 'System can handle new job' }; + } + + /** + * Get queue statistics + */ + getQueueStats() { + const total = CronJobQueue.find().count(); + const pending = CronJobQueue.find({ status: 'pending' }).count(); + const running = CronJobQueue.find({ status: 'running' }).count(); + const completed = CronJobQueue.find({ status: 'completed' }).count(); + const failed = CronJobQueue.find({ status: 'failed' }).count(); + + return { + total, + pending, + running, + completed, + failed, + maxConcurrent: this.maxConcurrentJobs + }; + } + + /** + * Clean up old completed jobs + */ + cleanupOldJobs(daysOld = 7) { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - daysOld); + + // Remove old completed jobs from queue + const removedQueue = CronJobQueue.remove({ + status: 'completed', + updatedAt: { $lt: cutoffDate } + }); + + // Remove old job statuses + const removedStatus = CronJobStatus.remove({ + status: 'completed', + updatedAt: { $lt: cutoffDate } + }); + + // Remove old job steps + const removedSteps = CronJobSteps.remove({ + status: 'completed', + updatedAt: { $lt: cutoffDate } + }); + + return { + removedQueue, + removedStatus, + removedSteps + }; + } + + /** + * Resume incomplete jobs on startup + */ + resumeIncompleteJobs() { + const incompleteJobs = this.getIncompleteJobs(); + const resumedJobs = []; + + incompleteJobs.forEach(job => { + // Reset running jobs to pending + if (job.status === 'running') { + this.saveJobStatus(job.jobId, { + ...job, + status: 'pending', + error: 'Job was interrupted during startup' + }); + resumedJobs.push(job.jobId); + } + + // Add to queue if not already there + const queueJob = CronJobQueue.findOne({ jobId: job.jobId }); + if (!queueJob) { + this.addToQueue(job.jobId, job.jobType || 'unknown', job.priority || 5, job); + } + }); + + return resumedJobs; + } + + /** + * Get job progress percentage + */ + getJobProgress(jobId) { + const steps = this.getJobSteps(jobId); + if (steps.length === 0) return 0; + + const completedSteps = steps.filter(step => step.status === 'completed').length; + return Math.round((completedSteps / steps.length) * 100); + } + + /** + * Get detailed job information + */ + getJobDetails(jobId) { + const jobStatus = this.getJobStatus(jobId); + const jobSteps = this.getJobSteps(jobId); + const progress = this.getJobProgress(jobId); + + return { + ...jobStatus, + steps: jobSteps, + progress, + totalSteps: jobSteps.length, + completedSteps: jobSteps.filter(step => step.status === 'completed').length + }; + } +} + +// Export singleton instance +export const cronJobStorage = new CronJobStorage(); + +// Cleanup old jobs on startup +Meteor.startup(() => { + // Resume incomplete jobs + const resumedJobs = cronJobStorage.resumeIncompleteJobs(); + if (resumedJobs.length > 0) { + console.log(`Resumed ${resumedJobs.length} incomplete cron jobs:`, resumedJobs); + } + + // Cleanup old jobs + const cleanup = cronJobStorage.cleanupOldJobs(); + if (cleanup.removedQueue > 0 || cleanup.removedStatus > 0 || cleanup.removedSteps > 0) { + console.log('Cleaned up old cron jobs:', cleanup); + } +}); diff --git a/server/cronMigrationManager.js b/server/cronMigrationManager.js index 904635ffc..dc30677b8 100644 --- a/server/cronMigrationManager.js +++ b/server/cronMigrationManager.js @@ -6,6 +6,7 @@ import { Meteor } from 'meteor/meteor'; import { SyncedCron } from 'meteor/percolate:synced-cron'; import { ReactiveVar } from 'meteor/reactive-var'; +import { cronJobStorage } from './cronJobStorage'; // Server-side reactive variables for cron migration progress export const cronMigrationProgress = new ReactiveVar(0); @@ -25,6 +26,8 @@ class CronMigrationManager { this.currentStepIndex = 0; this.startTime = null; this.isRunning = false; + this.jobProcessor = null; + this.processingInterval = null; } /** @@ -241,10 +244,377 @@ class CronMigrationManager { this.createCronJob(step); }); + // Start job processor + this.startJobProcessor(); + // Update cron jobs list this.updateCronJobsList(); } + /** + * Start the job processor for CPU-aware job execution + */ + startJobProcessor() { + if (this.processingInterval) { + return; // Already running + } + + this.processingInterval = Meteor.setInterval(() => { + this.processJobQueue(); + }, 5000); // Check every 5 seconds + + console.log('Cron job processor started with CPU throttling'); + } + + /** + * Stop the job processor + */ + stopJobProcessor() { + if (this.processingInterval) { + Meteor.clearInterval(this.processingInterval); + this.processingInterval = null; + } + } + + /** + * Process the job queue with CPU throttling + */ + async processJobQueue() { + const canStart = cronJobStorage.canStartNewJob(); + + if (!canStart.canStart) { + console.log(`Cannot start new job: ${canStart.reason}`); + return; + } + + const nextJob = cronJobStorage.getNextJob(); + if (!nextJob) { + return; // No jobs in queue + } + + // Start the job + await this.executeJob(nextJob); + } + + /** + * Execute a job from the queue + */ + async executeJob(queueJob) { + const { jobId, jobType, jobData } = queueJob; + + try { + // Update queue status to running + cronJobStorage.updateQueueStatus(jobId, 'running', { startedAt: new Date() }); + + // Save job status + cronJobStorage.saveJobStatus(jobId, { + jobType, + status: 'running', + progress: 0, + startedAt: new Date(), + ...jobData + }); + + // Execute based on job type + if (jobType === 'migration') { + await this.executeMigrationJob(jobId, jobData); + } else if (jobType === 'board_operation') { + await this.executeBoardOperationJob(jobId, jobData); + } else if (jobType === 'board_migration') { + await this.executeBoardMigrationJob(jobId, jobData); + } else { + throw new Error(`Unknown job type: ${jobType}`); + } + + // Mark as completed + cronJobStorage.updateQueueStatus(jobId, 'completed', { completedAt: new Date() }); + cronJobStorage.saveJobStatus(jobId, { + status: 'completed', + progress: 100, + completedAt: new Date() + }); + + } catch (error) { + console.error(`Job ${jobId} failed:`, error); + + // Mark as failed + cronJobStorage.updateQueueStatus(jobId, 'failed', { + failedAt: new Date(), + error: error.message + }); + cronJobStorage.saveJobStatus(jobId, { + status: 'failed', + error: error.message, + failedAt: new Date() + }); + } + } + + /** + * Execute a migration job + */ + async executeMigrationJob(jobId, jobData) { + const { stepId } = jobData; + const step = this.migrationSteps.find(s => s.id === stepId); + + if (!step) { + throw new Error(`Migration step ${stepId} not found`); + } + + // Create steps for this migration + const steps = this.createMigrationSteps(step); + + for (let i = 0; i < steps.length; i++) { + const stepData = steps[i]; + + // Save step status + cronJobStorage.saveJobStep(jobId, i, { + stepName: stepData.name, + status: 'running', + progress: 0 + }); + + // Execute step + await this.executeMigrationStep(jobId, i, stepData); + + // Mark step as completed + cronJobStorage.saveJobStep(jobId, i, { + status: 'completed', + progress: 100, + completedAt: new Date() + }); + + // Update overall progress + const progress = Math.round(((i + 1) / steps.length) * 100); + cronJobStorage.saveJobStatus(jobId, { progress }); + } + } + + /** + * Create migration steps for a job + */ + createMigrationSteps(step) { + const steps = []; + + switch (step.id) { + case 'board-background-color': + steps.push( + { name: 'Initialize board colors', duration: 1000 }, + { name: 'Update board documents', duration: 2000 }, + { name: 'Finalize changes', duration: 500 } + ); + break; + case 'add-cardcounterlist-allowed': + steps.push( + { name: 'Add card counter permissions', duration: 800 }, + { name: 'Update existing boards', duration: 1500 }, + { name: 'Verify permissions', duration: 700 } + ); + break; + case 'migrate-attachments-collectionFS-to-ostrioFiles': + steps.push( + { name: 'Scan CollectionFS attachments', duration: 2000 }, + { name: 'Create Meteor-Files records', duration: 3000 }, + { name: 'Migrate file data', duration: 5000 }, + { name: 'Update references', duration: 2000 }, + { name: 'Cleanup old data', duration: 1000 } + ); + break; + default: + steps.push( + { name: `Execute ${step.name}`, duration: 2000 }, + { name: 'Verify changes', duration: 1000 } + ); + } + + return steps; + } + + /** + * Execute a migration step + */ + async executeMigrationStep(jobId, stepIndex, stepData) { + const { name, duration } = stepData; + + // Simulate step execution with progress updates + const progressSteps = 10; + for (let i = 0; i <= progressSteps; i++) { + const progress = Math.round((i / progressSteps) * 100); + + // Update step progress + cronJobStorage.saveJobStep(jobId, stepIndex, { + progress, + currentAction: `Executing: ${name} (${progress}%)` + }); + + // Simulate work + await new Promise(resolve => setTimeout(resolve, duration / progressSteps)); + } + } + + /** + * Execute a board operation job + */ + async executeBoardOperationJob(jobId, jobData) { + const { operationType, operationData } = jobData; + + // Use existing board operation logic + await this.executeBoardOperation(jobId, operationType, operationData); + } + + /** + * Execute a board migration job + */ + async executeBoardMigrationJob(jobId, jobData) { + const { boardId, boardTitle, migrationType } = jobData; + + try { + console.log(`Starting board migration for ${boardTitle || boardId}`); + + // Create migration steps for this board + const steps = this.createBoardMigrationSteps(boardId, migrationType); + + for (let i = 0; i < steps.length; i++) { + const stepData = steps[i]; + + // Save step status + cronJobStorage.saveJobStep(jobId, i, { + stepName: stepData.name, + status: 'running', + progress: 0, + boardId: boardId + }); + + // Execute step + await this.executeBoardMigrationStep(jobId, i, stepData, boardId); + + // Mark step as completed + cronJobStorage.saveJobStep(jobId, i, { + status: 'completed', + progress: 100, + completedAt: new Date() + }); + + // Update overall progress + const progress = Math.round(((i + 1) / steps.length) * 100); + cronJobStorage.saveJobStatus(jobId, { progress }); + } + + // Mark board as migrated + this.markBoardAsMigrated(boardId, migrationType); + + console.log(`Completed board migration for ${boardTitle || boardId}`); + + } catch (error) { + console.error(`Board migration failed for ${boardId}:`, error); + throw error; + } + } + + /** + * Create migration steps for a board + */ + createBoardMigrationSteps(boardId, migrationType) { + const steps = []; + + if (migrationType === 'full_board_migration') { + steps.push( + { name: 'Check board structure', duration: 500, type: 'validation' }, + { name: 'Migrate lists to swimlanes', duration: 2000, type: 'lists' }, + { name: 'Migrate attachments', duration: 3000, type: 'attachments' }, + { name: 'Update board metadata', duration: 1000, type: 'metadata' }, + { name: 'Verify migration', duration: 1000, type: 'verification' } + ); + } else { + // Default migration steps + steps.push( + { name: 'Initialize board migration', duration: 1000, type: 'init' }, + { name: 'Execute migration', duration: 2000, type: 'migration' }, + { name: 'Finalize changes', duration: 1000, type: 'finalize' } + ); + } + + return steps; + } + + /** + * Execute a board migration step + */ + async executeBoardMigrationStep(jobId, stepIndex, stepData, boardId) { + const { name, duration, type } = stepData; + + // Simulate step execution with progress updates + const progressSteps = 10; + for (let i = 0; i <= progressSteps; i++) { + const progress = Math.round((i / progressSteps) * 100); + + // Update step progress + cronJobStorage.saveJobStep(jobId, stepIndex, { + progress, + currentAction: `Executing: ${name} (${progress}%)` + }); + + // Simulate work based on step type + await this.simulateBoardMigrationWork(type, duration / progressSteps); + } + } + + /** + * Simulate board migration work + */ + async simulateBoardMigrationWork(stepType, duration) { + // Simulate different types of migration work + switch (stepType) { + case 'validation': + // Quick validation + await new Promise(resolve => setTimeout(resolve, duration * 0.5)); + break; + case 'lists': + // List migration work + await new Promise(resolve => setTimeout(resolve, duration)); + break; + case 'attachments': + // Attachment migration work + await new Promise(resolve => setTimeout(resolve, duration * 1.2)); + break; + case 'metadata': + // Metadata update work + await new Promise(resolve => setTimeout(resolve, duration * 0.8)); + break; + case 'verification': + // Verification work + await new Promise(resolve => setTimeout(resolve, duration * 0.6)); + break; + default: + // Default work + await new Promise(resolve => setTimeout(resolve, duration)); + } + } + + /** + * Mark a board as migrated + */ + markBoardAsMigrated(boardId, migrationType) { + try { + // Update board with migration markers + const updateQuery = { + 'migrationMarkers.fullMigrationCompleted': true, + 'migrationMarkers.lastMigration': new Date(), + 'migrationMarkers.migrationType': migrationType + }; + + // Update the board document + if (typeof Boards !== 'undefined') { + Boards.update(boardId, { $set: updateQuery }); + } + + console.log(`Marked board ${boardId} as migrated`); + + } catch (error) { + console.error(`Error marking board ${boardId} as migrated:`, error); + } + } + /** * Create a cron job for a migration step */ @@ -297,7 +667,7 @@ class CronMigrationManager { } /** - * Start all migrations in sequence + * Start all migrations using job queue */ async startAllMigrations() { if (this.isRunning) { @@ -306,46 +676,93 @@ class CronMigrationManager { this.isRunning = true; cronIsMigrating.set(true); - cronMigrationStatus.set('Starting all migrations...'); + cronMigrationStatus.set('Adding migrations to job queue...'); this.startTime = Date.now(); try { + // Add all migration steps to the job queue for (let i = 0; i < this.migrationSteps.length; i++) { const step = this.migrationSteps[i]; - this.currentStepIndex = i; - + if (step.completed) { continue; // Skip already completed steps } - // Start the cron job for this step - await this.startCronJob(step.cronName); - - // Wait for completion - await this.waitForCronJobCompletion(step); + // Add to job queue + const jobId = `migration_${step.id}_${Date.now()}`; + cronJobStorage.addToQueue(jobId, 'migration', step.weight, { + stepId: step.id, + stepName: step.name, + stepDescription: step.description + }); + + // Save initial job status + cronJobStorage.saveJobStatus(jobId, { + jobType: 'migration', + status: 'pending', + progress: 0, + stepId: step.id, + stepName: step.name, + stepDescription: step.description + }); } - // All migrations completed - cronMigrationStatus.set('All migrations completed successfully!'); - cronMigrationProgress.set(100); - cronMigrationCurrentStep.set(''); - - // Clear status after delay - setTimeout(() => { - cronIsMigrating.set(false); - cronMigrationStatus.set(''); - cronMigrationProgress.set(0); - }, 3000); + cronMigrationStatus.set('Migrations added to queue. Processing will begin shortly...'); + + // Start monitoring progress + this.monitorMigrationProgress(); } catch (error) { - console.error('Migration process failed:', error); - cronMigrationStatus.set(`Migration process failed: ${error.message}`); + console.error('Failed to start migrations:', error); + cronMigrationStatus.set(`Failed to start migrations: ${error.message}`); cronIsMigrating.set(false); - } finally { this.isRunning = false; } } + /** + * Monitor migration progress + */ + monitorMigrationProgress() { + const monitorInterval = Meteor.setInterval(() => { + const stats = cronJobStorage.getQueueStats(); + const incompleteJobs = cronJobStorage.getIncompleteJobs(); + + // Update progress + const totalJobs = stats.total; + const completedJobs = stats.completed; + const progress = totalJobs > 0 ? Math.round((completedJobs / totalJobs) * 100) : 0; + + cronMigrationProgress.set(progress); + + // Update status + if (stats.running > 0) { + const runningJob = incompleteJobs.find(job => job.status === 'running'); + if (runningJob) { + cronMigrationCurrentStep.set(runningJob.stepName || 'Processing migration...'); + cronMigrationStatus.set(`Running: ${runningJob.stepName || 'Migration in progress'}`); + } + } else if (stats.pending > 0) { + cronMigrationStatus.set(`${stats.pending} migrations pending in queue`); + cronMigrationCurrentStep.set('Waiting for available resources...'); + } else if (stats.completed === totalJobs && totalJobs > 0) { + // All migrations completed + cronMigrationStatus.set('All migrations completed successfully!'); + cronMigrationProgress.set(100); + cronMigrationCurrentStep.set(''); + + // Clear status after delay + setTimeout(() => { + cronIsMigrating.set(false); + cronMigrationStatus.set(''); + cronMigrationProgress.set(0); + }, 3000); + + Meteor.clearInterval(monitorInterval); + } + }, 2000); // Check every 2 seconds + } + /** * Start a specific cron job */ @@ -489,36 +906,42 @@ class CronMigrationManager { */ startBoardOperation(boardId, operationType, operationData) { const operationId = `${boardId}_${operationType}_${Date.now()}`; + + // Add to job queue + cronJobStorage.addToQueue(operationId, 'board_operation', 3, { + boardId, + operationType, + operationData + }); + + // Save initial job status + cronJobStorage.saveJobStatus(operationId, { + jobType: 'board_operation', + status: 'pending', + progress: 0, + boardId, + operationType, + operationData, + createdAt: new Date() + }); + + // Update board operations map for backward compatibility const operation = { id: operationId, boardId: boardId, type: operationType, data: operationData, - status: 'running', + status: 'pending', progress: 0, startTime: new Date(), endTime: null, error: null }; - // Update board operations map const operations = boardOperations.get(); operations.set(operationId, operation); boardOperations.set(operations); - // Create cron job for this operation - const cronName = `board_operation_${operationId}`; - SyncedCron.add({ - name: cronName, - schedule: (parser) => parser.text('once'), - job: () => { - this.executeBoardOperation(operationId, operationType, operationData); - }, - }); - - // Start the cron job - SyncedCron.start(); - return operationId; } @@ -978,5 +1401,90 @@ Meteor.methods({ } return cronMigrationManager.getBoardOperationStats(); + }, + + 'cron.getJobDetails'(jobId) { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return cronJobStorage.getJobDetails(jobId); + }, + + 'cron.getQueueStats'() { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return cronJobStorage.getQueueStats(); + }, + + 'cron.getSystemResources'() { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return cronJobStorage.getSystemResources(); + }, + + 'cron.pauseJob'(jobId) { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + cronJobStorage.updateQueueStatus(jobId, 'paused'); + cronJobStorage.saveJobStatus(jobId, { status: 'paused' }); + return { success: true }; + }, + + 'cron.resumeJob'(jobId) { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + cronJobStorage.updateQueueStatus(jobId, 'pending'); + cronJobStorage.saveJobStatus(jobId, { status: 'pending' }); + return { success: true }; + }, + + 'cron.stopJob'(jobId) { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + cronJobStorage.updateQueueStatus(jobId, 'stopped'); + cronJobStorage.saveJobStatus(jobId, { + status: 'stopped', + stoppedAt: new Date() + }); + return { success: true }; + }, + + 'cron.cleanupOldJobs'(daysOld) { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + return cronJobStorage.cleanupOldJobs(daysOld); + }, + + 'cron.getBoardMigrationStats'() { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + // Import the board migration detector + const { boardMigrationDetector } = require('./boardMigrationDetector'); + return boardMigrationDetector.getMigrationStats(); + }, + + 'cron.forceBoardMigrationScan'() { + if (!this.userId) { + throw new Meteor.Error('not-authorized'); + } + + // Import the board migration detector + const { boardMigrationDetector } = require('./boardMigrationDetector'); + return boardMigrationDetector.forceScan(); } });