zmq pub-sub通信之ipc双向主题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zmq pub-sub通信之ipc双向主题相关的知识,希望对你有一定的参考价值。
zmq pub-sub, push-pull模式没有客服端服务端启动先后顺序的限制,与普通的socket通信不一样,必须先启动服务端。
以下是测试程序,pub.py为服务端,sub.py客户端。
pub.py
# coding: utf-8
import zmq
import time
import threading
import os
import stat
# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
TOPIC_LIST = ["lator", "att"]
def unlink_ipc(path):
index = path.rfind(ipc://)
if index < 0:
return
fpath = path[len(ipc://):]
#if os.path.exists(fpath):
os.unlink(fpath)
def pub(pubaddr, topic):
context = zmq.Context()
sock = context.socket(zmq.PUB)
sock.set_hwm(100)
#unlink_ipc(pubaddr)
sock.bind(pubaddr)
counter = 1
os.chmod(pubaddr[len(ipc://):], stat.S_IRWXO + stat.S_IRWXG + stat.S_IRWXU)
zpath = sock.getsockopt(zmq.LAST_ENDPOINT)
print zpath
while True:
messagedata = "this is msg fro topic one %s" % counter
print "%s %s" % (topic, messagedata)
sock.send("%s %s" % (topic, messagedata))
counter = counter + 1
time.sleep(1)
if __name__ == "__main__":
t1 = threading.Thread(target=pub, args=(LOG_TYPE_PUB_PATH, "lator"))
t2 = threading.Thread(target=pub, args=(LOG_SUB_PATH, "att"))
t1.start()
t2.start()
t1.join()
t2.join()
sub.py
# coding: utf-8
import os
import zmq
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream
# 分类后的日志的zmq的pub地址
LOG_TYPE_PUB_PATH = "ipc:///tmp/log_types.ipc"
# 日志的zmq的sub地址
LOG_SUB_PATH = "ipc:///tmp/log_lator.ipc"
TOPIC_LIST = ["lator", "att"]
def unlink_ipc(path):
index = path.rfind(ipc://)
if index < 0:
return
fpath = path[len(ipc://):]
if os.path.exists(fpath):
os.unlink(fpath)
def recv_func(msg):
print msg
def main2():
loop_instance = IOLoop.instance()
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.set_hwm(100)
sock.connect(LOG_TYPE_PUB_PATH)
sock.connect(LOG_SUB_PATH)
for key in TOPIC_LIST:
if isinstance(key, str):
sock.setsockopt(zmq.SUBSCRIBE, key)
elif isinstance(key, unicode):
sock.setsockopt_string(zmq.SUBSCRIBE, key)
else:
print("log_broker to set subscribe error:%s" % key)
sock = ZMQStream(sock, loop_instance)
sock.on_recv(recv_func)
loop_instance.start()
if __name__ == "__main__":
main2()
以上是关于zmq pub-sub通信之ipc双向主题的主要内容,如果未能解决你的问题,请参考以下文章