关于 ZeroMQ 并使用替代存储来存储消息

Posted

技术标签:

【中文标题】关于 ZeroMQ 并使用替代存储来存储消息【英文标题】:about ZeroMQ and using an alternative storage to store messages 【发布时间】:2018-04-12 11:48:17 【问题描述】:

我最近了解到,连接到另一个套接字(我正在使用 PUSH/PULL)的 ZeroMQ 套接字会继续推送数据,即使拉取数据的套接字脱机或断开连接也是如此。

我的理解是,一切都在内存中积累,一旦socket再次上线,它就会从停止的地方继续拉。

有没有办法设置 ZeroMQ 使用文件存储而不是内存来实现这种行为?如果是这样,是事件驱动的,告诉socket何时意识到Puller的重新连接断开?

我喜欢这个概念,但我对内存的过度使用感到害怕,我也不想使用可能会阻塞套接字的水印。

我在 C/C++ for Linux 下使用 ZeroMQ。

【问题讨论】:

【参考方案1】:

我相信没有这样的内置功能。如果要将消息存储到文件中,则必须自己实现。

根据经验(我非常广泛地使用 ZMQ)我会说您的解决方案更复杂且没有必要。 如果您真的觉得 RAM 会成为问题,更好的解决方案是创建一个 SWAP 文件并使用cgroups (example) 限制您的进程对 RAM 的使用。 结果将是您的进程将使用您允许的任何最大 RAM,然后使用 SWAP RAM(基本上是一个文件)来存储排队的消息。

【讨论】:

谢谢“叮当”。是的,这可能是一种方式。我看到这种为每个应用程序提供交换空间的方法是,整个应用程序也将使用交换(堆的每次使用都将转到交换),从而导致整体速度变慢。我更喜欢专门用于 MESSAGE 的专用存储。无论如何,我以前不知道“cgroups”,我肯定会尝试看看它是什么样的。 虽然交换文件确实较慢,但您确实应该对其进行测试以查看它是否具有可测量的影响。较慢并不意味着它还不够快:) 绝对正确!【参考方案2】:

...即使拉取数据的套接字离线或断开连接,也会继续推送数据。

不,不一定

此行为是特定配置的结果,而不是统一有效的行为。

阅读所有.setsockopt( zmq.IMMEDIATE, ... ).setsockopt( zmq.CONFLATE, ... ).setsockopt( zmq.SNDHWM, ... ).setsockopt( zmq.SNDBUF, ... )

...一切都在积累...一旦socket再次上线,它就会继续拉动

不,不一定

此行为是特定配置的结果,而不是统一有效的行为。

阅读所有.setsockopt( zmq.SNDHWM, ... ).setsockopt( zmq.SNDBUF, ... ).setsockopt( zmq.SNDTIMEO, ... ).setsockopt( zmq.BACKLOG, ... ).setsockopt( zmq.RECONNECT_IVL, ... ).setsockopt( zmq.RECONNECT_IVL_MAX, ... ) 和 依赖于操作系统的调优选项

...设置 ZeroMQ 以使用文件存储而不是内存来实现此行为?

不,不可用

高水印是激活“熔断器”断路器的最后手段(我不想说对功能的保护,它不能保护某些东西)。

显式扩展多方信号/握手是一个更好的策略,向 Context() 实例提供大量有效负载数据,这些数据旨在为未准备好的对等方从应用端更好地控制,而不是通过在底层某个地方分配越来越多的资源,是吗?

【讨论】:

...我前面有一条漫长而痛苦的路...:|感谢您的答复。这个周末要读很多书,然后再回复你。

以上是关于关于 ZeroMQ 并使用替代存储来存储消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码学习--消息存储篇

(转)RocketMQ源码学习--消息存储篇

关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

初码-Azure系列-存储队列的使用与一个Azure小工具(蓝天助手)

使用ZeroMq源测试Kuiper吞吐量

想了解Kafka,RabbitMQ,ZeroMQ,RocketMQ,ActiveMQ之间的差异?这一篇文章就够了!