diff --git a/kernel/model/transaction.go b/kernel/model/transaction.go index cd4c889d4..a0277cb69 100644 --- a/kernel/model/transaction.go +++ b/kernel/model/transaction.go @@ -1743,7 +1743,6 @@ func upsertAvBlockRel(node *ast.Node) { go func() { time.Sleep(100 * time.Millisecond) - sql.FlushQueue() affectedAvIDs = gulu.Str.RemoveDuplicatedElem(affectedAvIDs) var relatedAvIDs []string @@ -2090,7 +2089,7 @@ func updateRefTextRenameDoc(renamedTree *parse.Tree) { } func FlushUpdateRefTextRenameDocJob() { - sql.FlushQueue() + sql.WaitFlushTx() flushUpdateRefTextRenameDoc() } diff --git a/kernel/sql/queue.go b/kernel/sql/queue.go index 613306dbc..53e6aa17d 100644 --- a/kernel/sql/queue.go +++ b/kernel/sql/queue.go @@ -20,6 +20,7 @@ import ( "database/sql" "errors" "fmt" + "math" "path" "runtime/debug" "sync" @@ -30,6 +31,7 @@ import ( "github.com/siyuan-note/eventbus" "github.com/siyuan-note/logging" "github.com/siyuan-note/siyuan/kernel/task" + "github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/util" ) @@ -58,6 +60,32 @@ func FlushTxJob() { task.AppendTask(task.DatabaseIndexCommit, FlushQueue) } +func WaitFlushTx() { + var printLog bool + var lastPrintLog bool + for i := 0; isWritingDatabase(util.SQLFlushInterval + 50*time.Millisecond); i++ { + time.Sleep(50 * time.Millisecond) + if 200 < i && !printLog { // 10s 后打日志 + logging.LogWarnf("database is writing: \n%s", logging.ShortStack()) + printLog = true + } + if 1200 < i && !lastPrintLog { // 60s 后打日志 + logging.LogWarnf("database is still writing") + lastPrintLog = true + } + } +} + +func isWritingDatabase(d time.Duration) bool { + time.Sleep(d) + dbQueueLock.Lock() + defer dbQueueLock.Unlock() + if 0 < len(operationQueue) || flushingTx.Load() { + return true + } + return false +} + func ClearQueue() { dbQueueLock.Lock() defer dbQueueLock.Unlock() @@ -82,6 +110,32 @@ func FlushQueue() { start := time.Now() + // logging.LogInfof("flushing database queue, total operations [%d]", total) + + // 如果有重命名子树的操作,则统计各路径前缀的块树数量,数量较大的话阻塞整个队列,以便尽可能合并重命名子树的操作 + var renameSubTreeOp *dbQueueOperation + for _, op := range ops { + if "rename_sub_tree" == op.action { + renameSubTreeOp = op + break + } + } + if nil != renameSubTreeOp { + childCount := treenode.CountBlockTreesByPathPrefix(path.Dir(renameSubTreeOp.renameTree.Path)) + if 512 < childCount { + scale := math.Log(float64(childCount)/512.0+1.0) / math.Log(2.0) + secs := 1.0 * scale + if secs < 1.0 { + secs = 1.0 + } + if secs > 12.0 { + secs = 12.0 + } + logging.LogInfof("rename sub tree [%s] with large child count [%d], sleep [%.2fs] to wait for more operations", renameSubTreeOp.renameTree.Path, childCount, secs) + time.Sleep(time.Duration(secs * float64(time.Second))) + } + } + context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} if 512 < len(ops) { disableCache() diff --git a/kernel/treenode/blocktree.go b/kernel/treenode/blocktree.go index f3ecacfa5..e0c133854 100644 --- a/kernel/treenode/blocktree.go +++ b/kernel/treenode/blocktree.go @@ -410,6 +410,18 @@ func RemoveBlockTreesByRootID(rootID string) { } } +func CountBlockTreesByPathPrefix(pathPrefix string) (ret int) { + sqlStmt := "SELECT COUNT(*) FROM blocktrees WHERE path LIKE ?" + err := db.QueryRow(sqlStmt, pathPrefix+"%").Scan(&ret) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0 + } + logging.LogErrorf("sql query [%s] failed: %s", sqlStmt, err) + } + return +} + func GetBlockTreesByPathPrefix(pathPrefix string) (ret []*BlockTree) { sqlStmt := "SELECT * FROM blocktrees WHERE path LIKE ?" rows, err := db.Query(sqlStmt, pathPrefix+"%")