diff --git a/kernel/api/broadcast.go b/kernel/api/broadcast.go index d0534eb72..c01ead58a 100644 --- a/kernel/api/broadcast.go +++ b/kernel/api/broadcast.go @@ -33,10 +33,29 @@ type Channel struct { Count int `json:"count"` } +type PublishMessage struct { + Type string `json:"type"` // "string" | "binary" + Size int `json:"size"` // message size + Filename string `json:"filename"` // empty string for string-message +} + +type PublishResult struct { + Code int `json:"code"` // 0: success + Msg string `json:"msg"` // error message + + Channel Channel `json:"channel"` + Message PublishMessage `json:"message"` +} + var ( BroadcastChannels = sync.Map{} ) +const ( + StringMessageType = "string" + BinaryMessageType = "binary" +) + // broadcast create a broadcast channel WebSocket connection // // @param @@ -136,7 +155,163 @@ func subscribe(c *gin.Context, broadcastChannel *melody.Melody, channel string) } } +// broadcastPublish push multiple binary messages to multiple broadcast channels +// +// @param +// +// MultipartForm: [name] -> [values] +// - name: string // channel name +// - values: +// - string[] // string-messages to the same channel +// - File[] // binary-messages to the same channel +// - filename: string // message key +// +// @returns +// +// { +// code: int, +// msg: string, +// data: { +// results: { +// code: int, // 0: success +// msg: string, // error message +// channel: { +// name: string, // channel name +// count: string, // subscriber count +// }, +// message: { +// type: string, // "string" | "binary" +// size: int, // message size (Bytes) +// filename: string, // empty string for string-message +// }, +// }[], +// }, +// } +func broadcastPublish(c *gin.Context) { + ret := gulu.Ret.NewResult() + defer c.JSON(http.StatusOK, ret) + + results := []*PublishResult{} + + // Multipart form + form, err := c.MultipartForm() + if err != nil { + ret.Code = -2 + ret.Msg = err.Error() + return + } + + // Broadcast string messages + for name, values := range form.Value { + channel := Channel{ + Name: name, + Count: 0, + } + + // Get broadcast channel + _broadcastChannel, exist := BroadcastChannels.Load(name) + var broadcastChannel *melody.Melody + if exist { + broadcastChannel = _broadcastChannel.(*melody.Melody) + channel.Count = broadcastChannel.Len() + } else { + broadcastChannel = nil + channel.Count = 0 + } + + // Broadcast each string message to the same channel + for _, value := range values { + content := []byte(value) + result := &PublishResult{ + Code: 0, + Msg: "", + Channel: channel, + Message: PublishMessage{ + Type: StringMessageType, + Size: len(content), + Filename: "", + }, + } + results = append(results, result) + + if broadcastChannel != nil { + err := broadcastChannel.Broadcast(content) + if err != nil { + logging.LogErrorf("broadcast message failed: %s", err) + result.Code = -1 + result.Msg = err.Error() + continue + } + } + } + } + + // Broadcast binary message + for name, files := range form.File { + channel := Channel{ + Name: name, + Count: 0, + } + + // Get broadcast channel + _broadcastChannel, exist := BroadcastChannels.Load(name) + var broadcastChannel *melody.Melody + if exist { + broadcastChannel = _broadcastChannel.(*melody.Melody) + channel.Count = broadcastChannel.Len() + } else { + broadcastChannel = nil + channel.Count = 0 + } + + // Broadcast each binary message to the same channel + for _, file := range files { + result := &PublishResult{ + Code: 0, + Msg: "", + Channel: channel, + Message: PublishMessage{ + Type: BinaryMessageType, + Size: int(file.Size), + Filename: file.Filename, + }, + } + results = append(results, result) + + if broadcastChannel != nil { + value, err := file.Open() + if err != nil { + logging.LogErrorf("open multipart form file [%s] failed: %s", file.Filename, err) + result.Code = -2 + result.Msg = err.Error() + continue + } + + content := make([]byte, file.Size) + if _, err := value.Read(content); err != nil { + logging.LogErrorf("read multipart form file [%s] failed: %s", file.Filename, err) + result.Code = -3 + result.Msg = err.Error() + continue + } + + if err := broadcastChannel.BroadcastBinary(content); err != nil { + logging.LogErrorf("broadcast binary message failed: %s", err) + result.Code = -1 + result.Msg = err.Error() + continue + } + } + } + } + + ret.Data = map[string]interface{}{ + "results": results, + } +} + // postMessage send string message to a broadcast channel +// // @param // // { diff --git a/kernel/api/router.go b/kernel/api/router.go index 60e943104..c2f4c0d9b 100644 --- a/kernel/api/router.go +++ b/kernel/api/router.go @@ -457,6 +457,7 @@ func ServeAPI(ginServer *gin.Engine) { ginServer.Handle("POST", "/api/network/forwardProxy", model.CheckAuth, model.CheckAdminRole, forwardProxy) ginServer.Handle("GET", "/ws/broadcast", model.CheckAuth, model.CheckAdminRole, broadcast) + ginServer.Handle("POST", "/api/broadcast/publish", model.CheckAuth, model.CheckAdminRole, broadcastPublish) ginServer.Handle("POST", "/api/broadcast/postMessage", model.CheckAuth, model.CheckAdminRole, postMessage) ginServer.Handle("POST", "/api/broadcast/getChannels", model.CheckAuth, model.CheckAdminRole, getChannels) ginServer.Handle("POST", "/api/broadcast/getChannelInfo", model.CheckAuth, model.CheckAdminRole, getChannelInfo)