// SiYuan - Refactor your thinking
// Copyright (c) 2020-present, b3log.org
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package sql
import (
2023-01-25 12:54:25 +08:00
"database/sql"
"errors"
"fmt"
2025-11-24 10:30:06 +08:00
"math"
2022-05-26 15:18:53 +08:00
"path"
2023-02-04 14:58:35 +08:00
"runtime/debug"
2022-05-26 15:18:53 +08:00
"sync"
2025-04-10 11:59:56 +08:00
"sync/atomic"
2022-05-26 15:18:53 +08:00
"time"
"github.com/88250/lute/parse"
2022-09-16 11:04:52 +08:00
"github.com/siyuan-note/eventbus"
2022-07-17 12:22:32 +08:00
"github.com/siyuan-note/logging"
2023-01-18 23:05:13 +08:00
"github.com/siyuan-note/siyuan/kernel/task"
2025-11-24 10:30:06 +08:00
"github.com/siyuan-note/siyuan/kernel/treenode"
2022-05-26 15:18:53 +08:00
"github.com/siyuan-note/siyuan/kernel/util"
)
var (
2023-01-23 18:30:52 +08:00
operationQueue [ ] * dbQueueOperation
dbQueueLock = sync . Mutex { }
2024-04-12 20:33:53 +08:00
txLock = sync . Mutex { }
2022-05-26 15:18:53 +08:00
)
2023-01-23 18:30:52 +08:00
type dbQueueOperation struct {
2023-04-01 16:18:19 +08:00
inQueueTime time . Time
2024-05-13 21:30:48 +08:00
action string // upsert/delete/delete_id/rename/rename_sub_tree/delete_box/delete_box_refs/index/delete_ids/update_block_content/delete_assets
2024-03-15 22:53:37 +08:00
indexTree * parse . Tree // index
2024-05-13 21:30:48 +08:00
upsertTree * parse . Tree // upsert/update_refs/delete_refs
2023-07-11 22:11:15 +08:00
removeTreeBox , removeTreePath string // delete
2024-03-15 22:53:37 +08:00
removeTreeID string // delete_id
2023-07-11 22:11:15 +08:00
removeTreeIDs [ ] string // delete_ids
box string // delete_box/delete_box_refs/index
renameTree * parse . Tree // rename/rename_sub_tree
block * Block // update_block_content
2023-12-07 20:40:16 +08:00
id string // index_node
2023-07-11 22:11:15 +08:00
removeAssetHashes [ ] string // delete_assets
2022-05-26 15:18:53 +08:00
}
2023-01-26 23:30:29 +08:00
func FlushTxJob ( ) {
task . AppendTask ( task . DatabaseIndexCommit , FlushQueue )
2022-05-26 15:18:53 +08:00
}
2025-11-24 10:30:06 +08:00
// WaitFlushTx blocks until isWritingDatabase reports that the operation queue
// is empty and no flush is in progress.
//
// While waiting it logs one warning (with a short stack, so the blocked
// caller can be identified) once the wait exceeds 10s, and a second warning
// once it exceeds 60s. The thresholds are measured in elapsed wall time
// rather than loop iterations: every iteration sleeps inside
// isWritingDatabase as well as in the loop body, so counting iterations as
// 50ms each made the warnings fire far later than intended.
func WaitFlushTx() {
	const (
		pollInterval   = 50 * time.Millisecond
		warnAfter      = 10 * time.Second
		warnAgainAfter = 60 * time.Second
	)

	start := time.Now()
	var warned, warnedAgain bool
	for isWritingDatabase(util.SQLFlushInterval + pollInterval) {
		time.Sleep(pollInterval)

		waited := time.Since(start)
		if warnAfter < waited && !warned { // first warning after 10s
			logging.LogWarnf("database is writing: \n%s", logging.ShortStack())
			warned = true
		}
		if warnAgainAfter < waited && !warnedAgain { // second warning after 60s
			logging.LogWarnf("database is still writing")
			warnedAgain = true
		}
	}
}
// isWritingDatabase sleeps for d and then reports whether database work is
// still outstanding: either operations remain in operationQueue or a flush is
// currently marked as running via flushingTx.
func isWritingDatabase(d time.Duration) bool {
	time.Sleep(d)

	dbQueueLock.Lock()
	defer dbQueueLock.Unlock()
	return len(operationQueue) > 0 || flushingTx.Load()
}
2023-01-09 00:04:49 +08:00
func ClearQueue ( ) {
2023-01-23 18:30:52 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
2023-01-09 00:04:49 +08:00
operationQueue = nil
}
2025-04-10 11:59:56 +08:00
// flushingTx is true while FlushQueue holds txLock and is processing
// operations; isWritingDatabase reads it to detect an in-progress flush.
var flushingTx atomic.Bool
2023-01-18 23:05:13 +08:00
func FlushQueue ( ) {
2024-04-12 20:33:53 +08:00
ops := getOperations ( )
total := len ( ops )
2025-04-10 11:59:56 +08:00
if 1 > total && ! flushingTx . Load ( ) {
2022-05-26 15:18:53 +08:00
return
}
2024-04-12 20:33:53 +08:00
txLock . Lock ( )
2025-04-10 11:59:56 +08:00
flushingTx . Store ( true )
defer func ( ) {
flushingTx . Store ( false )
txLock . Unlock ( )
} ( )
2024-04-12 20:33:53 +08:00
2022-05-26 15:18:53 +08:00
start := time . Now ( )
2025-11-24 10:30:06 +08:00
// logging.LogInfof("flushing database queue, total operations [%d]", total)
// 如果有重命名子树的操作,则统计各路径前缀的块树数量,数量较大的话阻塞整个队列,以便尽可能合并重命名子树的操作
var renameSubTreeOp * dbQueueOperation
for _ , op := range ops {
if "rename_sub_tree" == op . action {
renameSubTreeOp = op
break
}
}
if nil != renameSubTreeOp {
childCount := treenode . CountBlockTreesByPathPrefix ( path . Dir ( renameSubTreeOp . renameTree . Path ) )
if 512 < childCount {
scale := math . Log ( float64 ( childCount ) / 512.0 + 1.0 ) / math . Log ( 2.0 )
secs := 1.0 * scale
if secs < 1.0 {
secs = 1.0
}
if secs > 12.0 {
secs = 12.0
}
logging . LogInfof ( "rename sub tree [%s] with large child count [%d], sleep [%.2fs] to wait for more operations" , renameSubTreeOp . renameTree . Path , childCount , secs )
time . Sleep ( time . Duration ( secs * float64 ( time . Second ) ) )
}
}
2022-09-16 11:04:52 +08:00
context := map [ string ] interface { } { eventbus . CtxPushMsg : eventbus . CtxPushMsgToStatusBar }
2024-04-12 20:33:53 +08:00
if 512 < len ( ops ) {
2023-02-09 17:18:29 +08:00
disableCache ( )
defer enableCache ( )
}
2023-02-18 14:37:54 +08:00
groupOpsTotal := map [ string ] int { }
2024-04-12 20:33:53 +08:00
for _ , op := range ops {
2023-02-18 14:37:54 +08:00
groupOpsTotal [ op . action ] ++
}
groupOpsCurrent := map [ string ] int { }
2024-04-12 20:33:53 +08:00
for i , op := range ops {
2023-12-08 13:05:50 +08:00
if util . IsExiting . Load ( ) {
2023-01-25 12:54:25 +08:00
return
2022-05-26 15:18:53 +08:00
}
2023-01-25 10:58:04 +08:00
2023-01-26 00:11:06 +08:00
tx , err := beginTx ( )
2024-09-04 04:40:50 +03:00
if err != nil {
2023-01-25 12:54:25 +08:00
return
2023-01-25 10:58:04 +08:00
}
2023-01-25 11:32:09 +08:00
2023-02-18 14:37:54 +08:00
groupOpsCurrent [ op . action ] ++
context [ "current" ] = groupOpsCurrent [ op . action ]
context [ "total" ] = groupOpsTotal [ op . action ]
2024-09-04 04:40:50 +03:00
if err = execOp ( op , tx , context ) ; err != nil {
2023-02-07 10:11:38 +08:00
tx . Rollback ( )
2023-03-17 16:29:50 +08:00
logging . LogErrorf ( "queue operation [%s] failed: %s" , op . action , err )
2023-02-07 10:40:24 +08:00
continue
2023-01-25 11:32:09 +08:00
}
2023-01-25 10:58:04 +08:00
2024-09-04 04:40:50 +03:00
if err = commitTx ( tx ) ; err != nil {
2023-01-25 11:32:09 +08:00
logging . LogErrorf ( "commit tx failed: %s" , err )
2023-12-08 13:05:50 +08:00
continue
2023-01-25 11:32:09 +08:00
}
2023-01-26 01:33:21 +08:00
2023-01-26 20:45:39 +08:00
if 16 < i && 0 == i % 128 {
2023-02-04 14:58:35 +08:00
debug . FreeOSMemory ( )
2023-01-26 01:33:21 +08:00
}
2022-05-26 15:18:53 +08:00
}
2023-01-26 00:11:06 +08:00
2023-12-08 13:05:50 +08:00
if 128 < total {
2023-02-04 14:58:35 +08:00
debug . FreeOSMemory ( )
2023-01-26 20:45:39 +08:00
}
2022-05-26 15:18:53 +08:00
elapsed := time . Now ( ) . Sub ( start ) . Milliseconds ( )
2023-01-27 15:51:12 +08:00
if 7000 < elapsed {
2023-01-27 15:51:51 +08:00
logging . LogInfof ( "database op tx [%dms]" , elapsed )
2022-05-26 15:18:53 +08:00
}
2023-07-27 22:38:06 +08:00
// Push database index commit event https://github.com/siyuan-note/siyuan/issues/8814
2023-07-27 22:48:14 +08:00
util . BroadcastByType ( "main" , "databaseIndexCommit" , 0 , "" , nil )
2024-06-27 21:29:31 +08:00
eventbus . Publish ( eventbus . EvtSQLIndexFlushed )
2022-05-26 15:18:53 +08:00
}
2023-01-26 00:47:14 +08:00
func execOp ( op * dbQueueOperation , tx * sql . Tx , context map [ string ] interface { } ) ( err error ) {
2023-01-25 12:54:25 +08:00
switch op . action {
case "index" :
2024-03-15 22:53:37 +08:00
err = indexTree ( tx , op . indexTree , context )
2023-01-25 12:54:25 +08:00
case "upsert" :
2023-01-26 00:47:14 +08:00
err = upsertTree ( tx , op . upsertTree , context )
2023-01-25 12:54:25 +08:00
case "delete" :
err = batchDeleteByPathPrefix ( tx , op . removeTreeBox , op . removeTreePath )
case "delete_id" :
2023-01-26 17:50:21 +08:00
err = deleteByRootID ( tx , op . removeTreeID , context )
2023-01-26 19:50:56 +08:00
case "delete_ids" :
err = batchDeleteByRootIDs ( tx , op . removeTreeIDs , context )
2023-01-25 12:54:25 +08:00
case "rename" :
2024-09-16 11:09:09 +08:00
err = batchUpdateHPath ( tx , op . renameTree , context )
2024-09-04 04:40:50 +03:00
if err != nil {
2023-01-25 12:54:25 +08:00
break
}
err = updateRootContent ( tx , path . Base ( op . renameTree . HPath ) , op . renameTree . Root . IALAttr ( "updated" ) , op . renameTree . ID )
2023-02-21 13:16:19 +08:00
case "rename_sub_tree" :
2024-09-16 11:09:09 +08:00
err = batchUpdatePath ( tx , op . renameTree , context )
2023-01-25 12:54:25 +08:00
case "delete_box" :
err = deleteByBoxTx ( tx , op . box )
case "delete_box_refs" :
err = deleteRefsByBoxTx ( tx , op . box )
case "update_refs" :
err = upsertRefs ( tx , op . upsertTree )
2023-03-29 19:38:03 +08:00
case "delete_refs" :
err = deleteRefs ( tx , op . upsertTree )
2023-01-31 17:28:47 +08:00
case "update_block_content" :
err = updateBlockContent ( tx , op . block )
2023-01-31 20:24:44 +08:00
case "delete_assets" :
err = deleteAssetsByHashes ( tx , op . removeAssetHashes )
2023-12-07 20:40:16 +08:00
case "index_node" :
err = indexNode ( tx , op . id )
2023-01-25 12:54:25 +08:00
default :
2023-01-26 00:11:06 +08:00
msg := fmt . Sprintf ( "unknown operation [%s]" , op . action )
2023-01-25 12:54:25 +08:00
logging . LogErrorf ( msg )
err = errors . New ( msg )
}
return
}
2023-12-07 20:40:16 +08:00
func IndexNodeQueue ( id string ) {
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation { id : id , inQueueTime : time . Now ( ) , action : "index_node" }
for i , op := range operationQueue {
if "index_node" == op . action && op . id == id {
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-12-07 20:40:16 +08:00
}
2023-01-31 20:24:44 +08:00
func BatchRemoveAssetsQueue ( hashes [ ] string ) {
2023-02-01 15:22:09 +08:00
if 1 > len ( hashes ) {
return
}
2023-01-31 20:24:44 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation { removeAssetHashes : hashes , inQueueTime : time . Now ( ) , action : "delete_assets" }
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-01-31 20:24:44 +08:00
}
2023-01-31 17:28:47 +08:00
func UpdateBlockContentQueue ( block * Block ) {
2023-01-23 18:30:52 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
2022-05-26 15:18:53 +08:00
2023-01-31 17:28:47 +08:00
newOp := & dbQueueOperation { block : block , inQueueTime : time . Now ( ) , action : "update_block_content" }
for i , op := range operationQueue {
if "update_block_content" == op . action && op . block . ID == block . ID {
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2022-05-26 15:18:53 +08:00
}
2023-03-29 19:38:03 +08:00
func DeleteRefsTreeQueue ( tree * parse . Tree ) {
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation { upsertTree : tree , inQueueTime : time . Now ( ) , action : "delete_refs" }
for i , op := range operationQueue {
if "delete_refs" == op . action && op . upsertTree . ID == tree . ID {
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-03-29 19:38:03 +08:00
}
2023-01-23 18:30:52 +08:00
func UpdateRefsTreeQueue ( tree * parse . Tree ) {
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation { upsertTree : tree , inQueueTime : time . Now ( ) , action : "update_refs" }
for i , op := range operationQueue {
if "update_refs" == op . action && op . upsertTree . ID == tree . ID {
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-01-23 18:30:52 +08:00
}
func DeleteBoxRefsQueue ( boxID string ) {
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation { box : boxID , inQueueTime : time . Now ( ) , action : "delete_box_refs" }
for i , op := range operationQueue {
if "delete_box_refs" == op . action && op . box == boxID {
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-01-23 18:30:52 +08:00
}
func DeleteBoxQueue ( boxID string ) {
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation { box : boxID , inQueueTime : time . Now ( ) , action : "delete_box" }
for i , op := range operationQueue {
if "delete_box" == op . action && op . box == boxID {
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-01-23 18:30:52 +08:00
}
2024-03-15 22:53:37 +08:00
func IndexTreeQueue ( tree * parse . Tree ) {
2023-01-25 12:33:38 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
2024-03-15 22:53:37 +08:00
newOp := & dbQueueOperation { indexTree : tree , inQueueTime : time . Now ( ) , action : "index" }
2023-01-25 12:33:38 +08:00
for i , op := range operationQueue {
2024-03-15 22:53:37 +08:00
if "index" == op . action && op . indexTree . ID == tree . ID { // 相同树则覆盖
2023-01-25 12:33:38 +08:00
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-01-25 12:33:38 +08:00
}
2022-05-26 15:18:53 +08:00
func UpsertTreeQueue ( tree * parse . Tree ) {
2023-01-23 18:30:52 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
2022-05-26 15:18:53 +08:00
2023-01-23 18:30:52 +08:00
newOp := & dbQueueOperation { upsertTree : tree , inQueueTime : time . Now ( ) , action : "upsert" }
2022-05-26 15:18:53 +08:00
for i , op := range operationQueue {
if "upsert" == op . action && op . upsertTree . ID == tree . ID { // 相同树则覆盖
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2022-05-26 15:18:53 +08:00
}
2023-02-21 13:00:27 +08:00
func RenameTreeQueue ( tree * parse . Tree ) {
2023-01-23 18:30:52 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
2022-05-26 15:18:53 +08:00
2023-01-23 18:30:52 +08:00
newOp := & dbQueueOperation {
2023-02-21 13:00:27 +08:00
renameTree : tree ,
inQueueTime : time . Now ( ) ,
action : "rename" ,
}
2022-05-26 15:18:53 +08:00
for i , op := range operationQueue {
2022-09-02 16:26:05 +08:00
if "rename" == op . action && op . renameTree . ID == tree . ID { // 相同树则覆盖
2022-05-26 15:18:53 +08:00
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2022-05-26 15:18:53 +08:00
}
2023-02-21 13:16:19 +08:00
func RenameSubTreeQueue ( tree * parse . Tree ) {
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation {
renameTree : tree ,
inQueueTime : time . Now ( ) ,
action : "rename_sub_tree" ,
}
for i , op := range operationQueue {
if "rename_sub_tree" == op . action && op . renameTree . ID == tree . ID { // 相同树则覆盖
operationQueue [ i ] = newOp
return
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-02-21 13:16:19 +08:00
}
2024-03-15 22:53:37 +08:00
func RemoveTreeQueue ( rootID string ) {
2023-01-23 18:30:52 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
2022-05-26 15:18:53 +08:00
2024-03-15 22:53:37 +08:00
newOp := & dbQueueOperation { removeTreeID : rootID , inQueueTime : time . Now ( ) , action : "delete_id" }
2023-01-26 17:50:21 +08:00
for i , op := range operationQueue {
2024-03-15 22:53:37 +08:00
if "delete_id" == op . action && op . removeTreeID == rootID {
2023-01-26 17:50:21 +08:00
operationQueue [ i ] = newOp
return
2022-05-26 15:18:53 +08:00
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2022-05-26 15:18:53 +08:00
}
2023-01-26 19:50:56 +08:00
func BatchRemoveTreeQueue ( rootIDs [ ] string ) {
2023-02-01 15:22:09 +08:00
if 1 > len ( rootIDs ) {
return
}
2023-01-26 19:50:56 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
newOp := & dbQueueOperation { removeTreeIDs : rootIDs , inQueueTime : time . Now ( ) , action : "delete_ids" }
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2023-01-26 19:50:56 +08:00
}
2022-05-26 15:18:53 +08:00
func RemoveTreePathQueue ( treeBox , treePathPrefix string ) {
2023-01-23 18:30:52 +08:00
dbQueueLock . Lock ( )
defer dbQueueLock . Unlock ( )
2022-05-26 15:18:53 +08:00
2023-01-26 17:50:21 +08:00
newOp := & dbQueueOperation { removeTreeBox : treeBox , removeTreePath : treePathPrefix , inQueueTime : time . Now ( ) , action : "delete" }
for i , op := range operationQueue {
if "delete" == op . action && ( op . removeTreeBox == treeBox && op . removeTreePath == treePathPrefix ) {
operationQueue [ i ] = newOp
return
2022-05-26 15:18:53 +08:00
}
}
2024-06-27 21:29:31 +08:00
appendOperation ( newOp )
2022-05-26 15:18:53 +08:00
}
2024-04-12 20:33:53 +08:00
// getOperations hands the current contents of operationQueue to the caller
// and leaves the queue empty, all under dbQueueLock.
func getOperations() (ops []*dbQueueOperation) {
	dbQueueLock.Lock()
	defer dbQueueLock.Unlock()

	ops, operationQueue = operationQueue, nil
	return
}
2024-06-27 21:29:31 +08:00
// appendOperation adds op to the end of operationQueue and then publishes
// eventbus.EvtSQLIndexChanged. It does not lock dbQueueLock itself; the
// callers in this file acquire it before calling.
func appendOperation(op *dbQueueOperation) {
	operationQueue = append(operationQueue, op)
	eventbus.Publish(eventbus.EvtSQLIndexChanged)
}