diff --git a/kernel/sql/queue.go b/kernel/sql/queue.go index e3e361f10..e1f613eb3 100644 --- a/kernel/sql/queue.go +++ b/kernel/sql/queue.go @@ -17,6 +17,9 @@ package sql import ( + "database/sql" + "errors" + "fmt" "path" "sync" "time" @@ -103,57 +106,31 @@ func FlushQueue() { return } + var execOps int context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} - execOps := 0 for i, op := range ops { if util.IsExiting { - break - } - - switch op.action { - case "index": - err = indexTree(tx, op.box, op.indexPath, context) - case "upsert": - err = upsertTree(tx, op.upsertTree, context) - case "delete": - err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath) - case "delete_id": - err = deleteByRootID(tx, op.removeTreeID) - case "rename": - err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath) - if nil != err { - break - } - err = updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID) - case "delete_box": - err = deleteByBoxTx(tx, op.box) - case "delete_box_refs": - err = deleteRefsByBoxTx(tx, op.box) - case "insert_refs": - err = insertRefs(tx, op.upsertTree) - case "update_refs": - err = upsertRefs(tx, op.upsertTree) - default: - logging.LogErrorf("unknown operation [%s]", op.action) - break + return } + err = execOp(op, tx, context) execOps++ + if nil != err { logging.LogErrorf("queue operation failed: %s", err) - break + return } - if 0 < i && 0 == i%64 { + if 0 < i && 0 == execOps%64 { if err = commitTx(tx); nil != err { logging.LogErrorf("commit tx failed: %s", err) - break + return } execOps = 0 tx, err = beginTx() if nil != err { - break + return } } } @@ -169,6 +146,38 @@ func FlushQueue() { } } +func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) { + switch op.action { + case "index": + err = indexTree(tx, op.box, op.indexPath, context) + case "upsert": + err = upsertTree(tx, op.upsertTree, context) + case "delete": + err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath) + case "delete_id": + err = deleteByRootID(tx, op.removeTreeID) + case "rename": + err = batchUpdateHPath(tx, op.renameTree.Box, op.renameTree.ID, op.renameTreeOldHPath, op.renameTree.HPath) + if nil != err { + break + } + err = updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID) + case "delete_box": + err = deleteByBoxTx(tx, op.box) + case "delete_box_refs": + err = deleteRefsByBoxTx(tx, op.box) + case "insert_refs": + err = insertRefs(tx, op.upsertTree) + case "update_refs": + err = upsertRefs(tx, op.upsertTree) + default: + msg := fmt.Sprint("unknown operation [%s]", op.action) + logging.LogErrorf(msg) + err = errors.New(msg) + } + return +} + func mergeUpsertTrees() (ops []*dbQueueOperation) { dbQueueLock.Lock() defer dbQueueLock.Unlock()