From 8dd0629b9c020ccf5cca7bdc90f31ff8394a4031 Mon Sep 17 00:00:00 2001 From: Daniel <845765@qq.com> Date: Tue, 22 Oct 2024 19:20:44 +0800 Subject: [PATCH] :recycle: Improve transaction flush --- kernel/api/block.go | 2 +- kernel/api/block_op.go | 20 ++++++++++---------- kernel/api/filetree.go | 10 +++++----- kernel/api/riff.go | 4 ++-- kernel/api/transaction.go | 2 +- kernel/model/backlink.go | 2 +- kernel/model/block.go | 2 +- kernel/model/blockial.go | 6 +++--- kernel/model/blockinfo.go | 4 ++-- kernel/model/bookmark.go | 2 +- kernel/model/box.go | 2 +- kernel/model/conf.go | 2 +- kernel/model/export.go | 4 ++-- kernel/model/file.go | 28 ++++++++++++++-------------- kernel/model/flashcard.go | 2 +- kernel/model/format.go | 2 +- kernel/model/history.go | 4 ++-- kernel/model/import.go | 2 +- kernel/model/mount.go | 8 ++++---- kernel/model/outline.go | 2 +- kernel/model/repository.go | 4 ++-- kernel/model/tag.go | 2 +- kernel/model/transaction.go | 35 +++++++++++------------------------ 23 files changed, 69 insertions(+), 82 deletions(-) diff --git a/kernel/api/block.go b/kernel/api/block.go index d2c34ed96..b77db1ed5 100644 --- a/kernel/api/block.go +++ b/kernel/api/block.go @@ -376,7 +376,7 @@ func getRefText(c *gin.Context) { } id := arg["id"].(string) - model.WaitForWritingFiles() + model.FlushTxQueue() refText := model.GetBlockRefText(id) if "" == refText { // 空块返回 id https://github.com/siyuan-note/siyuan/issues/10259 diff --git a/kernel/api/block_op.go b/kernel/api/block_op.go index e31d3d58e..155db9006 100644 --- a/kernel/api/block_op.go +++ b/kernel/api/block_op.go @@ -74,7 +74,7 @@ func moveOutlineHeading(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) @@ -127,7 +127,7 @@ func appendDailyNoteBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) @@ -180,7 +180,7 @@ func prependDailyNoteBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) @@ -241,7 +241,7 @@ func unfoldBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() broadcastTransactions(transactions) } @@ -301,7 +301,7 @@ func foldBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() broadcastTransactions(transactions) } @@ -355,7 +355,7 @@ func moveBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) @@ -400,7 +400,7 @@ func appendBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) @@ -445,7 +445,7 @@ func prependBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) @@ -508,7 +508,7 @@ func insertBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) @@ -599,7 +599,7 @@ func updateBlock(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() ret.Data = transactions broadcastTransactions(transactions) diff --git a/kernel/api/filetree.go b/kernel/api/filetree.go index d1a9c5762..548b1a740 100644 --- a/kernel/api/filetree.go +++ b/kernel/api/filetree.go @@ -223,7 +223,7 @@ func heading2Doc(c *gin.Context) { return } - model.WaitForWritingFiles() + model.FlushTxQueue() luteEngine := util.NewLute() tree, err := filesys.LoadTree(targetNotebook, targetPath, luteEngine) if err != nil { @@ -268,7 +268,7 @@ func li2Doc(c *gin.Context) { return } - model.WaitForWritingFiles() + model.FlushTxQueue() luteEngine := util.NewLute() tree, err := filesys.LoadTree(targetNotebook, targetPath, luteEngine) if err != nil { @@ -591,7 +591,7 @@ func createDoc(c *gin.Context) { return } - model.WaitForWritingFiles() + model.FlushTxQueue() box := model.Conf.Box(notebook) pushCreate(box, p, tree.Root.ID, arg) @@ -621,7 +621,7 @@ func createDailyNote(c *gin.Context) { return } - model.WaitForWritingFiles() + model.FlushTxQueue() box := model.Conf.Box(notebook) luteEngine := util.NewLute() tree, err := filesys.LoadTree(box.ID, p, luteEngine) @@ -720,7 +720,7 @@ func createDocWithMd(c *gin.Context) { } ret.Data = id - model.WaitForWritingFiles() + model.FlushTxQueue() box := model.Conf.Box(notebook) b, _ := model.GetBlock(id, nil) p := b.Path diff --git a/kernel/api/riff.go b/kernel/api/riff.go index 58287b594..60749b2bf 100644 --- a/kernel/api/riff.go +++ b/kernel/api/riff.go @@ -325,7 +325,7 @@ func removeRiffCards(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() if "" != deckID { deck := model.Decks[deckID] @@ -363,7 +363,7 @@ func addRiffCards(c *gin.Context) { } model.PerformTransactions(&transactions) - model.WaitForWritingFiles() + model.FlushTxQueue() deck := model.Decks[deckID] ret.Data = deckData(deck) diff --git a/kernel/api/transaction.go b/kernel/api/transaction.go index 667e9aada..b39cd32d1 100644 --- a/kernel/api/transaction.go +++ b/kernel/api/transaction.go @@ -85,7 +85,7 @@ func performTransactions(c *gin.Context) { func pushTransactions(app, session string, transactions []*model.Transaction) { pushMode := util.PushModeBroadcastExcludeSelf if 0 < len(transactions) && 0 < len(transactions[0].DoOperations) { - model.WaitForWritingFiles() // 等待文件写入完成,后续渲染才能读取到最新的数据 + model.FlushTxQueue() // 等待文件写入完成,后续渲染才能读取到最新的数据 action := transactions[0].DoOperations[0].Action isAttrViewTx := strings.Contains(strings.ToLower(action), "attrview") diff --git a/kernel/model/backlink.go b/kernel/model/backlink.go index 461294716..0aeea2509 100644 --- a/kernel/model/backlink.go +++ b/kernel/model/backlink.go @@ -38,7 +38,7 @@ import ( ) func RefreshBacklink(id string) { - WaitForWritingFiles() + FlushTxQueue() refreshRefsByDefID(id) } diff --git a/kernel/model/block.go b/kernel/model/block.go index d6009de12..2b9675ac3 100644 --- a/kernel/model/block.go +++ b/kernel/model/block.go @@ -463,7 +463,7 @@ func SwapBlockRef(refID, defID string, includeChildren bool) (err error) { return } } - WaitForWritingFiles() + FlushTxQueue() util.ReloadUI() return } diff --git a/kernel/model/blockial.go b/kernel/model/blockial.go index a712fe551..bd43d3718 100644 --- a/kernel/model/blockial.go +++ b/kernel/model/blockial.go @@ -51,7 +51,7 @@ func SetBlockReminder(id string, timed string) (err error) { timedMills = t.UnixMilli() } - WaitForWritingFiles() + FlushTxQueue() attrs := sql.GetBlockAttrs(id) tree, err := LoadTreeByBlockID(id) @@ -101,7 +101,7 @@ func BatchSetBlockAttrs(blockAttrs []map[string]interface{}) (err error) { return } - WaitForWritingFiles() + FlushTxQueue() var blockIDs []string for _, blockAttr := range blockAttrs { @@ -149,7 +149,7 @@ func SetBlockAttrs(id string, nameValues map[string]string) (err error) { return } - WaitForWritingFiles() + FlushTxQueue() tree, err := LoadTreeByBlockID(id) if err != nil { diff --git a/kernel/model/blockinfo.go b/kernel/model/blockinfo.go index 79d0ef897..6c2843a8e 100644 --- a/kernel/model/blockinfo.go +++ b/kernel/model/blockinfo.go @@ -53,7 +53,7 @@ type AttrView struct { } func GetDocInfo(blockID string) (ret *BlockInfo) { - WaitForWritingFiles() + FlushTxQueue() tree, err := LoadTreeByBlockID(blockID) if err != nil { @@ -125,7 +125,7 @@ func GetDocInfo(blockID string) (ret *BlockInfo) { } func GetDocsInfo(blockIDs []string, queryRefCount bool, queryAv bool) (rets []*BlockInfo) { - WaitForWritingFiles() + FlushTxQueue() trees := filesys.LoadTrees(blockIDs) for _, blockID := range blockIDs { diff --git a/kernel/model/bookmark.go b/kernel/model/bookmark.go index 85c119105..80ef3e136 100644 --- a/kernel/model/bookmark.go +++ b/kernel/model/bookmark.go @@ -158,7 +158,7 @@ func BookmarkLabels() (ret []string) { } func BuildBookmark() (ret *Bookmarks) { - WaitForWritingFiles() + FlushTxQueue() sql.FlushQueue() ret = &Bookmarks{} diff --git a/kernel/model/box.go b/kernel/model/box.go index 292604ce3..4f7ad2f3c 100644 --- a/kernel/model/box.go +++ b/kernel/model/box.go @@ -619,7 +619,7 @@ func fullReindex() { util.PushEndlessProgress(Conf.language(35)) defer util.PushClearProgress() - WaitForWritingFiles() + FlushTxQueue() if err := sql.InitDatabase(true); err != nil { os.Exit(logging.ExitCodeReadOnlyDatabase) diff --git a/kernel/model/conf.go b/kernel/model/conf.go index c592e23b8..36bad7c97 100644 --- a/kernel/model/conf.go +++ b/kernel/model/conf.go @@ -597,7 +597,7 @@ func Close(force, setCurrentWorkspace bool, execInstallPkg int) (exitCode int) { logging.LogInfof("exiting kernel [force=%v, setCurrentWorkspace=%v, execInstallPkg=%d]", force, setCurrentWorkspace, execInstallPkg) util.PushMsg(Conf.Language(95), 10000*60) - WaitForWritingFiles() + FlushTxQueue() if !force { if Conf.Sync.Enabled && 3 != Conf.Sync.Mode && diff --git a/kernel/model/export.go b/kernel/model/export.go index 18484a5ff..a73b0c6d0 100644 --- a/kernel/model/export.go +++ b/kernel/model/export.go @@ -478,7 +478,7 @@ func ExportData() (zipPath string, err error) { } func exportData(exportFolder string) (zipPath string, err error) { - WaitForWritingFiles() + FlushTxQueue() logging.LogInfof("exporting data...") @@ -521,7 +521,7 @@ func exportData(exportFolder string) (zipPath string, err error) { } func ExportResources(resourcePaths []string, mainName string) (exportFilePath string, err error) { - WaitForWritingFiles() + FlushTxQueue() // 用于导出的临时文件夹完整路径 exportFolderPath := filepath.Join(util.TempDir, "export", mainName) diff --git a/kernel/model/file.go b/kernel/model/file.go index ddeb9b7f9..d0be5ba06 100644 --- a/kernel/model/file.go +++ b/kernel/model/file.go @@ -504,7 +504,7 @@ func BlocksWordCount(ids []string) (ret *util.BlockStatResult) { } func StatTree(id string) (ret *util.BlockStatResult) { - WaitForWritingFiles() + FlushTxQueue() tree, _ := LoadTreeByBlockID(id) if nil == tree { @@ -614,7 +614,7 @@ func GetDoc(startID, endID, id string, index int, query string, queryTypes map[s //pprof.StartCPUProfile(cpuProfile) //defer pprof.StopCPUProfile() - WaitForWritingFiles() // 写入数据时阻塞,避免获取到的数据不一致 + FlushTxQueue() // 写入数据时阻塞,避免获取到的数据不一致 inputIndex := index tree, err := LoadTreeByBlockID(id) @@ -1121,7 +1121,7 @@ func DuplicateDoc(tree *parse.Tree) { resetTree(tree, "Duplicated", false) createTreeTx(tree) - WaitForWritingFiles() + FlushTxQueue() // 复制为副本时将该副本块插入到数据库中 https://github.com/siyuan-note/siyuan/issues/11959 ast.Walk(tree.Root, func(n *ast.Node, entering bool) ast.WalkStatus { @@ -1170,7 +1170,7 @@ func CreateDocByMd(boxID, p, title, md string, sorts []string) (tree *parse.Tree return } - WaitForWritingFiles() + FlushTxQueue() ChangeFileTreeSort(box.ID, sorts) return } @@ -1185,7 +1185,7 @@ func CreateWithMarkdown(tags, boxID, hPath, md, parentID, id string, withMath bo return } - WaitForWritingFiles() + FlushTxQueue() luteEngine := util.NewLute() if withMath { luteEngine.SetInlineMath(true) @@ -1206,7 +1206,7 @@ func CreateWithMarkdown(tags, boxID, hPath, md, parentID, id string, withMath bo nameValues["tags"] = tags SetBlockAttrs(retID, nameValues) - WaitForWritingFiles() + FlushTxQueue() return } @@ -1231,7 +1231,7 @@ func CreateDailyNote(boxID string) (p string, existed bool, err error) { return } - WaitForWritingFiles() + FlushTxQueue() existRoot := treenode.GetBlockTreeRootByHPath(box.ID, hPath) if nil != existRoot { @@ -1305,7 +1305,7 @@ func CreateDailyNote(boxID string) (p string, existed bool, err error) { } IncSync() - WaitForWritingFiles() + FlushTxQueue() tree, err := LoadTreeByBlockID(id) if err != nil { @@ -1451,7 +1451,7 @@ func MoveDocs(fromPaths []string, toBoxID, toPath string, callback interface{}) defer util.PushClearProgress() } - WaitForWritingFiles() + FlushTxQueue() luteEngine := util.NewLute() count := 0 for fromPath, fromBox := range pathsBoxes { @@ -1635,7 +1635,7 @@ func RemoveDoc(boxID, p string) { return } - WaitForWritingFiles() + FlushTxQueue() luteEngine := util.NewLute() removeDoc(box, p, luteEngine) IncSync() @@ -1648,7 +1648,7 @@ func RemoveDocs(paths []string) { paths = util.FilterSelfChildDocs(paths) pathsBoxes := getBoxesByPaths(paths) - WaitForWritingFiles() + FlushTxQueue() luteEngine := util.NewLute() for p, box := range pathsBoxes { removeDoc(box, p, luteEngine) @@ -1772,7 +1772,7 @@ func RenameDoc(boxID, p, title string) (err error) { return } - WaitForWritingFiles() + FlushTxQueue() luteEngine := util.NewLute() tree, err := filesys.LoadTree(box.ID, p, luteEngine) if err != nil { @@ -1928,7 +1928,7 @@ func createDoc(boxID, p, title, dom string) (tree *parse.Tree, err error) { transaction := &Transaction{DoOperations: []*Operation{{Action: "create", Data: tree}}} PerformTransactions(&[]*Transaction{transaction}) - WaitForWritingFiles() + FlushTxQueue() return } @@ -2000,7 +2000,7 @@ func ChangeFileTreeSort(boxID string, paths []string) { return } - WaitForWritingFiles() + FlushTxQueue() box := Conf.Box(boxID) sortIDs := map[string]int{} max := 0 diff --git a/kernel/model/flashcard.go b/kernel/model/flashcard.go index 54344c624..38f7b3c01 100644 --- a/kernel/model/flashcard.go +++ b/kernel/model/flashcard.go @@ -202,7 +202,7 @@ func resetFlashcards(deckID string, blockIDs []string) { } PerformTransactions(&transactions) - WaitForWritingFiles() + FlushTxQueue() } func GetFlashcardNotebooks() (ret []*Box) { diff --git a/kernel/model/format.go b/kernel/model/format.go index e08332e01..031d849cd 100644 --- a/kernel/model/format.go +++ b/kernel/model/format.go @@ -33,7 +33,7 @@ func AutoSpace(rootID string) (err error) { util.PushProtyleLoading(rootID, Conf.Language(116)) defer util.PushReloadProtyle(rootID) - WaitForWritingFiles() + FlushTxQueue() generateOpTypeHistory(tree, HistoryOpFormat) luteEngine := NewLute() diff --git a/kernel/model/history.go b/kernel/model/history.go index fcb576944..a91d1c220 100644 --- a/kernel/model/history.go +++ b/kernel/model/history.go @@ -63,7 +63,7 @@ func generateFileHistory() { return } - WaitForWritingFiles() + FlushTxQueue() // 生成文档历史 for _, box := range Conf.GetOpenedBoxes() { @@ -225,7 +225,7 @@ func RollbackDocHistory(boxID, historyPath string) (err error) { return } - WaitForWritingFiles() + FlushTxQueue() srcPath := historyPath var destPath, parentHPath string diff --git a/kernel/model/import.go b/kernel/model/import.go index f1a03534e..d892012d8 100644 --- a/kernel/model/import.go +++ b/kernel/model/import.go @@ -660,7 +660,7 @@ func ImportFromLocalPath(boxID, localPath string, toPath string) (err error) { lockSync() defer unlockSync() - WaitForWritingFiles() + FlushTxQueue() var baseHPath, baseTargetPath, boxLocalPath string if "/" == toPath { diff --git a/kernel/model/mount.go b/kernel/model/mount.go index cc55d4543..367c5d4dc 100644 --- a/kernel/model/mount.go +++ b/kernel/model/mount.go @@ -45,7 +45,7 @@ func CreateBox(name string) (id string, err error) { name = Conf.language(105) } - WaitForWritingFiles() + FlushTxQueue() createDocLock.Lock() defer createDocLock.Unlock() @@ -106,7 +106,7 @@ func RemoveBox(boxID string) (err error) { return errors.New(fmt.Sprintf("can not remove [%s] caused by it is a reserved file", boxID)) } - WaitForWritingFiles() + FlushTxQueue() isUserGuide := IsUserGuide(boxID) createDocLock.Lock() defer createDocLock.Unlock() @@ -147,7 +147,7 @@ func RemoveBox(boxID string) (err error) { } func Unmount(boxID string) { - WaitForWritingFiles() + FlushTxQueue() unmount0(boxID) evt := util.NewCmdResult("unmount", 0, util.PushModeBroadcast) @@ -178,7 +178,7 @@ func Mount(boxID string) (alreadyMount bool, err error) { boxLock.Store(boxID, true) defer boxLock.Delete(boxID) - WaitForWritingFiles() + FlushTxQueue() isUserGuide := IsUserGuide(boxID) localPath := filepath.Join(util.DataDir, boxID) diff --git a/kernel/model/outline.go b/kernel/model/outline.go index 96cca32ed..5e45b1f34 100644 --- a/kernel/model/outline.go +++ b/kernel/model/outline.go @@ -210,7 +210,7 @@ func (tx *Transaction) doMoveOutlineHeading(operation *Operation) (ret *TxErr) { func Outline(rootID string, preview bool) (ret []*Path, err error) { time.Sleep(util.FrontendQueueInterval) - WaitForWritingFiles() + FlushTxQueue() ret = []*Path{} tree, _ := LoadTreeByBlockID(rootID) diff --git a/kernel/model/repository.go b/kernel/model/repository.go index 5a926c311..a11249a77 100644 --- a/kernel/model/repository.go +++ b/kernel/model/repository.go @@ -628,7 +628,7 @@ func checkoutRepo(id string) { } util.PushEndlessProgress(Conf.Language(63)) - WaitForWritingFiles() + FlushTxQueue() CloseWatchAssets() defer WatchAssets() CloseWatchEmojis() @@ -962,7 +962,7 @@ func IndexRepo(memo string) (err error) { start := time.Now() latest, _ := repo.Latest() - WaitForWritingFiles() + FlushTxQueue() index, err := repo.Index(memo, map[string]interface{}{ eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBarAndProgress, }) diff --git a/kernel/model/tag.go b/kernel/model/tag.go index 43970d89b..856aff7dc 100644 --- a/kernel/model/tag.go +++ b/kernel/model/tag.go @@ -206,7 +206,7 @@ type Tag struct { type Tags []*Tag func BuildTags() (ret *Tags) { - WaitForWritingFiles() + FlushTxQueue() sql.FlushQueue() ret = &Tags{} diff --git a/kernel/model/transaction.go b/kernel/model/transaction.go index c9f29da80..08af36565 100644 --- a/kernel/model/transaction.go +++ b/kernel/model/transaction.go @@ -56,19 +56,9 @@ func IsMoveOutlineHeading(transactions *[]*Transaction) bool { return false } -func WaitForWritingFiles() { - var printLog bool - var lastPrintLog bool - for i := 0; isWritingFiles(); i++ { - time.Sleep(5 * time.Millisecond) - if 2000 < i && !printLog { // 10s 后打日志 - logging.LogWarnf("file is writing: \n%s", logging.ShortStack()) - printLog = true - } - if 12000 < i && !lastPrintLog { // 60s 后打日志 - logging.LogWarnf("file is still writing") - lastPrintLog = true - } +func FlushTxQueue() { + for 0 < len(txQueue) || isFlushing { + time.Sleep(10 * time.Millisecond) } } @@ -78,20 +68,17 @@ var ( isFlushing = false ) -func isWritingFiles() bool { - time.Sleep(time.Duration(50) * time.Millisecond) - return 0 < len(txQueue) || isFlushing +func init() { + go flushQueue() } -func init() { - go func() { - for { - select { - case tx := <-txQueue: - flushTx(tx) - } +func flushQueue() { + for { + select { + case tx := <-txQueue: + flushTx(tx) } - }() + } } func flushTx(tx *Transaction) {