如何使用pyzmq正确发布和订阅最新消息?

Posted

技术标签:

【中文标题】如何使用pyzmq正确发布和订阅最新消息?【英文标题】:How to publish and subscribe the latest message correctly using pyzmq? 【发布时间】:2019-05-17 19:17:03 【问题描述】:

我有一个进程A不断发布消息,进程B和C订阅主题,获取进程A中发布者发布的最新消息。

所以,我将zmq.CONFLATE 设置为发布者和订阅者。但是,我发现一个订阅者无法接收消息。

def publisher(sleep_time=1.0, port="5556"):

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.bind("tcp://*:%s" % port)
    print ("Running publisher on port: ", port)

    while True:
        localtime = time.asctime( time.localtime(time.time()))
        string = "Message published time: ".format(localtime)
        socket.send_string("".format(string))
        time.sleep(sleep_time)

def subscriber(name="sub", sleep_time=1, ports="5556"):

    print ("Subscriber Name: , Sleep Time: , Port: ".format(name, sleep_time, ports))

    context = zmq.Context()
    print ("Connecting to publisher with ports %s" % ports)
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    socket.connect ("tcp://localhost:%s" % ports)

    while True:

        message = socket.recv()
        localtime = time.asctime( time.localtime(time.time()))
        print ("\nSubscriber []\n[RECV]:  at [TIME]: ".format(name, message, localtime))
        time.sleep(sleep_time)


if __name__ == "__main__":
    Process(target=publisher).start()
    Process(target=subscriber, args=("SUB1", 1.2, )).start()
    Process(target=subscriber, args=("SUB2", 1.1, )).start()

我尝试在发布者中取消设置socket.setsockopt(zmq.CONFLATE, 1),这似乎解决了问题。进程 B 和 C 中的两个订阅者都可以收到消息,并且这些消息似乎是最新的。

我试图找出为什么将发布者设置为 CONFLATE 会导致我遇到的问题。我找不到有关它的信息。有谁知道是什么导致了这种行为?

另外我想知道,在一个发布者对多个订阅者的情况下,正确的代码设置是什么,让订阅者总能得到最新的消息?

【问题讨论】:

【参考方案1】:

这很可能是时间问题,ZMQ_CONFLATE 套接字选项将 入站 和出站队列限制为 1 条消息。

PUB/SUB 的工作方式是订阅者在设置 ZMQ_SUBSCRIBE 选项时向发布者发送订阅消息。如果您同时启动两个订阅者,那么到达发布者队列的订阅消息之一可能会被丢弃。

尝试在开始每个订阅者之间添加睡眠。

来自 zeromq 文档

如果设置,套接字将在其入站/出站中仅保留一条消息 队列,此消息是收到的最后一条消息/最后一条消息 将被寄出。忽略 ZMQ_RCVHWM 和 ZMQ_SNDHWM 选项。才不是 支持多部分消息,特别是其中只有一部分是 保存在套接字内部队列中。

我并不是说这是您问题的解决方案,但如果是这种情况,我们可能需要对 libzmq 进行更改以使合并选项更加精细,以便您可以选择是否应将合并应用于入站或出站排队。

【讨论】:

感谢您阐明 PUB/SUB 的工作原理,非常有用。但是,刚刚尝试在两个订阅者启动时在它们之间添加睡眠,问题仍然存在。 好吧,这很有趣,我会将 PUB 更改为 XPUB 并轮询传入的订阅消息。这样您就可以调试并查看两个订阅是否都到达了发布者。我也会放弃订阅者的睡眠并使用阻塞 recv(),以防万一我们在睡眠期间丢失了线索/消息。 谢谢。我试图用 XPUB 替换 PUB,并为 XPUB 设置 CONFLATE,其他保持不变(订阅者开始之间没有睡眠)。现在,两个订阅者都能够接收消息。我现在很困惑。 XPUB 和 PUB 并没有那么不同,但在这种情况下它们的行为不同。你对此有什么想法吗? 是的,XPUB 应该只添加在传入订阅消息中查看的功能,但也可能会更改时间。在这一点上,而不是进一步推测(如果你有时间),我将使用 C 和当前在 v4.3.1 上的底层核心库 libzmq 重新编写测试,因为 python 库可能正在使用旧版本或错误是特定于 python 的。 非常感谢。您可以使用有关 XPUB 的信息编辑答案吗?我会接受你的回答。【参考方案2】:

有一种方法可以在ZMQ订阅套接字中获取“仅最后一条消息”选项(使用CONFLATE选项)。

您需要它在订阅者方面。

这是一个例子:

import zmq

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.

while True:
    data = socket.recv()
    print data

另一方面,我删除了订阅者代码中的所有缓冲队列。


[附加]:

使用zmq.SNDBUFzmq.RCVBUF 选项,我们可以设置ZMQ 缓冲区大小的限制。 (More complete and an example)


【讨论】:

以上是关于如何使用pyzmq正确发布和订阅最新消息?的主要内容,如果未能解决你的问题,请参考以下文章

使用 CloudKit 共享时如何正确使用数据库订阅

消息中间件 - 如何避免使用通配符订阅重入?

如何在使用 triggerTopic 创建云功能时设置发布订阅消息过滤器

SSH如何去发布订阅监听

如何从pyzmq中的生成器发送味精?

如何使用 AWS CloudFormation 为 SNS 订阅指定“原始消息传递”?