diff --git a/kernel/model/conf.go b/kernel/model/conf.go index b48f31750..38baad500 100644 --- a/kernel/model/conf.go +++ b/kernel/model/conf.go @@ -469,7 +469,7 @@ func Close(force bool, execInstallPkg int) (exitCode int) { if !force { if Conf.Sync.Enabled && 3 != Conf.Sync.Mode && ((IsSubscriber() && conf.ProviderSiYuan == Conf.Sync.Provider) || conf.ProviderSiYuan != Conf.Sync.Provider) { - syncData(true, false) + syncData(true, false, false) if 0 != ExitSyncSucc { exitCode = 1 return @@ -509,6 +509,7 @@ func Close(force bool, execInstallPkg int) (exitCode int) { time.Sleep(4 * time.Second) } logging.LogInfof("exited kernel") + webSocketConn.Close() util.WebSocketServer.Close() go func() { time.Sleep(500 * time.Millisecond) diff --git a/kernel/model/sync.go b/kernel/model/sync.go index 408138b93..f42f4cd68 100644 --- a/kernel/model/sync.go +++ b/kernel/model/sync.go @@ -23,6 +23,7 @@ import ( "os" "path" "path/filepath" + "runtime" "strings" "sync" "time" @@ -176,10 +177,10 @@ func BootSyncData() { } func SyncData(byHand bool) { - syncData(false, byHand) + syncData(false, byHand, false) } -func syncData(exit, byHand bool) { +func syncData(exit, byHand, byWebSocket bool) { defer logging.Recover() if !checkSync(false, exit, byHand) { @@ -219,6 +220,23 @@ func syncData(exit, byHand bool) { code = 2 } util.BroadcastByType("main", "syncing", code, msg, nil) + + if nil == webSocketConn { + // 如果 websocket 连接已经断开,则重新连接 + connectSyncWebSocket() + } + + if 1 == Conf.Sync.Mode && !byWebSocket { + // 如果处于自动同步模式且不是又 WS 触发的同步,则通知其他设备上的内核进行同步 + request := map[string]interface{}{ + "cmd": "synced", + "synced": Conf.Sync.Synced, + } + if writeErr := webSocketConn.WriteJSON(request); nil != writeErr { + logging.LogErrorf("write websocket message failed: %v", writeErr) + } + } + return } @@ -600,6 +618,23 @@ func isProviderOnline(byHand bool) (ret bool) { return } +var ( + webSocketConn *websocket.Conn + webSocketConnLock = sync.Mutex{} +) + +type OnlineKernel struct { + ID string `json:"id"` + Hostname string `json:"hostname"` + OS string `json:"os"` + Ver string `json:"ver"` +} + +var ( + onlineKernels []*OnlineKernel + onlineKernelsLock = sync.Mutex{} +) + func connectSyncWebSocket() { defer logging.Recover() @@ -607,19 +642,27 @@ func connectSyncWebSocket() { return } + webSocketConnLock.Lock() + defer webSocketConnLock.Unlock() + + if nil != webSocketConn { + return + } + if "1602224134353" != Conf.User.UserId { return } logging.LogInfof("connecting sync websocket...") - c, dialErr := dialSyncWebSocket() + var dialErr error + webSocketConn, dialErr = dialSyncWebSocket() if nil != dialErr { logging.LogWarnf("connect sync websocket failed: %s", dialErr) return } logging.LogInfof("sync websocket connected") - c.SetCloseHandler(func(code int, text string) error { + webSocketConn.SetCloseHandler(func(code int, text string) error { logging.LogWarnf("sync websocket closed: %d, %s", code, text) return nil }) @@ -628,13 +671,13 @@ func connectSyncWebSocket() { defer logging.Recover() for { - result := map[string]interface{}{} - if readErr := c.ReadJSON(&result); nil != readErr { + result := gulu.Ret.NewResult() + if readErr := webSocketConn.ReadJSON(&result); nil != readErr { reconnected := false for retries := 0; retries < 7; retries++ { time.Sleep(7 * time.Second) logging.LogWarnf("reconnecting sync websocket...") - c, dialErr = dialSyncWebSocket() + webSocketConn, dialErr = dialSyncWebSocket() if nil != dialErr { logging.LogWarnf("reconnect sync websocket failed: %s", dialErr) continue @@ -646,6 +689,7 @@ func connectSyncWebSocket() { } if !reconnected { logging.LogWarnf("reconnect sync websocket failed, do not retry") + webSocketConn = nil return } @@ -653,6 +697,26 @@ func connectSyncWebSocket() { } logging.LogInfof("sync websocket message: %v", result) + data := result.Data.(map[string]interface{}) + switch data["cmd"].(string) { + case "synced": + syncData(false, false, true) + case "kernels": + onlineKernelsLock.Lock() + + onlineKernels = []*OnlineKernel{} + for _, kernel := range data["kernels"].([]interface{}) { + kernelMap := kernel.(map[string]interface{}) + onlineKernels = append(onlineKernels, &OnlineKernel{ + ID: kernelMap["id"].(string), + Hostname: kernelMap["hostname"].(string), + OS: kernelMap["os"].(string), + Ver: kernelMap["ver"].(string), + }) + } + + onlineKernelsLock.Unlock() + } } }() @@ -673,14 +737,18 @@ func connectSyncWebSocket() { }() } +var kernelID = gulu.Rand.String(7) + func dialSyncWebSocket() (c *websocket.Conn, err error) { - path := "/apis/siyuan/dejavu/ws" - endpoint := util.AliyunWebSocketServer + path - //endpoint := "ws://127.0.0.1:64388" + path + //endpoint := "ws://127.0.0.1:64388" + "/apis/siyuan/dejavu/ws" + endpoint := util.AliyunWebSocketServer + "/apis/siyuan/dejavu/ws" header := http.Header{ - "x-siyuan-uid": []string{Conf.User.UserId}, - "x-siyuan-kernel": []string{gulu.Rand.String(7)}, - "x-siyuan-ver": []string{util.Ver}, + "x-siyuan-uid": []string{Conf.User.UserId}, + "x-siyuan-kernel": []string{kernelID}, + "x-siyuan-ver": []string{util.Ver}, + "x-siyuan-os": []string{runtime.GOOS}, + "x-siyuan-hostname": []string{util.GetDeviceName()}, + "x-siyuan-repo": []string{Conf.Sync.CloudName}, } c, _, err = websocket.DefaultDialer.Dial(endpoint, header) return