ZeroMQ 和本地 FIFO

Posted

技术标签:

【中文标题】ZeroMQ 和本地 FIFO【英文标题】:ZeroMQ and local FIFO 【发布时间】:2013-03-01 20:10:24 【问题描述】:

我有两个进程(“发送者”和“接收者”)需要通过瞬态单向 FIFO 通信管道在单台机器上本地进行通信。这是我想要发生的事情(使用更接近 Unix 域套接字的语言):

发送者在已知地址“创建”管道,并立即将消息向下发送 在某个时间点(发送方“创建”管道之前或之后),接收方连接到管道 阅读器从管道中读取消息 发件人“关闭”管道 Reader 注意到所有消息都已被读取(可能管道已关闭)

我的问题是:如何使用 ZeroMQ 实现这一点? “PUB/SUB”,“推/拉”?在 ZMQ 套接字中检测“数据结束”的机制是什么?是否可以同时允许上述前两项的排序:即发送方或接收方是否首先尝试连接?如果有,怎么做?

谢谢。

【问题讨论】:

【参考方案1】:

关于 zeromq 的注意事项:

    绑定/连接顺序通常不重要 PUSH/PULL 用于当一个对等方应接收每条消息和/或不应丢弃消息时 PUB/SUB 用于所有对等方都应接收消息和/或应丢弃在无人收听时发送的消息。 ZeroMQ 故意在应用程序代码中隐藏连接/断开打开/关闭事件,因此您无法检测到实际的关闭事件。

你需要知道的一件事,你不应该知道:当一个套接字连接时,它会创建一个管道(对等点还不需要存在)。当一个套接字绑定时,它只在对等点连接时创建管道。这些管道控制套接字的 HWM 行为。这意味着没有对等点的连接套接字和没有对等点的绑定套接字的行为是不同的。如果您尝试使用它发送消息,则没有对等点的绑定套接字将阻塞,而连接套接字将愉快地在内存中排队消息,直到对等点到达并开始消费消息。

基于这几点,你想做的是:

    使用推/拉 接收者应该绑定 发送一条特殊的“关闭”消息,指示队列已完成,而不是检测 tcp/ipc 级别的关闭事件。

这是一个使用 IPC 套接字(文件)进行通信的 Python 工作示例,其中接收者在发送者之后的某个时间开始。

双方需要知道的共同信息:

import time

import zmq

# the file used for IPC communication
PIPE = '/tmp/fifo-pipe'

# command flags for our tiny message protocol
DONE = b'\x00'
MSG = b'\x01'

接收者 (PULL) 绑定,并消耗直到 DONE

def receiver():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PULL)
    s.bind("ipc://%s" % PIPE)
    while True:
        parts = s.recv_multipart()
        cmd = parts[0]
        if cmd == DONE:
            print "[R] received DONE"
            break
        msg = parts[1]
        # handle the message
        print "[R] %.1f consuming %s" % (time.time() - t0, msg)
    s.close()
    ctx.term()
    print "[R] done"

发送方(PUSH)连接并发送,发送 DONE 表示完成

def sender():
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)
    s.connect("ipc://%s" % PIPE)

    for i in range(10):
        msg = b'msg %i' % i
        print "[S] %.1f sending %s" % (time.time() - t0, msg)
        s.send_multipart([MSG, msg])
        time.sleep(1)
    print "[S] sending DONE"
    s.send(DONE)
    s.close()
    ctx.term()
    print "[S] done"

还有一个演示脚本一起运行它们,发送者首先启动,接收者在发送者已经发送了几条消息后启动:

from threading import Thread

# global t0, just for keeping times relative to start, rather than 1970
t0 = time.time()

# start the sender
s = Thread(target=sender)
s.start()

# start the receiver after a delay
time.sleep(5)
r = Thread(target=receiver)
r.start()

# wait for them both to finish
s.join()
r.join()

可以看到一起运行here。

【讨论】:

以上是关于ZeroMQ 和本地 FIFO的主要内容,如果未能解决你的问题,请参考以下文章

图文并茂,带你入坑本地缓存 Caffeine

安装storm集群

本地缓存

单生产者/单消费者 的 FIFO 无锁队列

关于socket

Socket--模拟聊天