diff --git a/kernel/sql/queue.go b/kernel/sql/queue.go index 27e733baa..49339f41b 100644 --- a/kernel/sql/queue.go +++ b/kernel/sql/queue.go @@ -38,6 +38,7 @@ import ( var ( operationQueue []*dbQueueOperation dbQueueLock = sync.Mutex{} + dbQueueCond = sync.NewCond(&dbQueueLock) txLock = sync.Mutex{} ) @@ -61,10 +62,28 @@ func FlushTxJob() { } func WaitFlushTx() { + dbQueueLock.Lock() + defer dbQueueLock.Unlock() + var printLog bool var lastPrintLog bool - for i := 0; isWritingDatabase(util.SQLFlushInterval + 50*time.Millisecond); i++ { - time.Sleep(50 * time.Millisecond) + var i int + + for len(operationQueue) > 0 || flushingTx.Load() { + // 使用条件变量等待,避免轮询浪费 CPU + if i == 0 { + // 第一次等待时使用较短的超时,与原逻辑保持一致 + dbQueueCond.Wait() + } else { + // 后续等待添加超时检测,用于打印警告日志 + timer := time.AfterFunc(50*time.Millisecond, func() { + dbQueueCond.Broadcast() + }) + dbQueueCond.Wait() + timer.Stop() + } + + i++ if 200 < i && !printLog { // 10s 后打日志 logging.LogWarnf("database is writing: \n%s", logging.ShortStack()) printLog = true @@ -77,7 +96,11 @@ func WaitFlushTx() { } func isWritingDatabase(d time.Duration) bool { - time.Sleep(d) + // 等待指定时间后再检查状态 + if d > 0 { + time.Sleep(d) + } + dbQueueLock.Lock() defer dbQueueLock.Unlock() if 0 < len(operationQueue) || flushingTx.Load() { @@ -109,6 +132,8 @@ func FlushQueue() { defer func() { flushingTx.Store(false) txLock.Unlock() + // 通知等待的协程队列已刷新完成 + dbQueueCond.Broadcast() }() start := time.Now()