This commit is contained in:
Daniel 2023-08-04 12:05:29 +08:00
parent 1aa4049750
commit 7d992ce175
No known key found for this signature in database
GPG key ID: 86211BA83DF03017
21 changed files with 568 additions and 76 deletions

View file

@ -0,0 +1,96 @@
// SiYuan - Refactor your thinking
// Copyright (c) 2020-present, b3log.org
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package sql
import (
"database/sql"
"fmt"
"strings"
"github.com/siyuan-note/eventbus"
)
type AssetContent struct {
ID string
Name string
Ext string
Path string
Size int64
Updated int64
Content string
}
const (
AssetContentsFTSCaseInsensitiveInsert = "INSERT INTO asset_contents_fts_case_insensitive (id, name, ext, path, size, updated, content) VALUES %s"
AssetContentsPlaceholder = "(?, ?, ?, ?, ?, ?, ?)"
)
func insertAssetContents(tx *sql.Tx, assetContents []*AssetContent, context map[string]interface{}) (err error) {
if 1 > len(assetContents) {
return
}
var bulk []*AssetContent
for _, assetContent := range assetContents {
bulk = append(bulk, assetContent)
if 512 > len(bulk) {
continue
}
if err = insertAssetContents0(tx, bulk, context); nil != err {
return
}
bulk = []*AssetContent{}
}
if 0 < len(bulk) {
if err = insertAssetContents0(tx, bulk, context); nil != err {
return
}
}
return
}
func insertAssetContents0(tx *sql.Tx, bulk []*AssetContent, context map[string]interface{}) (err error) {
valueStrings := make([]string, 0, len(bulk))
valueArgs := make([]interface{}, 0, len(bulk)*strings.Count(AssetContentsPlaceholder, "?"))
for _, b := range bulk {
valueStrings = append(valueStrings, AssetContentsPlaceholder)
valueArgs = append(valueArgs, b.ID)
valueArgs = append(valueArgs, b.Name)
valueArgs = append(valueArgs, b.Ext)
valueArgs = append(valueArgs, b.Path)
valueArgs = append(valueArgs, b.Size)
valueArgs = append(valueArgs, b.Updated)
valueArgs = append(valueArgs, b.Content)
}
stmt := fmt.Sprintf(AssetContentsFTSCaseInsensitiveInsert, strings.Join(valueStrings, ","))
if err = prepareExecInsertTx(tx, stmt, valueArgs); nil != err {
return
}
eventbus.Publish(eventbus.EvtSQLInsertAssetContent, context)
return
}
func deleteAssetContentsByPath(tx *sql.Tx, path string, context map[string]interface{}) (err error) {
stmt := "DELETE FROM asset_contents_fts_case_insensitive WHERE path = ?"
if err = execStmtTx(tx, stmt, path); nil != err {
return
}
return
}

View file

@ -45,8 +45,9 @@ import (
)
var (
db *sql.DB
historyDB *sql.DB
db *sql.DB
historyDB *sql.DB
assetContentDB *sql.DB
)
func init() {
@ -193,7 +194,36 @@ func initDBTables() {
}
}
func initDBConnection() {
if nil != db {
closeDatabase()
}
dsn := util.DBPath + "?_journal_mode=WAL" +
"&_synchronous=OFF" +
"&_mmap_size=2684354560" +
"&_secure_delete=OFF" +
"&_cache_size=-20480" +
"&_page_size=32768" +
"&_busy_timeout=7000" +
"&_ignore_check_constraints=ON" +
"&_temp_store=MEMORY" +
"&_case_sensitive_like=OFF"
var err error
db, err = sql.Open("sqlite3_extended", dsn)
if nil != err {
logging.LogFatalf(logging.ExitCodeReadOnlyDatabase, "create database failed: %s", err)
}
db.SetMaxIdleConns(20)
db.SetMaxOpenConns(20)
db.SetConnMaxLifetime(365 * 24 * time.Hour)
}
var initHistoryDatabaseLock = sync.Mutex{}
func InitHistoryDatabase(forceRebuild bool) {
initHistoryDatabaseLock.Lock()
defer initHistoryDatabaseLock.Unlock()
initHistoryDBConnection()
if !forceRebuild && gulu.File.IsExist(util.HistoryDBPath) {
@ -228,7 +258,7 @@ func initHistoryDBConnection() {
var err error
historyDB, err = sql.Open("sqlite3_extended", dsn)
if nil != err {
logging.LogFatalf(logging.ExitCodeReadOnlyDatabase, "create database failed: %s", err)
logging.LogFatalf(logging.ExitCodeReadOnlyDatabase, "create history database failed: %s", err)
}
historyDB.SetMaxIdleConns(3)
historyDB.SetMaxOpenConns(3)
@ -243,11 +273,34 @@ func initHistoryDBTables() {
}
}
func initDBConnection() {
if nil != db {
closeDatabase()
var initAssetContentDatabaseLock = sync.Mutex{}
func InitAssetContentDatabase(forceRebuild bool) {
initAssetContentDatabaseLock.Lock()
defer initAssetContentDatabaseLock.Unlock()
initAssetContentDBConnection()
if !forceRebuild && gulu.File.IsExist(util.AssetContentDBPath) {
return
}
dsn := util.DBPath + "?_journal_mode=WAL" +
assetContentDB.Close()
if err := os.RemoveAll(util.AssetContentDBPath); nil != err {
logging.LogErrorf("remove assets database file [%s] failed: %s", util.AssetContentDBPath, err)
return
}
initAssetContentDBConnection()
initAssetContentDBTables()
}
func initAssetContentDBConnection() {
if nil != assetContentDB {
assetContentDB.Close()
}
dsn := util.AssetContentDBPath + "?_journal_mode=WAL" +
"&_synchronous=OFF" +
"&_mmap_size=2684354560" +
"&_secure_delete=OFF" +
@ -258,13 +311,21 @@ func initDBConnection() {
"&_temp_store=MEMORY" +
"&_case_sensitive_like=OFF"
var err error
db, err = sql.Open("sqlite3_extended", dsn)
assetContentDB, err = sql.Open("sqlite3_extended", dsn)
if nil != err {
logging.LogFatalf(logging.ExitCodeReadOnlyDatabase, "create database failed: %s", err)
logging.LogFatalf(logging.ExitCodeReadOnlyDatabase, "create assets database failed: %s", err)
}
assetContentDB.SetMaxIdleConns(3)
assetContentDB.SetMaxOpenConns(3)
assetContentDB.SetConnMaxLifetime(365 * 24 * time.Hour)
}
func initAssetContentDBTables() {
assetContentDB.Exec("DROP TABLE asset_contents_fts_case_insensitive")
_, err := assetContentDB.Exec("CREATE VIRTUAL TABLE asset_contents_fts_case_insensitive USING fts5(id UNINDEXED, name, ext, path, size UNINDEXED, updated UNINDEXED, content, tokenize=\"siyuan case_insensitive\")")
if nil != err {
logging.LogFatalf(logging.ExitCodeReadOnlyDatabase, "create table [asset_contents_fts_case_insensitive] failed: %s", err)
}
db.SetMaxIdleConns(20)
db.SetMaxOpenConns(20)
db.SetConnMaxLifetime(365 * 24 * time.Hour)
}
var (
@ -1161,6 +1222,18 @@ func beginTx() (tx *sql.Tx, err error) {
return
}
func commitTx(tx *sql.Tx) (err error) {
if nil == tx {
logging.LogErrorf("tx is nil")
return errors.New("tx is nil")
}
if err = tx.Commit(); nil != err {
logging.LogErrorf("commit tx failed: %s\n %s", err, logging.ShortStack())
}
return
}
func beginHistoryTx() (tx *sql.Tx, err error) {
if tx, err = historyDB.Begin(); nil != err {
logging.LogErrorf("begin history tx failed: %s\n %s", err, logging.ShortStack())
@ -1183,7 +1256,17 @@ func commitHistoryTx(tx *sql.Tx) (err error) {
return
}
func commitTx(tx *sql.Tx) (err error) {
func beginAssetContentTx() (tx *sql.Tx, err error) {
if tx, err = assetContentDB.Begin(); nil != err {
logging.LogErrorf("begin asset content tx failed: %s\n %s", err, logging.ShortStack())
if strings.Contains(err.Error(), "database is locked") {
os.Exit(logging.ExitCodeReadOnlyDatabase)
}
}
return
}
func commitAssetContentTx(tx *sql.Tx) (err error) {
if nil == tx {
logging.LogErrorf("tx is nil")
return errors.New("tx is nil")

View file

@ -0,0 +1,147 @@
// SiYuan - Refactor your thinking
// Copyright (c) 2020-present, b3log.org
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package sql
import (
"database/sql"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
"github.com/siyuan-note/eventbus"
"github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/task"
"github.com/siyuan-note/siyuan/kernel/util"
)
var (
assetContentOperationQueue []*assetContentDBQueueOperation
assetContentDBQueueLock = sync.Mutex{}
assetContentTxLock = sync.Mutex{}
)
type assetContentDBQueueOperation struct {
inQueueTime time.Time
action string // index/deletePath
assetContents []*AssetContent // index
path string // deletePath
}
func FlushAssetContentTxJob() {
task.AppendTask(task.AssetContentDatabaseIndexCommit, FlushAssetContentQueue)
}
func FlushAssetContentQueue() {
ops := getAssetContentOperations()
if 1 > len(ops) {
return
}
assetContentTxLock.Lock()
defer assetContentTxLock.Unlock()
start := time.Now()
groupOpsTotal := map[string]int{}
for _, op := range ops {
groupOpsTotal[op.action]++
}
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
groupOpsCurrent := map[string]int{}
for i, op := range ops {
if util.IsExiting {
return
}
tx, err := beginAssetContentTx()
if nil != err {
return
}
groupOpsCurrent[op.action]++
context["current"] = groupOpsCurrent[op.action]
context["total"] = groupOpsTotal[op.action]
if err = execAssetContentOp(op, tx, context); nil != err {
tx.Rollback()
logging.LogErrorf("queue operation failed: %s", err)
eventbus.Publish(util.EvtSQLAssetContentRebuild)
return
}
if err = commitAssetContentTx(tx); nil != err {
logging.LogErrorf("commit tx failed: %s", err)
return
}
if 16 < i && 0 == i%128 {
debug.FreeOSMemory()
}
}
if 128 < len(ops) {
debug.FreeOSMemory()
}
elapsed := time.Now().Sub(start).Milliseconds()
if 7000 < elapsed {
logging.LogInfof("database asset content op tx [%dms]", elapsed)
}
}
func execAssetContentOp(op *assetContentDBQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) {
switch op.action {
case "index":
err = insertAssetContents(tx, op.assetContents, context)
case "delete":
err = deleteAssetContentsByPath(tx, op.path, context)
default:
msg := fmt.Sprintf("unknown asset content operation [%s]", op.action)
logging.LogErrorf(msg)
err = errors.New(msg)
}
return
}
func DeleteAssetContentsByPathQueue(path string) {
assetContentTxLock.Lock()
defer assetContentTxLock.Unlock()
newOp := &assetContentDBQueueOperation{inQueueTime: time.Now(), action: "deletePath", path: path}
assetContentOperationQueue = append(assetContentOperationQueue, newOp)
}
func IndexAssetContentsQueue(assetContents []*AssetContent) {
assetContentTxLock.Lock()
defer assetContentTxLock.Unlock()
newOp := &assetContentDBQueueOperation{inQueueTime: time.Now(), action: "index", assetContents: assetContents}
assetContentOperationQueue = append(assetContentOperationQueue, newOp)
}
func getAssetContentOperations() (ops []*assetContentDBQueueOperation) {
assetContentTxLock.Lock()
defer assetContentTxLock.Unlock()
ops = assetContentOperationQueue
assetContentOperationQueue = nil
return
}

View file

@ -55,8 +55,8 @@ func FlushHistoryQueue() {
return
}
txLock.Lock()
defer txLock.Unlock()
historyTxLock.Lock()
defer historyTxLock.Unlock()
start := time.Now()
groupOpsTotal := map[string]int{}
@ -145,27 +145,3 @@ func getHistoryOperations() (ops []*historyDBQueueOperation) {
historyOperationQueue = nil
return
}
func WaitForWritingHistoryDatabase() {
var printLog bool
var lastPrintLog bool
for i := 0; isWritingHistoryDatabase(); i++ {
time.Sleep(50 * time.Millisecond)
if 200 < i && !printLog { // 10s 后打日志
logging.LogWarnf("history database is writing: \n%s", logging.ShortStack())
printLog = true
}
if 1200 < i && !lastPrintLog { // 60s 后打日志
logging.LogWarnf("history database is still writing")
lastPrintLog = true
}
}
}
func isWritingHistoryDatabase() bool {
time.Sleep(util.SQLFlushInterval + 50*time.Millisecond)
if 0 < len(historyOperationQueue) || util.IsMutexLocked(&historyTxLock) {
return true
}
return false
}