mirror of
https://github.com/siyuan-note/siyuan.git
synced 2025-12-17 23:20:13 +01:00
🎨 历史数据库索引队列化 https://github.com/siyuan-note/siyuan/issues/7386
This commit is contained in:
parent
4d8b826e29
commit
f581b34ea8
14 changed files with 196 additions and 56 deletions
|
|
@ -1142,7 +1142,7 @@ func beginTx() (tx *sql.Tx, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func BeginHistoryTx() (tx *sql.Tx, err error) {
|
||||
func beginHistoryTx() (tx *sql.Tx, err error) {
|
||||
if tx, err = historyDB.Begin(); nil != err {
|
||||
logging.LogErrorf("begin history tx failed: %s\n %s", err, logging.ShortStack())
|
||||
if strings.Contains(err.Error(), "database is locked") {
|
||||
|
|
@ -1152,7 +1152,7 @@ func BeginHistoryTx() (tx *sql.Tx, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func CommitHistoryTx(tx *sql.Tx) (err error) {
|
||||
func commitHistoryTx(tx *sql.Tx) (err error) {
|
||||
if nil == tx {
|
||||
logging.LogErrorf("tx is nil")
|
||||
return errors.New("tx is nil")
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/siyuan-note/eventbus"
|
||||
"github.com/siyuan-note/logging"
|
||||
)
|
||||
|
||||
|
|
@ -102,7 +103,7 @@ func queryHistory(query string, args ...interface{}) (*sql.Rows, error) {
|
|||
return historyDB.Query(query, args...)
|
||||
}
|
||||
|
||||
func DeleteHistoriesByPathPrefix(tx *sql.Tx, pathPrefix string) (err error) {
|
||||
func deleteHistoriesByPathPrefix(tx *sql.Tx, pathPrefix string, context map[string]interface{}) (err error) {
|
||||
stmt := "DELETE FROM histories_fts_case_insensitive WHERE path LIKE ?"
|
||||
if err = execStmtTx(tx, stmt, pathPrefix+"%"); nil != err {
|
||||
return
|
||||
|
|
@ -115,7 +116,7 @@ const (
|
|||
HistoriesPlaceholder = "(?, ?, ?, ?, ?, ?)"
|
||||
)
|
||||
|
||||
func InsertHistories(tx *sql.Tx, histories []*History) (err error) {
|
||||
func insertHistories(tx *sql.Tx, histories []*History, context map[string]interface{}) (err error) {
|
||||
if 1 > len(histories) {
|
||||
return
|
||||
}
|
||||
|
|
@ -127,20 +128,20 @@ func InsertHistories(tx *sql.Tx, histories []*History) (err error) {
|
|||
continue
|
||||
}
|
||||
|
||||
if err = insertHistories0(tx, bulk); nil != err {
|
||||
if err = insertHistories0(tx, bulk, context); nil != err {
|
||||
return
|
||||
}
|
||||
bulk = []*History{}
|
||||
}
|
||||
if 0 < len(bulk) {
|
||||
if err = insertHistories0(tx, bulk); nil != err {
|
||||
if err = insertHistories0(tx, bulk, context); nil != err {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func insertHistories0(tx *sql.Tx, bulk []*History) (err error) {
|
||||
func insertHistories0(tx *sql.Tx, bulk []*History, context map[string]interface{}) (err error) {
|
||||
valueStrings := make([]string, 0, len(bulk))
|
||||
valueArgs := make([]interface{}, 0, len(bulk)*strings.Count(HistoriesPlaceholder, "?"))
|
||||
for _, b := range bulk {
|
||||
|
|
@ -157,5 +158,7 @@ func insertHistories0(tx *sql.Tx, bulk []*History) (err error) {
|
|||
if err = prepareExecInsertTx(tx, stmt, valueArgs); nil != err {
|
||||
return
|
||||
}
|
||||
|
||||
eventbus.Publish(eventbus.EvtSQLInsertHistory, context)
|
||||
return
|
||||
}
|
||||
|
|
|
|||
139
kernel/sql/queue_history.go
Normal file
139
kernel/sql/queue_history.go
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
// SiYuan - Build Your Eternal Digital Garden
|
||||
// Copyright (c) 2020-present, b3log.org
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package sql
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/siyuan-note/eventbus"
|
||||
"github.com/siyuan-note/logging"
|
||||
"github.com/siyuan-note/siyuan/kernel/task"
|
||||
"github.com/siyuan-note/siyuan/kernel/util"
|
||||
)
|
||||
|
||||
var (
|
||||
historyOperationQueue []*historyDBQueueOperation
|
||||
historyDBQueueLock = sync.Mutex{}
|
||||
|
||||
historyTxLock = sync.Mutex{}
|
||||
)
|
||||
|
||||
type historyDBQueueOperation struct {
|
||||
inQueueTime time.Time
|
||||
action string // index/deletePathPrefix
|
||||
|
||||
histories []*History // index
|
||||
pathPrefix string // deletePathPrefix
|
||||
}
|
||||
|
||||
func FlushHistoryTxJob() {
|
||||
task.AppendTask(task.HistoryDatabaseIndexCommit, FlushHistoryQueue)
|
||||
}
|
||||
|
||||
func FlushHistoryQueue() {
|
||||
ops := getHistoryOperations()
|
||||
if 1 > len(ops) {
|
||||
return
|
||||
}
|
||||
|
||||
txLock.Lock()
|
||||
defer txLock.Unlock()
|
||||
start := time.Now()
|
||||
|
||||
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
|
||||
total := len(ops)
|
||||
for i, op := range ops {
|
||||
if util.IsExiting {
|
||||
return
|
||||
}
|
||||
|
||||
tx, err := beginHistoryTx()
|
||||
if nil != err {
|
||||
return
|
||||
}
|
||||
|
||||
context["current"] = i
|
||||
context["total"] = total
|
||||
if err = execHistoryOp(op, tx, context); nil != err {
|
||||
tx.Rollback()
|
||||
logging.LogErrorf("queue operation failed: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err = commitHistoryTx(tx); nil != err {
|
||||
logging.LogErrorf("commit tx failed: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if 16 < i && 0 == i%128 {
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}
|
||||
|
||||
if 128 < len(ops) {
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
|
||||
elapsed := time.Now().Sub(start).Milliseconds()
|
||||
if 7000 < elapsed {
|
||||
logging.LogInfof("database history op tx [%dms]", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
func execHistoryOp(op *historyDBQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) {
|
||||
switch op.action {
|
||||
case "index":
|
||||
err = insertHistories(tx, op.histories, context)
|
||||
case "deletePathPrefix":
|
||||
err = deleteHistoriesByPathPrefix(tx, op.pathPrefix, context)
|
||||
default:
|
||||
msg := fmt.Sprintf("unknown history operation [%s]", op.action)
|
||||
logging.LogErrorf(msg)
|
||||
err = errors.New(msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func DeleteHistoriesByPathPrefixQueue(pathPrefix string) {
|
||||
historyDBQueueLock.Lock()
|
||||
defer historyDBQueueLock.Unlock()
|
||||
|
||||
newOp := &historyDBQueueOperation{inQueueTime: time.Now(), action: "deletePathPrefix", pathPrefix: pathPrefix}
|
||||
historyOperationQueue = append(historyOperationQueue, newOp)
|
||||
}
|
||||
|
||||
func IndexHistoriesQueue(histories []*History) {
|
||||
historyDBQueueLock.Lock()
|
||||
defer historyDBQueueLock.Unlock()
|
||||
|
||||
newOp := &historyDBQueueOperation{inQueueTime: time.Now(), action: "index", histories: histories}
|
||||
historyOperationQueue = append(historyOperationQueue, newOp)
|
||||
}
|
||||
|
||||
func getHistoryOperations() (ops []*historyDBQueueOperation) {
|
||||
historyDBQueueLock.Lock()
|
||||
defer historyDBQueueLock.Unlock()
|
||||
|
||||
ops = historyOperationQueue
|
||||
historyOperationQueue = nil
|
||||
return
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue