This commit is contained in:
Daniel 2024-04-12 20:33:53 +08:00
parent f822e4d69e
commit ee473289d2
No known key found for this signature in database
GPG key ID: 86211BA83DF03017
3 changed files with 32 additions and 23 deletions

View file

@ -35,6 +35,7 @@ import (
var ( var (
operationQueue []*dbQueueOperation operationQueue []*dbQueueOperation
dbQueueLock = sync.Mutex{} dbQueueLock = sync.Mutex{}
txLock = sync.Mutex{}
) )
type dbQueueOperation struct { type dbQueueOperation struct {
@ -95,29 +96,30 @@ func ClearQueue() {
} }
func FlushQueue() { func FlushQueue() {
dbQueueLock.Lock() ops := getOperations()
defer dbQueueLock.Unlock() total := len(ops)
total := len(operationQueue)
if 1 > total { if 1 > total {
return return
} }
txLock.Lock()
defer txLock.Unlock()
start := time.Now() start := time.Now()
context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
if 512 < total { if 512 < len(ops) {
disableCache() disableCache()
defer enableCache() defer enableCache()
} }
groupOpsTotal := map[string]int{} groupOpsTotal := map[string]int{}
for _, op := range operationQueue { for _, op := range ops {
groupOpsTotal[op.action]++ groupOpsTotal[op.action]++
} }
groupOpsCurrent := map[string]int{} groupOpsCurrent := map[string]int{}
for i, op := range operationQueue { for i, op := range ops {
if util.IsExiting.Load() { if util.IsExiting.Load() {
return return
} }
@ -150,8 +152,6 @@ func FlushQueue() {
debug.FreeOSMemory() debug.FreeOSMemory()
} }
operationQueue = nil
elapsed := time.Now().Sub(start).Milliseconds() elapsed := time.Now().Sub(start).Milliseconds()
if 7000 < elapsed { if 7000 < elapsed {
logging.LogInfof("database op tx [%dms]", elapsed) logging.LogInfof("database op tx [%dms]", elapsed)
@ -418,3 +418,12 @@ func RemoveTreePathQueue(treeBox, treePathPrefix string) {
} }
operationQueue = append(operationQueue, newOp) operationQueue = append(operationQueue, newOp)
} }
func getOperations() (ops []*dbQueueOperation) {
dbQueueLock.Lock()
defer dbQueueLock.Unlock()
ops = operationQueue
operationQueue = nil
return
}

View file

@ -33,8 +33,7 @@ import (
var ( var (
assetContentOperationQueue []*assetContentDBQueueOperation assetContentOperationQueue []*assetContentDBQueueOperation
assetContentDBQueueLock = sync.Mutex{} assetContentDBQueueLock = sync.Mutex{}
assetContentTxLock = sync.Mutex{}
assetContentTxLock = sync.Mutex{}
) )
type assetContentDBQueueOperation struct { type assetContentDBQueueOperation struct {
@ -51,7 +50,8 @@ func FlushAssetContentTxJob() {
func FlushAssetContentQueue() { func FlushAssetContentQueue() {
ops := getAssetContentOperations() ops := getAssetContentOperations()
if 1 > len(ops) { total := len(ops)
if 1 > total {
return return
} }
@ -97,7 +97,7 @@ func FlushAssetContentQueue() {
} }
} }
if 128 < len(ops) { if 128 < total {
debug.FreeOSMemory() debug.FreeOSMemory()
} }
@ -122,24 +122,24 @@ func execAssetContentOp(op *assetContentDBQueueOperation, tx *sql.Tx, context ma
} }
func DeleteAssetContentsByPathQueue(path string) { func DeleteAssetContentsByPathQueue(path string) {
assetContentTxLock.Lock() assetContentDBQueueLock.Lock()
defer assetContentTxLock.Unlock() defer assetContentDBQueueLock.Unlock()
newOp := &assetContentDBQueueOperation{inQueueTime: time.Now(), action: "deletePath", path: path} newOp := &assetContentDBQueueOperation{inQueueTime: time.Now(), action: "deletePath", path: path}
assetContentOperationQueue = append(assetContentOperationQueue, newOp) assetContentOperationQueue = append(assetContentOperationQueue, newOp)
} }
func IndexAssetContentsQueue(assetContents []*AssetContent) { func IndexAssetContentsQueue(assetContents []*AssetContent) {
assetContentTxLock.Lock() assetContentDBQueueLock.Lock()
defer assetContentTxLock.Unlock() defer assetContentDBQueueLock.Unlock()
newOp := &assetContentDBQueueOperation{inQueueTime: time.Now(), action: "index", assetContents: assetContents} newOp := &assetContentDBQueueOperation{inQueueTime: time.Now(), action: "index", assetContents: assetContents}
assetContentOperationQueue = append(assetContentOperationQueue, newOp) assetContentOperationQueue = append(assetContentOperationQueue, newOp)
} }
func getAssetContentOperations() (ops []*assetContentDBQueueOperation) { func getAssetContentOperations() (ops []*assetContentDBQueueOperation) {
assetContentTxLock.Lock() assetContentDBQueueLock.Lock()
defer assetContentTxLock.Unlock() defer assetContentDBQueueLock.Unlock()
ops = assetContentOperationQueue ops = assetContentOperationQueue
assetContentOperationQueue = nil assetContentOperationQueue = nil

View file

@ -33,8 +33,7 @@ import (
var ( var (
historyOperationQueue []*historyDBQueueOperation historyOperationQueue []*historyDBQueueOperation
historyDBQueueLock = sync.Mutex{} historyDBQueueLock = sync.Mutex{}
historyTxLock = sync.Mutex{}
historyTxLock = sync.Mutex{}
) )
type historyDBQueueOperation struct { type historyDBQueueOperation struct {
@ -51,7 +50,8 @@ func FlushHistoryTxJob() {
func FlushHistoryQueue() { func FlushHistoryQueue() {
ops := getHistoryOperations() ops := getHistoryOperations()
if 1 > len(ops) { total := len(ops)
if 1 > total {
return return
} }
@ -97,7 +97,7 @@ func FlushHistoryQueue() {
} }
} }
if 128 < len(ops) { if 128 < total {
debug.FreeOSMemory() debug.FreeOSMemory()
} }