diff --git a/kernel/sql/queue.go b/kernel/sql/queue.go index df9cfe9f2..c8528f850 100644 --- a/kernel/sql/queue.go +++ b/kernel/sql/queue.go @@ -128,11 +128,15 @@ func FlushQueue() { return } - if 16 < i && 0 == i%256 { + if 16 < i && 0 == i%128 { runtime.GC() } } + if 128 < len(ops) { + runtime.GC() + } + elapsed := time.Now().Sub(start).Milliseconds() if 5000 < elapsed { logging.LogInfof("op tx [%dms]", elapsed) diff --git a/kernel/treenode/blocktree.go b/kernel/treenode/blocktree.go index 4cddf9217..ff18cf3e4 100644 --- a/kernel/treenode/blocktree.go +++ b/kernel/treenode/blocktree.go @@ -488,10 +488,15 @@ func SaveBlockTree(force bool) { os.MkdirAll(util.BlockTreePath, 0755) size := uint64(0) - blockTrees.Range(func(key, value interface{}) bool { - slice := value.(*btSlice) + poolSize := runtime.NumCPU() + waitGroup := &sync.WaitGroup{} + p, _ := ants.NewPoolWithFunc(poolSize, func(arg interface{}) { + defer waitGroup.Done() + + key := arg.(map[string]interface{})["key"].(string) + slice := arg.(map[string]interface{})["value"].(*btSlice) if !force && (slice.changed.IsZero() || slice.changed.After(start.Add(-7*time.Second))) { - return true + return } slice.m.Lock() @@ -499,24 +504,37 @@ func SaveBlockTree(force bool) { if nil != err { logging.LogErrorf("marshal block tree failed: %s", err) os.Exit(util.ExitCodeBlockTreeErr) - return false + return } slice.m.Unlock() - p := filepath.Join(util.BlockTreePath, key.(string)) + ".msgpack" + p := filepath.Join(util.BlockTreePath, key) + ".msgpack" if err = gulu.File.WriteFileSafer(p, data, 0644); nil != err { logging.LogErrorf("write block tree failed: %s", err) os.Exit(util.ExitCodeBlockTreeErr) - return false + return } slice.changed = time.Time{} size += uint64(len(data)) + }) + + blockTrees.Range(func(key, value interface{}) bool { + slice := value.(*btSlice) + if !force && (slice.changed.IsZero() || slice.changed.After(start.Add(-7*time.Second))) { + return true + } + + waitGroup.Add(1) + p.Invoke(map[string]interface{}{"key": key, "value": value}) return true }) - runtime.GC() + waitGroup.Wait() + p.Release() - if elapsed := time.Since(start).Seconds(); 2 < elapsed { + runtime.GC() + elapsed := time.Since(start).Seconds() + if 2 < elapsed { logging.LogWarnf("save block tree [size=%s] to [%s], elapsed [%.2fs]", humanize.Bytes(size), util.BlockTreePath, elapsed) } }