Go:将 gob 与 zmq4 一起使用

Posted

技术标签:

【中文标题】Go:将 gob 与 zmq4 一起使用【英文标题】:Go: using gob with zmq4 【发布时间】:2020-04-25 15:40:01 【问题描述】:

我正在尝试使用带有 zmq4 套接字的 gob(来自 pebbe 的 zmq4)。 zmq4 套接字没有 io 设备,这使得 gob 似乎无法直接读/写:

我不能在gob.NewEncoder 的参数中使用&client ( type **zmq4.Socket ) 作为io.Writer 类型:zmq4.Socket 没有实现io.Writer(缺少Write 方法)

一个zmq4 发送函数SendMessage() 接受interface,所以我使用它来发送。

在服务器端zmq4 接收函数返回string[]byte[]string[][]byte。我正在使用RecvMessage(),它返回[]string

可以写入bytes.Buffer,发送该缓冲区,将其作为[]string 读取,然后使用gob 处理消息的内容部分。尽管当前的问题在于将[]string 转换为bytes.Buffer 以便gob 能够从中io.Read。听起来很基础,但到目前为止我已经尝试了很多方法,但都没有成功。这是当前的。问题显然在于 gob 产生“缓冲区中的额外数据”,当数据在发送之前和接收之后似乎相同时,正如print 语句所显示的那样。

有没有更简单、更类似的方式来做这件事?如果你有zmq4,下面的代码是独立的,应该执行。

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
    "sync"
    "time"

    zmq "github.com/pebbe/zmq4"
)

type LogEntry struct 
    ErrID  int
    Name   string
    Level  string
    LogStr string


// The client task
// --------------------------------------------------------------
func client_task(s string) 
    var mu sync.Mutex
    client, _ := zmq.NewSocket(zmq.DEALER)
    defer client.Close()
    client.SetIdentity(s)
    client.Connect("tcp://localhost:5570")

    go func() 

        aLogEntry := &LogEntry
            ErrID:  1,
            Name:   "Client",
            LogStr: "Log msg sent",
        

        for request_nbr := 1; true; request_nbr++ 

            var network bytes.Buffer
            enc := gob.NewEncoder(&network)
            err := enc.Encode(aLogEntry)
            if err != nil 
                fmt.Println("encode error:", err)
            

            // Early decode test - this will influence subsequent gob
            // behaviour so leave commented when caring about the sent
            // data
            // dec := gob.NewDecoder(&network)
            // var aLogEntry2 *LogEntry
            // err = dec.Decode(&aLogEntry2)
            // if err != nil 
            //  fmt.Printf("client_task(DECODE ERROR) : %s\n\n", err)
            // 
            // fmt.Printf("client_task(TEST DECODE) %+v\n\n", aLogEntry)

            mu.Lock()
            // Replaced length by bytes Buffer method: 91
            fmt.Printf("client_task(len) : %d\n\n", network.Len())
            fmt.Printf("client_task(network) : %v\n\n", network)

            client.SendMessage(network, 0)
            mu.Unlock()
            time.Sleep(5 * time.Second)
        
    ()

    // pause to allow server
    for 
        time.Sleep(100 * time.Millisecond)
    


// The server task
// --------------------------------------------------------------
func server_task() 

    frontend, _ := zmq.NewSocket(zmq.ROUTER)
    defer frontend.Close()
    frontend.Bind("tcp://*:5570")

    for 
        msg, _ := frontend.RecvMessage(0)
        // Added error reporting - does not report any error
        // err does never get filled in here, never an error could get reported here
        if err != nil 
            fmt.Printf("RECV ERROR: %s", err)
        

        // using WriteString to write the content portion of the
        // received message to the bytes.Buffer for gob to process
        var network bytes.Buffer
        dec := gob.NewDecoder(&network)
        network.WriteString(msg[1])

        // Added length of the bytes Buffer: 285
        // Before sending the bytes Buffer is: 91
        // More than just msg[1] is written into the buffer ?
        fmt.Printf("server_task(len): %d\n\n", network.Len())
        fmt.Printf("server_task(msg[1]) : %s\n\n", msg[1])

        var aLogEntry *LogEntry
        err := dec.Decode(&aLogEntry)
        if err != nil 
            fmt.Printf("server_task(DECODE ERROR) : %s\n\n", err)
        

        fmt.Printf("server_task(aLogEntry) %+v\n\n", aLogEntry)
    


func main() 
    defer fmt.Println("main() done")

    go client_task("1")
    go server_task()

    //  Run for 5 seconds then quit
    time.Sleep(5 * time.Second)

打印语句在客户端显示:

client_task(network) : [62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0

在服务器端:

server_task(msg[1]) : [62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0

这看起来几乎一样。

结果是:

server_task(DECODE ERROR) : extra data in buffer
server_task(aLogEntry) <nil>

【问题讨论】:

【参考方案1】:

观察

ZeroMQ 原生 API 定义了这个属性:

当接收到消息时,ZMQ_ROUTER 套接字应在消息部分之前在消息中在将消息传递给应用程序之前将包含原始对等方的路由 ID 的消息部分 .

解决方案

可以开始使用足够正确的 PUSH/PULL 可扩展的正式原型,而不是(对于您的用例而言过于复杂)DEALER/ROUTER,或可能依赖于假设您的 ROUTER-node 永远不会 .RecvMessage( 0 ) 与其他内部多部分结构,而是唯一的一个,与 [ &lt;routing_id&gt; | &lt;[network]-payload&gt; [ | ... ] ] 的模板匹配,这不能得到强有力的保证,可以吗?

虽然不确定,但 ZeroMQ 的 pebbe 的 zmq4 go-wrapper 如何尝试或不实现所有本机 API 功能和/或处理 中的潜在差异(无需任何用户-级别应用程序干预?) 读取所有多部分组件并处理一次性和/或以 NULL 结尾的字符串的处理,这可能在 .Decode()-method 内部发生冲突。

最后但并非最不重要的一点是,如果您的代码依赖 msg[1] 在内部具有有效且符合所有约定的有效负载,如果我没有忽略一些低级黑客,我不知道,我看不到任何明确的处理情况下,当发起方(DEALER)没有向消费者方(ROUTER)传递任何这样的新消息,但.RecvMessage( 0 )-方法填充msg并继续(空@987654337 @ ) 朝向.Decode()-方法,由于明显的原因,它必须在空的或格式错误的msg 上失败,不是吗?

我肯定会从 PUSH/PULL 替换开始,它不会在交付端注入前置的、现在的多帧组合,具有 routing_id 和相关风险。

如果在PULL-RxQueue-buffers 内没有等待消息等待,则从.RecvMessage()-方法返回的非阻塞模式仍会在用空数据填充msg 时发生冲突,这仍会使@987654345 恐慌@-方法。

如果.RecvMessage( 0 )-method 调用实际上表现出阻塞模式接收,如果要将 ZeroMQ 从错误的根本原因分析中完全排除,则应该更加注意错误状态检测和处理。对所有已部署的 ZeroMQ 资源进行更多的自我防御.setsockopt()-setups(ZMQ_LINGER 和许多其他资源)也将提高健壮性和易于出错的程度,即对于崩溃的应用程序可能会造成任何伤害的情况生产。

您可以尝试重现错误:here IDE 不幸错过了 ZeroMQ 部分。

Golang.org site 也失败了:

go: finding module for package github.com/pebbe/zmq4
go: downloading github.com/pebbe/zmq4 v1.2.0
go: found github.com/pebbe/zmq4 in github.com/pebbe/zmq4 v1.2.0
# pkg-config --cflags  -- libzmq
pkg-config: exec: "pkg-config": executable file not found in $PATH

Go build failed.

【讨论】:

我已经离线写了一个回复,很详细,现在我看到评论有500多个字符的限制。新手的快乐。当我发现如何使用 SO 时,请多多包涵。我将尝试将评论设为“多部分”。另一种选择是“回答问题”,这完全不是这里的意图。 嗯.. 这太局限了,无法完全回复。我会试试电报风格。基于 asyncsrv.go 的问题,相当于“Code Connected vol. 1”的第 107 页,遵循框架讨论。遗漏了消息的弹出部分,如果可能,喜欢使用 ROUTER/DEALER。专注于问题。在发送数据之前解码数据工作正常,在上面的问题中添加了测试代码。 fmt.Printf() 显示发送前和接收后的数据相同。困惑。 可能是 fmt.Printf(%v) 语句没有打印所有内容,并且在上述问题的两个输出中没有显示数据吗?是的,RecvMessage(..., 0) 在阻塞模式下运行,目前我发现这对处理 gob 问题很有用。 是的,RecvMessage(..., 0) 在阻塞模式下运行,目前我发现这对处理 gob 问题很有用。问题的关键:两个相同报告的数据 (fmt.Printf()) = 问题 gob。在 zmq 中接收数据的各种方式?在这里不需要观察 ZMQ_RCVMORE polly,一旦这个 gob 问题得到解决,就会完全实现。如果它有助于发光,我可以尝试推/拉,尽管使用 ROUTER/DEALER 是我在功能方面的考虑。再次感谢您的回复和洞察力 - 非常感谢。 如果 Playground 或 tio 可以处理(一些)外部包,那就太好了,尽管我猜他们不太热衷于这样做。我已向 RecvMessage() 添加了错误报告。

以上是关于Go:将 gob 与 zmq4 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

Go:专属二进制编码方式Gob

Option类型:C++(std::optional)Rust(Option)Go(gob.OptionalValue)

将 Go 语言与 MSYS2 一起使用

Go 每日一库之 jsonrpc:来自标准库

如何将 Sentry 与 go.uber.org/zap/zapcore 记录器一起使用

将 Go beta 版本与 GitHub Actions 一起使用