♻️ Implement some delayed kernel events using task queues https://github.com/siyuan-note/siyuan/issues/12393

This commit is contained in:
Daniel 2024-09-05 12:10:42 +08:00
parent e55c12037a
commit 9560277d0c
No known key found for this signature in database
GPG key ID: 86211BA83DF03017
7 changed files with 105 additions and 65 deletions

View file

@ -37,50 +37,80 @@ type Task struct {
Handler reflect.Value
Args []interface{}
Created time.Time
Delay time.Duration
Timeout time.Duration
}
func AppendTask(action string, handler interface{}, args ...interface{}) {
AppendTaskWithTimeout(action, 24*time.Hour, handler, args...)
appendTaskWithDelayTimeout(action, 0, 24*time.Hour, handler, args...)
}
func AppendTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
appendTaskWithDelayTimeout(action, delay, 24*time.Hour, handler, args...)
}
func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
appendTaskWithDelayTimeout(action, 0, timeout, handler, args...)
}
func appendTaskWithDelayTimeout(action string, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
if util.IsExiting.Load() {
//logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
return
}
currentActions := getCurrentActions()
if gulu.Str.Contains(action, currentActions) && gulu.Str.Contains(action, uniqueActions) {
//logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
return
task := &Task{
Action: action,
Handler: reflect.ValueOf(handler),
Args: args,
Created: time.Now(),
Delay: delay,
Timeout: timeout,
}
if gulu.Str.Contains(action, uniqueActions) {
if currentTasks := getCurrentTasks(); containTask(task, currentTasks) {
//logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
return
}
}
queueLock.Lock()
defer queueLock.Unlock()
taskQueue = append(taskQueue, &Task{
Action: action,
Timeout: timeout,
Handler: reflect.ValueOf(handler),
Args: args,
Created: time.Now(),
})
taskQueue = append(taskQueue, task)
}
func getCurrentActions() (ret []string) {
queueLock.Lock()
func containTask(task *Task, tasks []*Task) bool {
for _, t := range tasks {
if t.Action == task.Action {
if len(t.Args) != len(task.Args) {
return false
}
currentTaskActionLock.Lock()
if "" != currentTaskAction {
ret = append(ret, currentTaskAction)
for i, arg := range t.Args {
if arg != task.Args[i] {
return false
}
}
return true
}
}
currentTaskActionLock.Unlock()
return false
}
func getCurrentTasks() (ret []*Task) {
queueLock.Lock()
defer queueLock.Unlock()
currentTaskLock.Lock()
if nil != currentTask {
ret = append(ret, currentTask)
}
currentTaskLock.Unlock()
for _, task := range taskQueue {
ret = append(ret, task.Action)
ret = append(ret, task)
}
queueLock.Unlock()
return
}
@ -100,6 +130,7 @@ const (
AssetContentDatabaseIndexFull = "task.asset.database.index.full" // 资源文件数据库重建索引
AssetContentDatabaseIndexCommit = "task.asset.database.index.commit" // 资源文件数据库索引提交
CacheVirtualBlockRef = "task.cache.virtualBlockRef" // 缓存虚拟块引用
ReloadAttributeView = "task.reload.attributeView" // 重新加载属性视图
)
// uniqueActions 描述了唯一的任务,即队列中只能存在一个在执行的任务。
@ -116,9 +147,9 @@ var uniqueActions = []string{
}
func ContainIndexTask() bool {
actions := getCurrentActions()
for _, action := range actions {
if gulu.Str.Contains(action, []string{DatabaseIndexFull, DatabaseIndex}) {
tasks := getCurrentTasks()
for _, task := range tasks {
if gulu.Str.Contains(task.Action, []string{DatabaseIndexFull, DatabaseIndex}) {
return true
}
}
@ -150,15 +181,13 @@ func StatusJob() {
}
defer queueLock.Unlock()
currentTaskActionLock.Lock()
if "" != currentTaskAction {
if nil != actionLangs {
if label := actionLangs[currentTaskAction]; nil != label {
items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
}
currentTaskLock.Lock()
if nil != currentTask && nil != actionLangs {
if label := actionLangs[currentTask.Action]; nil != label {
items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
}
}
currentTaskActionLock.Unlock()
currentTaskLock.Unlock()
if 1 > len(items) {
items = []map[string]interface{}{}
@ -189,14 +218,19 @@ func popTask() (ret *Task) {
return
}
ret = taskQueue[0]
taskQueue = taskQueue[1:]
for i, task := range taskQueue {
if time.Since(task.Created) > task.Delay {
ret = task
taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
return
}
}
return
}
var (
currentTaskAction string
currentTaskActionLock = sync.Mutex{}
currentTask *Task
currentTaskLock = sync.Mutex{}
)
func execTask(task *Task) {
@ -211,9 +245,9 @@ func execTask(task *Task) {
}
}
currentTaskActionLock.Lock()
currentTaskAction = task.Action
currentTaskActionLock.Unlock()
currentTaskLock.Lock()
currentTask = task
currentTaskLock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
defer cancel()
@ -230,7 +264,7 @@ func execTask(task *Task) {
//logging.LogInfof("task [%s] done", task.Action)
}
currentTaskActionLock.Lock()
currentTaskAction = ""
currentTaskActionLock.Unlock()
currentTaskLock.Lock()
currentTask = nil
currentTaskLock.Unlock()
}