优化云端同步上传资源占用和耗时 https://github.com/siyuan-note/siyuan/issues/5093

This commit is contained in:
Liang Ding 2022-06-05 16:27:01 +08:00
parent 27c80da76d
commit ed82886eb0
No known key found for this signature in database
GPG key ID: 136F30F901A2231D
3 changed files with 64 additions and 140 deletions

View file

@ -278,7 +278,7 @@ func CreateLocalBackup() (err error) {
return return
} }
err = genFullCloudIndex(newBackupDir, map[string]bool{}) _, err = genCloudIndex(newBackupDir, map[string]bool{})
if nil != err { if nil != err {
return return
} }

View file

@ -325,30 +325,15 @@ func ossUpload(localDirPath, cloudDirPath, cloudDevice string, boot bool, remove
return return
} }
var cloudFileList map[string]*CloudIndex
localDevice := Conf.System.ID localDevice := Conf.System.ID
excludes := getSyncExcludedList(localDirPath)
syncIgnoreList := getSyncIgnoreList() localFileList, genIndexErr := genCloudIndex(localDirPath, excludes)
excludes := map[string]bool{} var localUpserts, cloudRemoves []string
ignores := syncIgnoreList.Values() var cloudFileList map[string]*CloudIndex
for _, p := range ignores {
relPath := p.(string)
relPath = pathSha256Short(relPath, "/")
relPath = filepath.Join(localDirPath, relPath)
excludes[relPath] = true
}
localFileList, getLocalFileListErr := getLocalFileListOSS(cloudDirPath, excludes)
if "" != localDevice && localDevice == cloudDevice { if "" != localDevice && localDevice == cloudDevice {
//util.LogInfof("cloud device is the same as local device, get index from local") //util.LogInfof("cloud device is the same as local device, get index from local")
if nil == getLocalFileListErr { if nil == genIndexErr {
cloudFileList = map[string]*CloudIndex{} localUpserts, cloudRemoves, err = cloudUpsertRemoveLocalListOSS(localDirPath, removedSyncList, upsertedSyncList, excludes)
// 深拷贝一次,避免后面和 localFileList 对比时引用值相同
for p, idx := range localFileList {
cloudFileList[p] = &CloudIndex{
Hash: idx.Hash,
Size: idx.Size,
}
}
} else { } else {
util.LogInfof("get local index failed [%s], get index from cloud", err) util.LogInfof("get local index failed [%s], get index from cloud", err)
cloudFileList, err = getCloudFileListOSS(cloudDirPath) cloudFileList, err = getCloudFileListOSS(cloudDirPath)
@ -360,10 +345,12 @@ func ossUpload(localDirPath, cloudDirPath, cloudDevice string, boot bool, remove
return return
} }
localUpserts, cloudRemoves, err := cloudUpsertRemoveListOSS(localDirPath, cloudFileList, localFileList, removedSyncList, upsertedSyncList, excludes) if 0 < len(cloudFileList) {
localUpserts, cloudRemoves, err = cloudUpsertRemoveListOSS(localDirPath, cloudFileList, localFileList, excludes)
if nil != err { if nil != err {
return return
} }
}
err = ossRemove0(cloudDirPath, cloudRemoves) err = ossRemove0(cloudDirPath, cloudRemoves)
if nil != err { if nil != err {
@ -397,10 +384,9 @@ func ossUpload(localDirPath, cloudDirPath, cloudDevice string, boot bool, remove
util.IncBootProgress(0, msg) util.IncBootProgress(0, msg)
} }
}) })
var index string index := filepath.Join(localDirPath, "index.json")
localIndex := filepath.Join(localDirPath, "index.json")
for _, localUpsert := range localUpserts { for _, localUpsert := range localUpserts {
if localIndex == localUpsert { if index == localUpsert {
// 同步过程中断导致的一致性问题 https://github.com/siyuan-note/siyuan/issues/4912 // 同步过程中断导致的一致性问题 https://github.com/siyuan-note/siyuan/issues/4912
// index 最后单独上传 // index 最后单独上传
index = localUpsert index = localUpsert
@ -461,6 +447,7 @@ func ossRemove0(cloudDirPath string, removes []string) (err error) {
func ossUpload0(localDirPath, cloudDirPath, localUpsert string, wroteFiles *int, transferSize *uint64) (err error) { func ossUpload0(localDirPath, cloudDirPath, localUpsert string, wroteFiles *int, transferSize *uint64) (err error) {
info, statErr := os.Stat(localUpsert) info, statErr := os.Stat(localUpsert)
if nil != statErr { if nil != statErr {
util.LogErrorf("stat file [%s] failed: %s", localUpsert, statErr)
err = statErr err = statErr
return return
} }
@ -608,30 +595,6 @@ func getCloudSync(cloudDir string) (assetSize, backupSize int64, device string,
return return
} }
// getLocalFileListOSS loads the locally cached cloud index (index.json) for
// the workspace folder that mirrors dirPath ("sync" or "backup"),
// generating the full index first if it does not exist yet.
func getLocalFileListOSS(dirPath string, excludes map[string]bool) (ret map[string]*CloudIndex, err error) {
	// Decide which local workspace folder mirrors the cloud dir.
	var dir string
	if strings.HasPrefix(dirPath, "sync") {
		dir = "sync"
	} else {
		dir = "backup"
	}
	localDirPath := filepath.Join(util.WorkspaceDir, dir)
	indexPath := filepath.Join(localDirPath, "index.json")
	// Lazily build the full index when it has never been generated.
	if !gulu.File.IsExist(indexPath) {
		if err = genFullCloudIndex(localDirPath, excludes); nil != err {
			return
		}
	}
	var data []byte
	if data, err = os.ReadFile(indexPath); nil != err {
		return
	}
	err = gulu.JSON.UnmarshalJSON(data, &ret)
	return
}
func getCloudFileListOSS(cloudDirPath string) (ret map[string]*CloudIndex, err error) { func getCloudFileListOSS(cloudDirPath string) (ret map[string]*CloudIndex, err error) {
result := map[string]interface{}{} result := map[string]interface{}{}
request := util.NewCloudRequest(Conf.System.NetworkProxy.String()) request := util.NewCloudRequest(Conf.System.NetworkProxy.String())
@ -703,12 +666,8 @@ func localUpsertRemoveListOSS(localDirPath string, cloudFileList map[string]*Clo
return nil return nil
} }
localHash, hashErr := util.GetEtag(path) localModTime := info.ModTime().Unix()
if nil != hashErr { if cloudIdx.Updated == localModTime {
util.LogErrorf("get local file [%s] etag failed: %s", path, hashErr)
return nil
}
if cloudIdx.Hash == localHash {
unchanged[relPath] = true unchanged[relPath] = true
} }
return nil return nil
@ -727,12 +686,35 @@ func localUpsertRemoveListOSS(localDirPath string, cloudFileList map[string]*Clo
return return
} }
func cloudUpsertRemoveListOSS(localDirPath string, cloudFileList, localFileList map[string]*CloudIndex, removedSyncList, upsertedSyncList, excludes map[string]bool) (localUpserts, cloudRemoves []string, err error) { func cloudUpsertRemoveLocalListOSS(localDirPath string, removedSyncList, upsertedSyncList, excludes map[string]bool) (localUpserts, cloudRemoves []string, err error) {
localUpserts, cloudRemoves = []string{}, []string{} localUpserts, cloudRemoves = []string{}, []string{}
if err = incCloudIndex(localDirPath, &localFileList, removedSyncList, upsertedSyncList, excludes); nil != err { for removed, _ := range removedSyncList {
cloudRemoves = append(cloudRemoves, removed)
}
for upsert, _ := range upsertedSyncList {
p := filepath.Join(localDirPath, upsert)
if excludes[p] {
continue
}
info, statErr := os.Stat(p)
if nil != statErr {
util.LogErrorf("stat file [%s] failed: %s", p, statErr)
err = statErr
return return
} }
if util.CloudSingleFileMaxSizeLimit < info.Size() {
util.LogWarnf("file [%s] larger than 100MB, ignore uploading it", p)
continue
}
localUpserts = append(localUpserts, p)
}
return
}
func cloudUpsertRemoveListOSS(localDirPath string, cloudFileList, localFileList map[string]*CloudIndex, excludes map[string]bool) (localUpserts, cloudRemoves []string, err error) {
localUpserts, cloudRemoves = []string{}, []string{}
unchanged := map[string]bool{} unchanged := map[string]bool{}
for cloudFile, cloudIdx := range cloudFileList { for cloudFile, cloudIdx := range cloudFileList {
@ -741,7 +723,7 @@ func cloudUpsertRemoveListOSS(localDirPath string, cloudFileList, localFileList
cloudRemoves = append(cloudRemoves, cloudFile) cloudRemoves = append(cloudRemoves, cloudFile)
continue continue
} }
if localIdx.Hash == cloudIdx.Hash { if localIdx.Updated == cloudIdx.Updated {
unchanged[filepath.Join(localDirPath, cloudFile)] = true unchanged[filepath.Join(localDirPath, cloudFile)] = true
} }
} }
@ -765,32 +747,6 @@ func cloudUpsertRemoveListOSS(localDirPath string, cloudFileList, localFileList
} }
return nil return nil
}) })
// syncignore 变更以后 cloud 和 local index 不一致,需要重新补全 local index
for _, upsert := range localUpserts {
info, statErr := os.Stat(upsert)
if nil != statErr {
util.LogErrorf("stat file [%s] failed: %s", upsert, statErr)
err = statErr
return
}
hash, hashErr := util.GetEtag(upsert)
if nil != hashErr {
util.LogErrorf("get file [%s] hash failed: %s", upsert, hashErr)
err = hashErr
return
}
localFileList[filepath.ToSlash(strings.TrimPrefix(upsert, localDirPath))] = &CloudIndex{Hash: hash, Size: info.Size()}
}
data, err := gulu.JSON.MarshalJSON(localFileList)
if nil != err {
util.LogErrorf("marshal sync cloud index failed: %s", err)
return
}
if err = os.WriteFile(filepath.Join(localDirPath, "index.json"), data, 0644); nil != err {
util.LogErrorf("write sync cloud index failed: %s", err)
return
}
return return
} }

View file

@ -619,13 +619,13 @@ func workspaceData2SyncDir() (removedSyncList, upsertedSyncList map[string]bool,
} }
// CloudIndex describes one synced file as recorded in the cloud-side
// index.json: its size in bytes and its last-modified time.
type CloudIndex struct {
	Size    int64 `json:"size"`    // file size in bytes
	Updated int64 `json:"updated"` // last modification time, Unix timestamp in seconds
}
// genCloudIndex 全量生成云端索引文件。 // genCloudIndex 生成云端索引文件。
func genFullCloudIndex(localDirPath string, excludes map[string]bool) (err error) { func genCloudIndex(localDirPath string, excludes map[string]bool) (cloudIndex map[string]*CloudIndex, err error) {
cloudIndex := map[string]*CloudIndex{} cloudIndex = map[string]*CloudIndex{}
err = filepath.Walk(localDirPath, func(path string, info fs.FileInfo, err error) error { err = filepath.Walk(localDirPath, func(path string, info fs.FileInfo, err error) error {
if nil != err { if nil != err {
return err return err
@ -638,15 +638,9 @@ func genFullCloudIndex(localDirPath string, excludes map[string]bool) (err error
return nil return nil
} }
hash, hashErr := util.GetEtag(path)
if nil != hashErr {
util.LogErrorf("get file [%s] hash failed: %s", path, hashErr)
return hashErr
}
p := strings.TrimPrefix(path, localDirPath) p := strings.TrimPrefix(path, localDirPath)
p = filepath.ToSlash(p) p = filepath.ToSlash(p)
cloudIndex[p] = &CloudIndex{Hash: hash, Size: info.Size()} cloudIndex[p] = &CloudIndex{Size: info.Size(), Updated: info.ModTime().Unix()}
return nil return nil
}) })
if nil != err { if nil != err {
@ -665,36 +659,6 @@ func genFullCloudIndex(localDirPath string, excludes map[string]bool) (err error
return return
} }
// incCloudIndex incrementally updates the cloud index file: entries in
// removes and excludes are dropped from localFileList, and each entry in
// upserts is re-stat'ed and re-hashed and written back into localFileList.
//
// localFileList is a pointer so that the caller's map variable observes the
// mutations; the map itself is modified in place.
func incCloudIndex(localDirPath string, localFileList *map[string]*CloudIndex, removes, upserts, excludes map[string]bool) (err error) {
	for remove := range removes {
		delete(*localFileList, remove)
	}
	for exclude := range excludes {
		// excludes holds absolute paths; index keys are slash-separated
		// paths relative to localDirPath.
		delete(*localFileList, filepath.ToSlash(strings.TrimPrefix(exclude, localDirPath)))
	}
	for upsert := range upserts {
		path := filepath.Join(localDirPath, upsert)
		if excludes[path] {
			continue
		}
		info, statErr := os.Stat(path)
		if nil != statErr {
			util.LogErrorf("stat file [%s] failed: %s", path, statErr)
			return statErr
		}
		hash, hashErr := util.GetEtag(path)
		if nil != hashErr {
			util.LogErrorf("get file [%s] hash failed: %s", path, hashErr)
			return hashErr
		}
		(*localFileList)[upsert] = &CloudIndex{Hash: hash, Size: info.Size()}
	}
	return
}
func recoverSyncData(modified map[string]bool) (decryptedDataDir string, upsertFiles []string, err error) { func recoverSyncData(modified map[string]bool) (decryptedDataDir string, upsertFiles []string, err error) {
passwd := Conf.E2EEPasswd passwd := Conf.E2EEPasswd
decryptedDataDir = filepath.Join(util.WorkspaceDir, "incremental", "sync-decrypt") decryptedDataDir = filepath.Join(util.WorkspaceDir, "incremental", "sync-decrypt")
@ -966,16 +930,7 @@ func calcUnchangedSyncList() (ret map[string]bool, removes []string, err error)
return return
} }
syncIgnoreList := getSyncIgnoreList() excludes := getSyncExcludedList(syncDir)
excludes := map[string]bool{}
ignores := syncIgnoreList.Values()
for _, p := range ignores {
relPath := p.(string)
relPath = pathSha256Short(relPath, "/")
relPath = filepath.Join(syncDir, relPath)
excludes[relPath] = true
}
ret = map[string]bool{} ret = map[string]bool{}
sep := string(os.PathSeparator) sep := string(os.PathSeparator)
filepath.Walk(util.DataDir, func(path string, info fs.FileInfo, _ error) error { filepath.Walk(util.DataDir, func(path string, info fs.FileInfo, _ error) error {
@ -1275,6 +1230,19 @@ func IsValidCloudDirName(cloudDirName string) bool {
return true return true
} }
// getSyncExcludedList builds the set of absolute paths under localDirPath
// that must be skipped during sync, derived from the user's syncignore rules.
// Each ignore pattern is mapped to its hashed on-disk name before being
// joined onto localDirPath.
func getSyncExcludedList(localDirPath string) (ret map[string]bool) {
	ret = map[string]bool{}
	for _, v := range getSyncIgnoreList().Values() {
		hashed := pathSha256Short(v.(string), "/")
		ret[filepath.Join(localDirPath, hashed)] = true
	}
	return
}
func getSyncIgnoreList() (ret *hashset.Set) { func getSyncIgnoreList() (ret *hashset.Set) {
ret = hashset.New() ret = hashset.New()
ignore := filepath.Join(util.DataDir, ".siyuan", "syncignore") ignore := filepath.Join(util.DataDir, ".siyuan", "syncignore")