mirror of
https://github.com/siyuan-note/siyuan.git
synced 2025-12-20 08:30:12 +01:00
🎨 改进内核任务调度机制提升稳定性 https://github.com/siyuan-note/siyuan/issues/7113
This commit is contained in:
parent
b5fc879628
commit
430c8cbf6a
1 changed files with 43 additions and 34 deletions
|
|
@ -17,6 +17,9 @@
|
||||||
package sql
|
package sql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -103,57 +106,31 @@ func FlushQueue() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var execOps int
|
||||||
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
|
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
|
||||||
execOps := 0
|
|
||||||
for i, op := range ops {
|
for i, op := range ops {
|
||||||
if util.IsExiting {
|
if util.IsExiting {
|
||||||
break
|
return
|
||||||
}
|
|
||||||
|
|
||||||
switch op.action {
|
|
||||||
case "index":
|
|
||||||
err = indexTree(tx, op.box, op.indexPath, context)
|
|
||||||
case "upsert":
|
|
||||||
err = upsertTree(tx, op.upsertTree, context)
|
|
||||||
case "delete":
|
|
||||||
err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
|
|
||||||
case "delete_id":
|
|
||||||
err = deleteByRootID(tx, op.removeTreeID)
|
|
||||||
case "rename":
|
|
||||||
err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
|
|
||||||
if nil != err {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
err = updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID)
|
|
||||||
case "delete_box":
|
|
||||||
err = deleteByBoxTx(tx, op.box)
|
|
||||||
case "delete_box_refs":
|
|
||||||
err = deleteRefsByBoxTx(tx, op.box)
|
|
||||||
case "insert_refs":
|
|
||||||
err = insertRefs(tx, op.upsertTree)
|
|
||||||
case "update_refs":
|
|
||||||
err = upsertRefs(tx, op.upsertTree)
|
|
||||||
default:
|
|
||||||
logging.LogErrorf("unknown operation [%s]", op.action)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = execOp(op, tx, context)
|
||||||
execOps++
|
execOps++
|
||||||
|
|
||||||
if nil != err {
|
if nil != err {
|
||||||
logging.LogErrorf("queue operation failed: %s", err)
|
logging.LogErrorf("queue operation failed: %s", err)
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if 0 < i && 0 == i%64 {
|
if 0 < i && 0 == execOps%64 {
|
||||||
if err = commitTx(tx); nil != err {
|
if err = commitTx(tx); nil != err {
|
||||||
logging.LogErrorf("commit tx failed: %s", err)
|
logging.LogErrorf("commit tx failed: %s", err)
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
execOps = 0
|
execOps = 0
|
||||||
tx, err = beginTx()
|
tx, err = beginTx()
|
||||||
if nil != err {
|
if nil != err {
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -169,6 +146,38 @@ func FlushQueue() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) {
|
||||||
|
switch op.action {
|
||||||
|
case "index":
|
||||||
|
err = indexTree(tx, op.box, op.indexPath, context)
|
||||||
|
case "upsert":
|
||||||
|
err = upsertTree(tx, op.upsertTree, context)
|
||||||
|
case "delete":
|
||||||
|
err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
|
||||||
|
case "delete_id":
|
||||||
|
err = deleteByRootID(tx, op.removeTreeID)
|
||||||
|
case "rename":
|
||||||
|
err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath)
|
||||||
|
if nil != err {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
err = updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID)
|
||||||
|
case "delete_box":
|
||||||
|
err = deleteByBoxTx(tx, op.box)
|
||||||
|
case "delete_box_refs":
|
||||||
|
err = deleteRefsByBoxTx(tx, op.box)
|
||||||
|
case "insert_refs":
|
||||||
|
err = insertRefs(tx, op.upsertTree)
|
||||||
|
case "update_refs":
|
||||||
|
err = upsertRefs(tx, op.upsertTree)
|
||||||
|
default:
|
||||||
|
msg := fmt.Sprint("unknown operation [%s]", op.action)
|
||||||
|
logging.LogErrorf(msg)
|
||||||
|
err = errors.New(msg)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func mergeUpsertTrees() (ops []*dbQueueOperation) {
|
func mergeUpsertTrees() (ops []*dbQueueOperation) {
|
||||||
dbQueueLock.Lock()
|
dbQueueLock.Lock()
|
||||||
defer dbQueueLock.Unlock()
|
defer dbQueueLock.Unlock()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue