🎨 改进内核任务调度机制提升稳定性 https://github.com/siyuan-note/siyuan/issues/7113

This commit is contained in:
Liang Ding 2023-01-23 18:30:52 +08:00
parent a83a08bdb5
commit 72093f7e2e
No known key found for this signature in database
GPG key ID: 136F30F901A2231D
7 changed files with 103 additions and 70 deletions

View file

@ -40,24 +40,19 @@ import (
func RefreshBacklink(id string) { func RefreshBacklink(id string) {
WaitForWritingFiles() WaitForWritingFiles()
tx, err := sql.BeginTx()
if nil != err {
return
}
defer sql.CommitTx(tx)
refs := sql.QueryRefsByDefID(id, false) refs := sql.QueryRefsByDefID(id, false)
trees := map[string]*parse.Tree{} trees := map[string]*parse.Tree{}
for _, ref := range refs { for _, ref := range refs {
tree := trees[ref.RootID] tree := trees[ref.RootID]
if nil == tree { if nil == tree {
tree, err = loadTreeByBlockID(ref.RootID) var loadErr error
if nil != err { tree, loadErr = loadTreeByBlockID(ref.RootID)
logging.LogErrorf("refresh tree refs failed: %s", err) if nil != loadErr {
logging.LogErrorf("refresh tree refs failed: %s", loadErr)
continue continue
} }
trees[ref.RootID] = tree trees[ref.RootID] = tree
sql.UpsertRefs(tx, tree) sql.UpdateRefsTreeQueue(tree)
} }
} }
} }

View file

@ -482,6 +482,7 @@ func genTreeID(tree *parse.Tree) {
func FullReindex() { func FullReindex() {
task.PrependTask(task.DatabaseIndexFull, fullReindex) task.PrependTask(task.DatabaseIndexFull, fullReindex)
task.AppendTask(task.DatabaseIndexRef, IndexRefs)
} }
func fullReindex() { func fullReindex() {
@ -501,7 +502,6 @@ func fullReindex() {
for _, openedBox := range openedBoxes { for _, openedBox := range openedBoxes {
index(openedBox.ID) index(openedBox.ID)
} }
IndexRefs()
treenode.SaveBlockTree(true) treenode.SaveBlockTree(true)
LoadFlashcards() LoadFlashcards()

View file

@ -1459,12 +1459,6 @@ func createDoc(boxID, p, title, dom string) (err error) {
transaction := &Transaction{DoOperations: []*Operation{{Action: "create", Data: tree}}} transaction := &Transaction{DoOperations: []*Operation{{Action: "create", Data: tree}}}
err = PerformTransactions(&[]*Transaction{transaction}) err = PerformTransactions(&[]*Transaction{transaction})
if nil != err { if nil != err {
tx, txErr := sql.BeginTx()
if nil != txErr {
logging.LogFatalf("transaction failed: %s", txErr)
return
}
sql.CommitTx(tx)
logging.LogFatalf("transaction failed: %s", err) logging.LogFatalf("transaction failed: %s", err)
return return
} }

View file

@ -39,18 +39,14 @@ func (box *Box) Unindex() {
} }
func unindex(boxID string) { func unindex(boxID string) {
tx, err := sql.BeginTx()
if nil != err {
return
}
sql.DeleteByBoxTx(tx, boxID)
sql.CommitTx(tx)
ids := treenode.RemoveBlockTreesByBoxID(boxID) ids := treenode.RemoveBlockTreesByBoxID(boxID)
RemoveRecentDoc(ids) RemoveRecentDoc(ids)
sql.DeleteBoxQueue(boxID)
} }
func (box *Box) Index() { func (box *Box) Index() {
task.PrependTask(task.DatabaseIndex, index, box.ID) task.PrependTask(task.DatabaseIndex, index, box.ID)
task.AppendTask(task.DatabaseIndexRef, IndexRefs)
} }
func index(boxID string) { func index(boxID string) {
@ -94,12 +90,12 @@ func index(boxID string) {
} }
cache.PutDocIAL(file.path, docIAL) cache.PutDocIAL(file.path, docIAL)
treenode.IndexBlockTree(tree)
sql.UpsertTreeQueue(tree) sql.UpsertTreeQueue(tree)
util.IncBootProgress(bootProgressPart, fmt.Sprintf(Conf.Language(92), util.ShortPathForBootingDisplay(tree.Path))) util.IncBootProgress(bootProgressPart, fmt.Sprintf(Conf.Language(92), util.ShortPathForBootingDisplay(tree.Path)))
treeSize += file.size treeSize += file.size
treeCount++ treeCount++
treenode.IndexBlockTree(tree)
if 1 < i && 0 == i%64 { if 1 < i && 0 == i%64 {
util.PushEndlessProgress(fmt.Sprintf(Conf.Language(88), i, len(files)-i)) util.PushEndlessProgress(fmt.Sprintf(Conf.Language(88), i, len(files)-i))
} }
@ -128,16 +124,12 @@ func IndexRefs() {
for _, refBlock := range refBlocks { for _, refBlock := range refBlocks {
refTreeIDs.Add(refBlock.RootID) refTreeIDs.Add(refBlock.RootID)
} }
if 0 < refTreeIDs.Size() { if 0 < refTreeIDs.Size() {
luteEngine := NewLute() luteEngine := NewLute()
bootProgressPart := 10.0 / float64(refTreeIDs.Size()) bootProgressPart := 10.0 / float64(refTreeIDs.Size())
for _, box := range Conf.GetOpenedBoxes() { for _, box := range Conf.GetOpenedBoxes() {
tx, err := sql.BeginTx() sql.DeleteBoxRefsQueue(box.ID)
if nil != err {
return
}
sql.DeleteRefsByBoxTx(tx, box.ID)
sql.CommitTx(tx)
files := box.ListFiles("/") files := box.ListFiles("/")
i := 0 i := 0
@ -163,14 +155,7 @@ func IndexRefs() {
continue continue
} }
tx, err = sql.BeginTx() sql.InsertRefsTreeQueue(tree)
if nil != err {
continue
}
sql.InsertRefs(tx, tree)
if err = sql.CommitTx(tx); nil != err {
continue
}
if 1 < i && 0 == i%64 { if 1 < i && 0 == i%64 {
util.PushEndlessProgress(fmt.Sprintf(Conf.Language(55), i)) util.PushEndlessProgress(fmt.Sprintf(Conf.Language(55), i))
} }

View file

@ -196,7 +196,6 @@ func Mount(boxID string) (alreadyMount bool, err error) {
box.SaveConf(boxConf) box.SaveConf(boxConf)
box.Index() box.Index()
IndexRefs()
// 缓存根一级的文档树展开 // 缓存根一级的文档树展开
ListDocTree(box.ID, "/", Conf.FileTree.Sort) ListDocTree(box.ID, "/", Conf.FileTree.Sort)
treenode.SaveBlockTree(false) treenode.SaveBlockTree(false)

View file

@ -22,7 +22,6 @@ import (
"time" "time"
"github.com/88250/lute/parse" "github.com/88250/lute/parse"
"github.com/emirpasic/gods/sets/hashset"
"github.com/siyuan-note/eventbus" "github.com/siyuan-note/eventbus"
"github.com/siyuan-note/logging" "github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/task" "github.com/siyuan-note/siyuan/kernel/task"
@ -30,19 +29,20 @@ import (
) )
var ( var (
operationQueue []*treeQueueOperation operationQueue []*dbQueueOperation
upsertTreeQueueLock = sync.Mutex{} dbQueueLock = sync.Mutex{}
txLock = sync.Mutex{} txLock = sync.Mutex{}
) )
type treeQueueOperation struct { type dbQueueOperation struct {
inQueueTime time.Time inQueueTime time.Time
action string // upsert/delete/delete_id/rename action string // upsert/delete/delete_id/rename/delete_box/delete_box_refs/insert_refs
upsertTree *parse.Tree // upsert upsertTree *parse.Tree // upsert/insert_refs
removeTreeBox, removeTreePath string // delete removeTreeBox, removeTreePath string // delete
removeTreeIDBox, removeTreeID string // delete_id removeTreeIDBox, removeTreeID string // delete_id
box string // delete_box/delete_box_refs
renameTree *parse.Tree // rename renameTree *parse.Tree // rename
renameTreeOldHPath string // rename renameTreeOldHPath string // rename
} }
@ -83,8 +83,8 @@ func IsEmptyQueue() bool {
} }
func ClearQueue() { func ClearQueue() {
upsertTreeQueueLock.Lock() dbQueueLock.Lock()
defer upsertTreeQueueLock.Unlock() defer dbQueueLock.Unlock()
operationQueue = nil operationQueue = nil
} }
@ -103,7 +103,6 @@ func FlushQueue() {
} }
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
boxes := hashset.New()
for _, op := range ops { for _, op := range ops {
switch op.action { switch op.action {
case "upsert": case "upsert":
@ -111,17 +110,21 @@ func FlushQueue() {
if err = upsertTree(tx, tree, context); nil != err { if err = upsertTree(tx, tree, context); nil != err {
logging.LogErrorf("upsert tree [%s] into database failed: %s", tree.Box+tree.Path, err) logging.LogErrorf("upsert tree [%s] into database failed: %s", tree.Box+tree.Path, err)
} }
boxes.Add(op.upsertTree.Box)
case "delete": case "delete":
batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath) batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
boxes.Add(op.removeTreeBox)
case "delete_id": case "delete_id":
DeleteByRootID(tx, op.removeTreeID) DeleteByRootID(tx, op.removeTreeID)
boxes.Add(op.removeTreeIDBox)
case "rename": case "rename":
batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath) batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID) updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID)
boxes.Add(op.renameTree.Box) case "delete_box":
DeleteByBoxTx(tx, op.box)
case "delete_box_refs":
DeleteRefsByBoxTx(tx, op.box)
case "insert_refs":
InsertRefs(tx, op.upsertTree)
case "update_refs":
UpsertRefs(tx, op.upsertTree)
default: default:
logging.LogErrorf("unknown operation [%s]", op.action) logging.LogErrorf("unknown operation [%s]", op.action)
} }
@ -133,20 +136,76 @@ func FlushQueue() {
} }
} }
func mergeUpsertTrees() (ops []*treeQueueOperation) { func mergeUpsertTrees() (ops []*dbQueueOperation) {
upsertTreeQueueLock.Lock() dbQueueLock.Lock()
defer upsertTreeQueueLock.Unlock() defer dbQueueLock.Unlock()
ops = operationQueue ops = operationQueue
operationQueue = nil operationQueue = nil
return return
} }
func UpsertTreeQueue(tree *parse.Tree) { func UpdateRefsTreeQueue(tree *parse.Tree) {
upsertTreeQueueLock.Lock() dbQueueLock.Lock()
defer upsertTreeQueueLock.Unlock() defer dbQueueLock.Unlock()
newOp := &treeQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "upsert"} newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "update_refs"}
for i, op := range operationQueue {
if "update_refs" == op.action && op.upsertTree.ID == tree.ID {
operationQueue[i] = newOp
return
}
}
operationQueue = append(operationQueue, newOp)
}
func InsertRefsTreeQueue(tree *parse.Tree) {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "insert_refs"}
for i, op := range operationQueue {
if "insert_refs" == op.action && op.upsertTree.ID == tree.ID {
operationQueue[i] = newOp
return
}
}
operationQueue = append(operationQueue, newOp)
}
func DeleteBoxRefsQueue(boxID string) {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box_refs"}
for i, op := range operationQueue {
if "delete_box_refs" == op.action && op.box == boxID {
operationQueue[i] = newOp
return
}
}
operationQueue = append(operationQueue, newOp)
}
func DeleteBoxQueue(boxID string) {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box"}
for i, op := range operationQueue {
if "delete_box" == op.action && op.box == boxID {
operationQueue[i] = newOp
return
}
}
operationQueue = append(operationQueue, newOp)
}
func UpsertTreeQueue(tree *parse.Tree) {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "upsert"}
for i, op := range operationQueue { for i, op := range operationQueue {
if "upsert" == op.action && op.upsertTree.ID == tree.ID { // 相同树则覆盖 if "upsert" == op.action && op.upsertTree.ID == tree.ID { // 相同树则覆盖
operationQueue[i] = newOp operationQueue[i] = newOp
@ -157,10 +216,10 @@ func UpsertTreeQueue(tree *parse.Tree) {
} }
func RenameTreeQueue(tree *parse.Tree, oldHPath string) { func RenameTreeQueue(tree *parse.Tree, oldHPath string) {
upsertTreeQueueLock.Lock() dbQueueLock.Lock()
defer upsertTreeQueueLock.Unlock() defer dbQueueLock.Unlock()
newOp := &treeQueueOperation{ newOp := &dbQueueOperation{
renameTree: tree, renameTree: tree,
renameTreeOldHPath: oldHPath, renameTreeOldHPath: oldHPath,
inQueueTime: time.Now(), inQueueTime: time.Now(),
@ -175,10 +234,10 @@ func RenameTreeQueue(tree *parse.Tree, oldHPath string) {
} }
func RemoveTreeQueue(box, rootID string) { func RemoveTreeQueue(box, rootID string) {
upsertTreeQueueLock.Lock() dbQueueLock.Lock()
defer upsertTreeQueueLock.Unlock() defer dbQueueLock.Unlock()
var tmp []*treeQueueOperation var tmp []*dbQueueOperation
// 将已有的 upsert 操作去重 // 将已有的 upsert 操作去重
for _, op := range operationQueue { for _, op := range operationQueue {
if "upsert" == op.action && op.upsertTree.ID != rootID { if "upsert" == op.action && op.upsertTree.ID != rootID {
@ -187,15 +246,15 @@ func RemoveTreeQueue(box, rootID string) {
} }
operationQueue = tmp operationQueue = tmp
newOp := &treeQueueOperation{removeTreeIDBox: box, removeTreeID: rootID, inQueueTime: time.Now(), action: "delete_id"} newOp := &dbQueueOperation{removeTreeIDBox: box, removeTreeID: rootID, inQueueTime: time.Now(), action: "delete_id"}
operationQueue = append(operationQueue, newOp) operationQueue = append(operationQueue, newOp)
} }
func RemoveTreePathQueue(treeBox, treePathPrefix string) { func RemoveTreePathQueue(treeBox, treePathPrefix string) {
upsertTreeQueueLock.Lock() dbQueueLock.Lock()
defer upsertTreeQueueLock.Unlock() defer dbQueueLock.Unlock()
var tmp []*treeQueueOperation var tmp []*dbQueueOperation
// 将已有的 upsert 操作去重 // 将已有的 upsert 操作去重
for _, op := range operationQueue { for _, op := range operationQueue {
if "upsert" == op.action && (op.removeTreeBox != treeBox || op.upsertTree.Path != treePathPrefix) { if "upsert" == op.action && (op.removeTreeBox != treeBox || op.upsertTree.Path != treePathPrefix) {
@ -204,6 +263,6 @@ func RemoveTreePathQueue(treeBox, treePathPrefix string) {
} }
operationQueue = tmp operationQueue = tmp
newOp := &treeQueueOperation{removeTreeBox: treeBox, removeTreePath: treePathPrefix, inQueueTime: time.Now(), action: "delete"} newOp := &dbQueueOperation{removeTreeBox: treeBox, removeTreePath: treePathPrefix, inQueueTime: time.Now(), action: "delete"}
operationQueue = append(operationQueue, newOp) operationQueue = append(operationQueue, newOp)
} }

View file

@ -102,6 +102,7 @@ const (
RepoCheckout = "task.repo.checkout" // 从快照中检出 RepoCheckout = "task.repo.checkout" // 从快照中检出
DatabaseIndexFull = "task.database.index.full" // 重建索引 DatabaseIndexFull = "task.database.index.full" // 重建索引
DatabaseIndex = "task.database.index" // 数据库索引队列 DatabaseIndex = "task.database.index" // 数据库索引队列
DatabaseIndexRef = "task.database.index.ref" // 数据库索引引用
DatabaseIndexFix = "task.database.index.fix" // 数据库索引订正 DatabaseIndexFix = "task.database.index.fix" // 数据库索引订正
OCRImage = "task.ocr.image" // 图片 OCR 提取文本 OCRImage = "task.ocr.image" // 图片 OCR 提取文本
HistoryGenerateDoc = "task.history.generateDoc" // 生成文件历史 HistoryGenerateDoc = "task.history.generateDoc" // 生成文件历史