Signed-off-by: Daniel <845765@qq.com>
This commit is contained in:
Daniel 2025-11-24 10:30:06 +08:00
parent bcef9bdff9
commit 8429a1df0b
No known key found for this signature in database
GPG key ID: 86211BA83DF03017
3 changed files with 67 additions and 2 deletions

View file

@ -20,6 +20,7 @@ import (
"database/sql"
"errors"
"fmt"
"math"
"path"
"runtime/debug"
"sync"
@ -30,6 +31,7 @@ import (
"github.com/siyuan-note/eventbus"
"github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/task"
"github.com/siyuan-note/siyuan/kernel/treenode"
"github.com/siyuan-note/siyuan/kernel/util"
)
@ -58,6 +60,32 @@ func FlushTxJob() {
task.AppendTask(task.DatabaseIndexCommit, FlushQueue)
}
func WaitFlushTx() {
var printLog bool
var lastPrintLog bool
for i := 0; isWritingDatabase(util.SQLFlushInterval + 50*time.Millisecond); i++ {
time.Sleep(50 * time.Millisecond)
if 200 < i && !printLog { // 10s 后打日志
logging.LogWarnf("database is writing: \n%s", logging.ShortStack())
printLog = true
}
if 1200 < i && !lastPrintLog { // 60s 后打日志
logging.LogWarnf("database is still writing")
lastPrintLog = true
}
}
}
func isWritingDatabase(d time.Duration) bool {
time.Sleep(d)
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
if 0 < len(operationQueue) || flushingTx.Load() {
return true
}
return false
}
func ClearQueue() {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
@ -82,6 +110,32 @@ func FlushQueue() {
start := time.Now()
// logging.LogInfof("flushing database queue, total operations [%d]", total)
// 如果有重命名子树的操作,则统计各路径前缀的块树数量,数量较大的话阻塞整个队列,以便尽可能合并重命名子树的操作
var renameSubTreeOp *dbQueueOperation
for _, op := range ops {
if "rename_sub_tree" == op.action {
renameSubTreeOp = op
break
}
}
if nil != renameSubTreeOp {
childCount := treenode.CountBlockTreesByPathPrefix(path.Dir(renameSubTreeOp.renameTree.Path))
if 512 < childCount {
scale := math.Log(float64(childCount)/512.0+1.0) / math.Log(2.0)
secs := 1.0 * scale
if secs < 1.0 {
secs = 1.0
}
if secs > 12.0 {
secs = 12.0
}
logging.LogInfof("rename sub tree [%s] with large child count [%d], sleep [%.2fs] to wait for more operations", renameSubTreeOp.renameTree.Path, childCount, secs)
time.Sleep(time.Duration(secs * float64(time.Second)))
}
}
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
if 512 < len(ops) {
disableCache()