zmq pub-sub通信之ipc双向主题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zmq pub-sub通信之ipc双向主题相关的知识,希望对你有一定的参考价值。


zmq pub-sub, push-pull模式没有客服端服务端启动先后顺序的限制,与普通的socket通信不一样,必须先启动服务端。

以下是测试程序,pub.py为服务端,sub.py客户端。

pub.py

# coding: utf-8

import zmq
import time
import threading
import os
import stat

# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"

# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"


TOPIC_LIST = ["lator", "att"]

def unlink_ipc(path):
index = path.rfind(ipc://)
if index < 0:
return

fpath = path[len(ipc://):]
#if os.path.exists(fpath):
os.unlink(fpath)

def pub(pubaddr, topic):
context = zmq.Context()
sock = context.socket(zmq.PUB)
sock.set_hwm(100)
#unlink_ipc(pubaddr)
sock.bind(pubaddr)
counter = 1
os.chmod(pubaddr[len(ipc://):], stat.S_IRWXO + stat.S_IRWXG + stat.S_IRWXU)
zpath = sock.getsockopt(zmq.LAST_ENDPOINT)
print zpath

while True:
messagedata = "this is msg fro topic one %s" % counter
print "%s %s" % (topic, messagedata)
sock.send("%s %s" % (topic, messagedata))
counter = counter + 1
time.sleep(1)


if __name__ == "__main__":
t1 = threading.Thread(target=pub, args=(LOG_TYPE_PUB_PATH, "lator"))
t2 = threading.Thread(target=pub, args=(LOG_SUB_PATH, "att"))
t1.start()
t2.start()
t1.join()
t2.join()

sub.py

# coding: utf-8

import os
import zmq
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream


# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"

# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"

TOPIC_LIST = ["lator", "att"]

def unlink_ipc(path):
index = path.rfind(ipc://)
if index < 0:
return

fpath = path[len(ipc://):]
if os.path.exists(fpath):
os.unlink(fpath)

def recv_func(msg):
print msg

def main2():
loop_instance = IOLoop.instance()
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.set_hwm(100)
sock.connect(LOG_TYPE_PUB_PATH)
sock.connect(LOG_SUB_PATH)
for key in TOPIC_LIST:
if isinstance(key, str):
sock.setsockopt(zmq.SUBSCRIBE, key)
elif isinstance(key, unicode):
sock.setsockopt_string(zmq.SUBSCRIBE, key)
else:
print("log_broker to set subscribe error:%s" % key)
sock = ZMQStream(sock, loop_instance)
sock.on_recv(recv_func)

loop_instance.start()

if __name__ == "__main__":
main2()

以上是关于zmq pub-sub通信之ipc双向主题的主要内容,如果未能解决你的问题,请参考以下文章

Android IPC进程间通信之AIDL双向通信

zmq的pub/sub模式下inproc,ipc,tcp,epgm的通信性能测试结果以及分析(二)

ZMQ是啥?咋用?有教程吗?有demo吗?|||有|||

Android IPC通信系列篇

ANSI C 中的双向 IPC

使用 request-reply 和 pub-sub 进行微服务通信