mirror of
https://github.com/siyuan-note/siyuan.git
synced 2025-12-20 16:40:13 +01:00
🧑💻 Add Kernel API for publish binary message broadcast (#13681)
* 🎨 Broadcast supports publishing binary messages * 🎨 Add size field to `PublishMessage`
This commit is contained in:
parent
adc819973b
commit
3807f8386c
2 changed files with 176 additions and 0 deletions
|
|
@ -33,10 +33,29 @@ type Channel struct {
|
||||||
Count int `json:"count"`
|
Count int `json:"count"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PublishMessage struct {
|
||||||
|
Type string `json:"type"` // "string" | "binary"
|
||||||
|
Size int `json:"size"` // message size
|
||||||
|
Filename string `json:"filename"` // empty string for string-message
|
||||||
|
}
|
||||||
|
|
||||||
|
type PublishResult struct {
|
||||||
|
Code int `json:"code"` // 0: success
|
||||||
|
Msg string `json:"msg"` // error message
|
||||||
|
|
||||||
|
Channel Channel `json:"channel"`
|
||||||
|
Message PublishMessage `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
BroadcastChannels = sync.Map{}
|
BroadcastChannels = sync.Map{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
StringMessageType = "string"
|
||||||
|
BinaryMessageType = "binary"
|
||||||
|
)
|
||||||
|
|
||||||
// broadcast create a broadcast channel WebSocket connection
|
// broadcast create a broadcast channel WebSocket connection
|
||||||
//
|
//
|
||||||
// @param
|
// @param
|
||||||
|
|
@ -136,7 +155,163 @@ func subscribe(c *gin.Context, broadcastChannel *melody.Melody, channel string)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// broadcastPublish push multiple binary messages to multiple broadcast channels
|
||||||
|
//
|
||||||
|
// @param
|
||||||
|
//
|
||||||
|
// MultipartForm: [name] -> [values]
|
||||||
|
// - name: string // channel name
|
||||||
|
// - values:
|
||||||
|
// - string[] // string-messages to the same channel
|
||||||
|
// - File[] // binary-messages to the same channel
|
||||||
|
// - filename: string // message key
|
||||||
|
//
|
||||||
|
// @returns
|
||||||
|
//
|
||||||
|
// {
|
||||||
|
// code: int,
|
||||||
|
// msg: string,
|
||||||
|
// data: {
|
||||||
|
// results: {
|
||||||
|
// code: int, // 0: success
|
||||||
|
// msg: string, // error message
|
||||||
|
// channel: {
|
||||||
|
// name: string, // channel name
|
||||||
|
// count: string, // subscriber count
|
||||||
|
// },
|
||||||
|
// message: {
|
||||||
|
// type: string, // "string" | "binary"
|
||||||
|
// size: int, // message size (Bytes)
|
||||||
|
// filename: string, // empty string for string-message
|
||||||
|
// },
|
||||||
|
// }[],
|
||||||
|
// },
|
||||||
|
// }
|
||||||
|
func broadcastPublish(c *gin.Context) {
|
||||||
|
ret := gulu.Ret.NewResult()
|
||||||
|
defer c.JSON(http.StatusOK, ret)
|
||||||
|
|
||||||
|
results := []*PublishResult{}
|
||||||
|
|
||||||
|
// Multipart form
|
||||||
|
form, err := c.MultipartForm()
|
||||||
|
if err != nil {
|
||||||
|
ret.Code = -2
|
||||||
|
ret.Msg = err.Error()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast string messages
|
||||||
|
for name, values := range form.Value {
|
||||||
|
channel := Channel{
|
||||||
|
Name: name,
|
||||||
|
Count: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get broadcast channel
|
||||||
|
_broadcastChannel, exist := BroadcastChannels.Load(name)
|
||||||
|
var broadcastChannel *melody.Melody
|
||||||
|
if exist {
|
||||||
|
broadcastChannel = _broadcastChannel.(*melody.Melody)
|
||||||
|
channel.Count = broadcastChannel.Len()
|
||||||
|
} else {
|
||||||
|
broadcastChannel = nil
|
||||||
|
channel.Count = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast each string message to the same channel
|
||||||
|
for _, value := range values {
|
||||||
|
content := []byte(value)
|
||||||
|
result := &PublishResult{
|
||||||
|
Code: 0,
|
||||||
|
Msg: "",
|
||||||
|
Channel: channel,
|
||||||
|
Message: PublishMessage{
|
||||||
|
Type: StringMessageType,
|
||||||
|
Size: len(content),
|
||||||
|
Filename: "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
results = append(results, result)
|
||||||
|
|
||||||
|
if broadcastChannel != nil {
|
||||||
|
err := broadcastChannel.Broadcast(content)
|
||||||
|
if err != nil {
|
||||||
|
logging.LogErrorf("broadcast message failed: %s", err)
|
||||||
|
result.Code = -1
|
||||||
|
result.Msg = err.Error()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast binary message
|
||||||
|
for name, files := range form.File {
|
||||||
|
channel := Channel{
|
||||||
|
Name: name,
|
||||||
|
Count: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get broadcast channel
|
||||||
|
_broadcastChannel, exist := BroadcastChannels.Load(name)
|
||||||
|
var broadcastChannel *melody.Melody
|
||||||
|
if exist {
|
||||||
|
broadcastChannel = _broadcastChannel.(*melody.Melody)
|
||||||
|
channel.Count = broadcastChannel.Len()
|
||||||
|
} else {
|
||||||
|
broadcastChannel = nil
|
||||||
|
channel.Count = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcast each binary message to the same channel
|
||||||
|
for _, file := range files {
|
||||||
|
result := &PublishResult{
|
||||||
|
Code: 0,
|
||||||
|
Msg: "",
|
||||||
|
Channel: channel,
|
||||||
|
Message: PublishMessage{
|
||||||
|
Type: BinaryMessageType,
|
||||||
|
Size: int(file.Size),
|
||||||
|
Filename: file.Filename,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
results = append(results, result)
|
||||||
|
|
||||||
|
if broadcastChannel != nil {
|
||||||
|
value, err := file.Open()
|
||||||
|
if err != nil {
|
||||||
|
logging.LogErrorf("open multipart form file [%s] failed: %s", file.Filename, err)
|
||||||
|
result.Code = -2
|
||||||
|
result.Msg = err.Error()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
content := make([]byte, file.Size)
|
||||||
|
if _, err := value.Read(content); err != nil {
|
||||||
|
logging.LogErrorf("read multipart form file [%s] failed: %s", file.Filename, err)
|
||||||
|
result.Code = -3
|
||||||
|
result.Msg = err.Error()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := broadcastChannel.BroadcastBinary(content); err != nil {
|
||||||
|
logging.LogErrorf("broadcast binary message failed: %s", err)
|
||||||
|
result.Code = -1
|
||||||
|
result.Msg = err.Error()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret.Data = map[string]interface{}{
|
||||||
|
"results": results,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// postMessage send string message to a broadcast channel
|
// postMessage send string message to a broadcast channel
|
||||||
|
//
|
||||||
// @param
|
// @param
|
||||||
//
|
//
|
||||||
// {
|
// {
|
||||||
|
|
|
||||||
|
|
@ -457,6 +457,7 @@ func ServeAPI(ginServer *gin.Engine) {
|
||||||
ginServer.Handle("POST", "/api/network/forwardProxy", model.CheckAuth, model.CheckAdminRole, forwardProxy)
|
ginServer.Handle("POST", "/api/network/forwardProxy", model.CheckAuth, model.CheckAdminRole, forwardProxy)
|
||||||
|
|
||||||
ginServer.Handle("GET", "/ws/broadcast", model.CheckAuth, model.CheckAdminRole, broadcast)
|
ginServer.Handle("GET", "/ws/broadcast", model.CheckAuth, model.CheckAdminRole, broadcast)
|
||||||
|
ginServer.Handle("POST", "/api/broadcast/publish", model.CheckAuth, model.CheckAdminRole, broadcastPublish)
|
||||||
ginServer.Handle("POST", "/api/broadcast/postMessage", model.CheckAuth, model.CheckAdminRole, postMessage)
|
ginServer.Handle("POST", "/api/broadcast/postMessage", model.CheckAuth, model.CheckAdminRole, postMessage)
|
||||||
ginServer.Handle("POST", "/api/broadcast/getChannels", model.CheckAuth, model.CheckAdminRole, getChannels)
|
ginServer.Handle("POST", "/api/broadcast/getChannels", model.CheckAuth, model.CheckAdminRole, getChannels)
|
||||||
ginServer.Handle("POST", "/api/broadcast/getChannelInfo", model.CheckAuth, model.CheckAdminRole, getChannelInfo)
|
ginServer.Handle("POST", "/api/broadcast/getChannelInfo", model.CheckAuth, model.CheckAdminRole, getChannelInfo)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue