diff --git a/kernel/api/repo.go b/kernel/api/repo.go index 6fe7f08d2..26d18ac84 100644 --- a/kernel/api/repo.go +++ b/kernel/api/repo.go @@ -113,11 +113,7 @@ func checkoutRepo(c *gin.Context) { } id := arg["id"].(string) - if err := model.CheckoutRepo(id); nil != err { - ret.Code = -1 - ret.Msg = model.Conf.Language(141) - return - } + model.CheckoutRepo(id) } func downloadCloudSnapshot(c *gin.Context) { diff --git a/kernel/main.go b/kernel/main.go index ff84ace50..426622bd8 100644 --- a/kernel/main.go +++ b/kernel/main.go @@ -23,6 +23,7 @@ import ( "github.com/siyuan-note/siyuan/kernel/model" "github.com/siyuan-note/siyuan/kernel/server" "github.com/siyuan-note/siyuan/kernel/sql" + "github.com/siyuan-note/siyuan/kernel/task" "github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/util" ) @@ -42,6 +43,8 @@ func main() { model.LoadFlashcards() model.LoadAssetsTexts() + go task.Loop() + go model.AutoGenerateDocHistory() go model.AutoSync() go model.AutoStat() @@ -49,7 +52,7 @@ func main() { util.PushClearAllMsg() go model.AutoRefreshCheck() go model.AutoFlushTx() - go sql.AutoFlushTreeQueue() + go sql.AutoFlushQueue() go treenode.AutoFlushBlockTree() go cache.LoadAssets() go model.AutoFixIndex() diff --git a/kernel/mobile/kernel.go b/kernel/mobile/kernel.go index a5feb8438..67b339a1d 100644 --- a/kernel/mobile/kernel.go +++ b/kernel/mobile/kernel.go @@ -27,6 +27,7 @@ import ( "github.com/siyuan-note/siyuan/kernel/model" "github.com/siyuan-note/siyuan/kernel/server" "github.com/siyuan-note/siyuan/kernel/sql" + "github.com/siyuan-note/siyuan/kernel/task" "github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/util" _ "golang.org/x/mobile/bind" @@ -56,6 +57,8 @@ func StartKernel(container, appDir, workspaceBaseDir, timezoneID, localIPs, lang model.LoadFlashcards() model.LoadAssetsTexts() + go task.Loop() + go model.AutoGenerateDocHistory() go model.AutoSync() go model.AutoStat() @@ -63,7 +66,7 @@ func StartKernel(container, appDir, workspaceBaseDir, timezoneID, localIPs, lang util.PushClearAllMsg() go model.AutoRefreshCheck() go model.AutoFlushTx() - go sql.AutoFlushTreeQueue() + go sql.AutoFlushQueue() go treenode.AutoFlushBlockTree() go cache.LoadAssets() go model.AutoFixIndex() diff --git a/kernel/model/box.go b/kernel/model/box.go index a90fb5677..02ad30924 100644 --- a/kernel/model/box.go +++ b/kernel/model/box.go @@ -39,6 +39,7 @@ import ( "github.com/siyuan-note/logging" "github.com/siyuan-note/siyuan/kernel/conf" "github.com/siyuan-note/siyuan/kernel/sql" + "github.com/siyuan-note/siyuan/kernel/task" "github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/util" ) @@ -504,10 +505,11 @@ func genTreeID(tree *parse.Tree) { return } -var isFullReindexing = false - func FullReindex() { - isFullReindexing = true + task.PrependTask(task.DatabaseIndexFull, fullReindex) +} + +func fullReindex() { util.PushEndlessProgress(Conf.Language(35)) WaitForWritingFiles() @@ -526,7 +528,6 @@ func FullReindex() { LoadFlashcards() util.PushEndlessProgress(Conf.Language(58)) - isFullReindexing = false go func() { time.Sleep(1 * time.Second) util.ReloadUI() diff --git a/kernel/model/sync.go b/kernel/model/sync.go index 67fd8e43d..aa67f5966 100644 --- a/kernel/model/sync.go +++ b/kernel/model/sync.go @@ -32,6 +32,7 @@ import ( "github.com/siyuan-note/logging" "github.com/siyuan-note/siyuan/kernel/conf" "github.com/siyuan-note/siyuan/kernel/sql" + "github.com/siyuan-note/siyuan/kernel/task" "github.com/siyuan-note/siyuan/kernel/treenode" "github.com/siyuan-note/siyuan/kernel/util" ) @@ -106,6 +107,10 @@ func BootSyncData() { } func SyncData(boot, exit, byHand bool) { + task.PrependTask(task.CloudSync, syncData, boot, exit, byHand) +} + +func syncData(boot, exit, byHand bool) { defer logging.Recover() if !boot && !exit && 2 == Conf.Sync.Mode && !byHand { @@ -267,53 +272,35 @@ func SetCloudSyncDir(name string) { return } - syncLock.Lock() - defer syncLock.Unlock() - Conf.Sync.CloudName = name Conf.Save() } func SetSyncGenerateConflictDoc(b bool) { - syncLock.Lock() - defer syncLock.Unlock() - Conf.Sync.GenerateConflictDoc = b Conf.Save() return } func SetSyncEnable(b bool) (err error) { - syncLock.Lock() - defer syncLock.Unlock() - Conf.Sync.Enabled = b Conf.Save() return } func SetSyncMode(mode int) (err error) { - syncLock.Lock() - defer syncLock.Unlock() - Conf.Sync.Mode = mode Conf.Save() return } func SetSyncProvider(provider int) (err error) { - syncLock.Lock() - defer syncLock.Unlock() - Conf.Sync.Provider = provider Conf.Save() return } func SetSyncProviderS3(s3 *conf.S3) (err error) { - syncLock.Lock() - defer syncLock.Unlock() - s3.Endpoint = strings.TrimSpace(s3.Endpoint) s3.Endpoint = util.NormalizeEndpoint(s3.Endpoint) s3.AccessKey = strings.TrimSpace(s3.AccessKey) @@ -328,9 +315,6 @@ func SetSyncProviderS3(s3 *conf.S3) (err error) { } func SetSyncProviderWebDAV(webdav *conf.WebDAV) (err error) { - syncLock.Lock() - defer syncLock.Unlock() - webdav.Endpoint = strings.TrimSpace(webdav.Endpoint) webdav.Endpoint = util.NormalizeEndpoint(webdav.Endpoint) webdav.Username = strings.TrimSpace(webdav.Username) @@ -350,9 +334,6 @@ func CreateCloudSyncDir(name string) (err error) { return } - syncLock.Lock() - defer syncLock.Unlock() - name = strings.TrimSpace(name) name = gulu.Str.RemoveInvisible(name) if !cloud.IsValidCloudDirName(name) { @@ -380,9 +361,6 @@ func RemoveCloudSyncDir(name string) (err error) { msgId := util.PushMsg(Conf.Language(116), 15000) - syncLock.Lock() - defer syncLock.Unlock() - if "" == name { return } diff --git a/kernel/task/queue.go b/kernel/task/queue.go new file mode 100644 index 000000000..e5649ebd3 --- /dev/null +++ b/kernel/task/queue.go @@ -0,0 +1,162 @@ +// SiYuan - Build Your Eternal Digital Garden +// Copyright (c) 2020-present, b3log.org +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package task + +import ( + "reflect" + "sync" + "time" + + "github.com/siyuan-note/logging" +) + +var ( + taskQueue []*Task + taskQueueStatus int + queueLock = sync.Mutex{} + taskLock = sync.Mutex{} +) + +const ( + QueueStatusRunning = iota + QueueStatusClosing +) + +type Task struct { + Action string + Handler reflect.Value + Args []interface{} + Created time.Time +} + +func PrependTask(action string, handler interface{}, args ...interface{}) { + queueLock.Lock() + defer queueLock.Unlock() + + if QueueStatusRunning != taskQueueStatus { + logging.LogWarnf("task queue is paused, action [%s] will be ignored", action) + return + } + + cancelTask(action) + taskQueue = append([]*Task{newTask(action, handler, args...)}, taskQueue...) +} + +func AppendTask(action string, handler interface{}, args ...interface{}) { + queueLock.Lock() + defer queueLock.Unlock() + + if QueueStatusRunning != taskQueueStatus { + logging.LogWarnf("task queue is paused, action [%s] will be ignored", action) + return + } + + cancelTask(action) + taskQueue = append(taskQueue, newTask(action, handler, args...)) +} + +func CancelTask(actions ...string) { + queueLock.Lock() + defer queueLock.Unlock() + + cancelTask(actions...) +} + +func cancelTask(actions ...string) { + for i := len(taskQueue) - 1; i >= 0; i-- { + task := taskQueue[i] + for _, action := range actions { + if action == task.Action { + taskQueue = append(taskQueue[:i], taskQueue[i+1:]...) + break + } + } + } +} + +func newTask(action string, handler interface{}, args ...interface{}) *Task { + return &Task{ + Action: action, + Handler: reflect.ValueOf(handler), + Args: args, + Created: time.Now(), + } +} + +const ( + CloudSync = "task.cloud.sync" // 数据同步 + RepoCheckout = "task.repo.checkout" // 从快照中检出 + DatabaseIndexFull = "task.database.index.full" // 重建索引 + DatabaseIndex = "task.database.index" // 数据库所以队列 + DatabaseIndexFix = "task.database.index.fix" // 数据库索引订正 + OCRImage = "task.ocr.image" // 图片 OCR 提取文本 + DatabaseIndexEmbedBlock = "task.database.index.embedblock" // 数据库索引嵌入块 +) + +func Loop() { + for { + time.Sleep(10 * time.Millisecond) + task := popTask() + if nil == task { + continue + } + + execTask(task) + } +} + +func CloseWait() { + queueLock.Lock() + defer queueLock.Unlock() + + taskQueueStatus = QueueStatusClosing + for { + time.Sleep(10 * time.Millisecond) + if 1 > len(taskQueue) { + break + } + } +} + +func popTask() (ret *Task) { + queueLock.Lock() + defer queueLock.Unlock() + + if 0 == len(taskQueue) { + return + } + + ret = taskQueue[0] + taskQueue = taskQueue[1:] + return +} + +func execTask(task *Task) { + taskLock.Lock() + defer taskLock.Unlock() + defer logging.Recover() + + args := make([]reflect.Value, len(task.Args)) + for i, v := range task.Args { + if nil == v { + args[i] = reflect.New(task.Handler.Type().In(i)).Elem() + } else { + args[i] = reflect.ValueOf(v) + } + } + task.Handler.Call(args) +}