Optimize WaitFlushTx() waiting performance. (#16880)

This commit is contained in:
Jane Haring 2026-02-07 10:36:54 +08:00 committed by GitHub
parent b8139ac21d
commit 81e1c40be0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -38,6 +38,7 @@ import (
var (
operationQueue []*dbQueueOperation
dbQueueLock = sync.Mutex{}
dbQueueCond = sync.NewCond(&dbQueueLock)
txLock = sync.Mutex{}
)
@ -61,10 +62,28 @@ func FlushTxJob() {
}
func WaitFlushTx() {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
var printLog bool
var lastPrintLog bool
for i := 0; isWritingDatabase(util.SQLFlushInterval + 50*time.Millisecond); i++ {
time.Sleep(50 * time.Millisecond)
var i int
for len(operationQueue) > 0 || flushingTx.Load() {
// 使用条件变量等待,避免轮询浪费 CPU
if i == 0 {
// 第一次等待时使用较短的超时,与原逻辑保持一致
dbQueueCond.Wait()
} else {
// 后续等待添加超时检测,用于打印警告日志
timer := time.AfterFunc(50*time.Millisecond, func() {
dbQueueCond.Broadcast()
})
dbQueueCond.Wait()
timer.Stop()
}
i++
if 200 < i && !printLog { // 10s 后打日志
logging.LogWarnf("database is writing: \n%s", logging.ShortStack())
printLog = true
@ -77,7 +96,11 @@ func WaitFlushTx() {
}
func isWritingDatabase(d time.Duration) bool {
time.Sleep(d)
// 等待指定时间后再检查状态
if d > 0 {
time.Sleep(d)
}
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
if 0 < len(operationQueue) || flushingTx.Load() {
@ -109,6 +132,8 @@ func FlushQueue() {
defer func() {
flushingTx.Store(false)
txLock.Unlock()
// 通知等待的协程队列已刷新完成
dbQueueCond.Broadcast()
}()
start := time.Now()