🎨 改进内核任务调度机制提升稳定性 https://github.com/siyuan-note/siyuan/issues/7113

This commit is contained in:
Liang Ding 2023-01-18 23:00:30 +08:00
parent 98f682e507
commit 492389470f
No known key found for this signature in database
GPG key ID: 136F30F901A2231D
6 changed files with 181 additions and 38 deletions

View file

@ -113,11 +113,7 @@ func checkoutRepo(c *gin.Context) {
} }
id := arg["id"].(string) id := arg["id"].(string)
if err := model.CheckoutRepo(id); nil != err { model.CheckoutRepo(id)
ret.Code = -1
ret.Msg = model.Conf.Language(141)
return
}
} }
func downloadCloudSnapshot(c *gin.Context) { func downloadCloudSnapshot(c *gin.Context) {

View file

@ -23,6 +23,7 @@ import (
"github.com/siyuan-note/siyuan/kernel/model" "github.com/siyuan-note/siyuan/kernel/model"
"github.com/siyuan-note/siyuan/kernel/server" "github.com/siyuan-note/siyuan/kernel/server"
"github.com/siyuan-note/siyuan/kernel/sql" "github.com/siyuan-note/siyuan/kernel/sql"
"github.com/siyuan-note/siyuan/kernel/task"
"github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/treenode"
"github.com/siyuan-note/siyuan/kernel/util" "github.com/siyuan-note/siyuan/kernel/util"
) )
@ -42,6 +43,8 @@ func main() {
model.LoadFlashcards() model.LoadFlashcards()
model.LoadAssetsTexts() model.LoadAssetsTexts()
go task.Loop()
go model.AutoGenerateDocHistory() go model.AutoGenerateDocHistory()
go model.AutoSync() go model.AutoSync()
go model.AutoStat() go model.AutoStat()
@ -49,7 +52,7 @@ func main() {
util.PushClearAllMsg() util.PushClearAllMsg()
go model.AutoRefreshCheck() go model.AutoRefreshCheck()
go model.AutoFlushTx() go model.AutoFlushTx()
go sql.AutoFlushTreeQueue() go sql.AutoFlushQueue()
go treenode.AutoFlushBlockTree() go treenode.AutoFlushBlockTree()
go cache.LoadAssets() go cache.LoadAssets()
go model.AutoFixIndex() go model.AutoFixIndex()

View file

@ -27,6 +27,7 @@ import (
"github.com/siyuan-note/siyuan/kernel/model" "github.com/siyuan-note/siyuan/kernel/model"
"github.com/siyuan-note/siyuan/kernel/server" "github.com/siyuan-note/siyuan/kernel/server"
"github.com/siyuan-note/siyuan/kernel/sql" "github.com/siyuan-note/siyuan/kernel/sql"
"github.com/siyuan-note/siyuan/kernel/task"
"github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/treenode"
"github.com/siyuan-note/siyuan/kernel/util" "github.com/siyuan-note/siyuan/kernel/util"
_ "golang.org/x/mobile/bind" _ "golang.org/x/mobile/bind"
@ -56,6 +57,8 @@ func StartKernel(container, appDir, workspaceBaseDir, timezoneID, localIPs, lang
model.LoadFlashcards() model.LoadFlashcards()
model.LoadAssetsTexts() model.LoadAssetsTexts()
go task.Loop()
go model.AutoGenerateDocHistory() go model.AutoGenerateDocHistory()
go model.AutoSync() go model.AutoSync()
go model.AutoStat() go model.AutoStat()
@ -63,7 +66,7 @@ func StartKernel(container, appDir, workspaceBaseDir, timezoneID, localIPs, lang
util.PushClearAllMsg() util.PushClearAllMsg()
go model.AutoRefreshCheck() go model.AutoRefreshCheck()
go model.AutoFlushTx() go model.AutoFlushTx()
go sql.AutoFlushTreeQueue() go sql.AutoFlushQueue()
go treenode.AutoFlushBlockTree() go treenode.AutoFlushBlockTree()
go cache.LoadAssets() go cache.LoadAssets()
go model.AutoFixIndex() go model.AutoFixIndex()

View file

@ -39,6 +39,7 @@ import (
"github.com/siyuan-note/logging" "github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/conf" "github.com/siyuan-note/siyuan/kernel/conf"
"github.com/siyuan-note/siyuan/kernel/sql" "github.com/siyuan-note/siyuan/kernel/sql"
"github.com/siyuan-note/siyuan/kernel/task"
"github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/treenode"
"github.com/siyuan-note/siyuan/kernel/util" "github.com/siyuan-note/siyuan/kernel/util"
) )
@ -504,10 +505,11 @@ func genTreeID(tree *parse.Tree) {
return return
} }
var isFullReindexing = false
func FullReindex() { func FullReindex() {
isFullReindexing = true task.PrependTask(task.DatabaseIndexFull, fullReindex)
}
func fullReindex() {
util.PushEndlessProgress(Conf.Language(35)) util.PushEndlessProgress(Conf.Language(35))
WaitForWritingFiles() WaitForWritingFiles()
@ -526,7 +528,6 @@ func FullReindex() {
LoadFlashcards() LoadFlashcards()
util.PushEndlessProgress(Conf.Language(58)) util.PushEndlessProgress(Conf.Language(58))
isFullReindexing = false
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
util.ReloadUI() util.ReloadUI()

View file

@ -32,6 +32,7 @@ import (
"github.com/siyuan-note/logging" "github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/conf" "github.com/siyuan-note/siyuan/kernel/conf"
"github.com/siyuan-note/siyuan/kernel/sql" "github.com/siyuan-note/siyuan/kernel/sql"
"github.com/siyuan-note/siyuan/kernel/task"
"github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/treenode"
"github.com/siyuan-note/siyuan/kernel/util" "github.com/siyuan-note/siyuan/kernel/util"
) )
@ -106,6 +107,10 @@ func BootSyncData() {
} }
func SyncData(boot, exit, byHand bool) { func SyncData(boot, exit, byHand bool) {
task.PrependTask(task.CloudSync, syncData, boot, exit, byHand)
}
func syncData(boot, exit, byHand bool) {
defer logging.Recover() defer logging.Recover()
if !boot && !exit && 2 == Conf.Sync.Mode && !byHand { if !boot && !exit && 2 == Conf.Sync.Mode && !byHand {
@ -267,53 +272,35 @@ func SetCloudSyncDir(name string) {
return return
} }
syncLock.Lock()
defer syncLock.Unlock()
Conf.Sync.CloudName = name Conf.Sync.CloudName = name
Conf.Save() Conf.Save()
} }
func SetSyncGenerateConflictDoc(b bool) { func SetSyncGenerateConflictDoc(b bool) {
syncLock.Lock()
defer syncLock.Unlock()
Conf.Sync.GenerateConflictDoc = b Conf.Sync.GenerateConflictDoc = b
Conf.Save() Conf.Save()
return return
} }
func SetSyncEnable(b bool) (err error) { func SetSyncEnable(b bool) (err error) {
syncLock.Lock()
defer syncLock.Unlock()
Conf.Sync.Enabled = b Conf.Sync.Enabled = b
Conf.Save() Conf.Save()
return return
} }
func SetSyncMode(mode int) (err error) { func SetSyncMode(mode int) (err error) {
syncLock.Lock()
defer syncLock.Unlock()
Conf.Sync.Mode = mode Conf.Sync.Mode = mode
Conf.Save() Conf.Save()
return return
} }
func SetSyncProvider(provider int) (err error) { func SetSyncProvider(provider int) (err error) {
syncLock.Lock()
defer syncLock.Unlock()
Conf.Sync.Provider = provider Conf.Sync.Provider = provider
Conf.Save() Conf.Save()
return return
} }
func SetSyncProviderS3(s3 *conf.S3) (err error) { func SetSyncProviderS3(s3 *conf.S3) (err error) {
syncLock.Lock()
defer syncLock.Unlock()
s3.Endpoint = strings.TrimSpace(s3.Endpoint) s3.Endpoint = strings.TrimSpace(s3.Endpoint)
s3.Endpoint = util.NormalizeEndpoint(s3.Endpoint) s3.Endpoint = util.NormalizeEndpoint(s3.Endpoint)
s3.AccessKey = strings.TrimSpace(s3.AccessKey) s3.AccessKey = strings.TrimSpace(s3.AccessKey)
@ -328,9 +315,6 @@ func SetSyncProviderS3(s3 *conf.S3) (err error) {
} }
func SetSyncProviderWebDAV(webdav *conf.WebDAV) (err error) { func SetSyncProviderWebDAV(webdav *conf.WebDAV) (err error) {
syncLock.Lock()
defer syncLock.Unlock()
webdav.Endpoint = strings.TrimSpace(webdav.Endpoint) webdav.Endpoint = strings.TrimSpace(webdav.Endpoint)
webdav.Endpoint = util.NormalizeEndpoint(webdav.Endpoint) webdav.Endpoint = util.NormalizeEndpoint(webdav.Endpoint)
webdav.Username = strings.TrimSpace(webdav.Username) webdav.Username = strings.TrimSpace(webdav.Username)
@ -350,9 +334,6 @@ func CreateCloudSyncDir(name string) (err error) {
return return
} }
syncLock.Lock()
defer syncLock.Unlock()
name = strings.TrimSpace(name) name = strings.TrimSpace(name)
name = gulu.Str.RemoveInvisible(name) name = gulu.Str.RemoveInvisible(name)
if !cloud.IsValidCloudDirName(name) { if !cloud.IsValidCloudDirName(name) {
@ -380,9 +361,6 @@ func RemoveCloudSyncDir(name string) (err error) {
msgId := util.PushMsg(Conf.Language(116), 15000) msgId := util.PushMsg(Conf.Language(116), 15000)
syncLock.Lock()
defer syncLock.Unlock()
if "" == name { if "" == name {
return return
} }

162
kernel/task/queue.go Normal file
View file

@ -0,0 +1,162 @@
// SiYuan - Build Your Eternal Digital Garden
// Copyright (c) 2020-present, b3log.org
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package task
import (
"reflect"
"sync"
"time"
"github.com/siyuan-note/logging"
)
var (
taskQueue []*Task
taskQueueStatus int
queueLock = sync.Mutex{}
taskLock = sync.Mutex{}
)
const (
QueueStatusRunning = iota
QueueStatusClosing
)
type Task struct {
Action string
Handler reflect.Value
Args []interface{}
Created time.Time
}
func PrependTask(action string, handler interface{}, args ...interface{}) {
queueLock.Lock()
defer queueLock.Unlock()
if QueueStatusRunning != taskQueueStatus {
logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
return
}
cancelTask(action)
taskQueue = append([]*Task{newTask(action, handler, args...)}, taskQueue...)
}
func AppendTask(action string, handler interface{}, args ...interface{}) {
queueLock.Lock()
defer queueLock.Unlock()
if QueueStatusRunning != taskQueueStatus {
logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
return
}
cancelTask(action)
taskQueue = append(taskQueue, newTask(action, handler, args...))
}
func CancelTask(actions ...string) {
queueLock.Lock()
defer queueLock.Unlock()
cancelTask(actions...)
}
func cancelTask(actions ...string) {
for i := len(taskQueue) - 1; i >= 0; i-- {
task := taskQueue[i]
for _, action := range actions {
if action == task.Action {
taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
break
}
}
}
}
func newTask(action string, handler interface{}, args ...interface{}) *Task {
return &Task{
Action: action,
Handler: reflect.ValueOf(handler),
Args: args,
Created: time.Now(),
}
}
const (
CloudSync = "task.cloud.sync" // 数据同步
RepoCheckout = "task.repo.checkout" // 从快照中检出
DatabaseIndexFull = "task.database.index.full" // 重建索引
DatabaseIndex = "task.database.index" // 数据库所以队列
DatabaseIndexFix = "task.database.index.fix" // 数据库索引订正
OCRImage = "task.ocr.image" // 图片 OCR 提取文本
DatabaseIndexEmbedBlock = "task.database.index.embedblock" // 数据库索引嵌入块
)
func Loop() {
for {
time.Sleep(10 * time.Millisecond)
task := popTask()
if nil == task {
continue
}
execTask(task)
}
}
func CloseWait() {
queueLock.Lock()
defer queueLock.Unlock()
taskQueueStatus = QueueStatusClosing
for {
time.Sleep(10 * time.Millisecond)
if 1 > len(taskQueue) {
break
}
}
}
func popTask() (ret *Task) {
queueLock.Lock()
defer queueLock.Unlock()
if 0 == len(taskQueue) {
return
}
ret = taskQueue[0]
taskQueue = taskQueue[1:]
return
}
func execTask(task *Task) {
taskLock.Lock()
defer taskLock.Unlock()
defer logging.Recover()
args := make([]reflect.Value, len(task.Args))
for i, v := range task.Args {
if nil == v {
args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
} else {
args[i] = reflect.ValueOf(v)
}
}
task.Handler.Call(args)
}