ZeroMQ-Push/Pull

Push and Pull sockets let you distribute messages to multiple workers arranged in a pipeline. A Push socket distributes the messages it sends evenly among its connected Pull peers. This is equivalent to the producer/consumer model, except that the results computed by a consumer are not sent back upstream but downstream to another Pull (consumer) socket.

Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes, data is load-balanced among all connected nodes.
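
To make the even distribution concrete, here is a minimal sketch in plain Python that mimics round-robin dispatch, which is the strategy a Push socket uses among its connected peers. It does not use ZeroMQ at all; `distribute` and the worker names are hypothetical, chosen only for illustration.

```python
from itertools import cycle


def distribute(messages, workers):
    """Hand each message to the next worker in turn (round-robin)."""
    assigned = {worker: [] for worker in workers}
    # cycle() repeats the worker list endlessly; zip() stops when messages run out
    for message, worker in zip(messages, cycle(workers)):
        assigned[worker].append(message)
    return assigned


if __name__ == "__main__":
    shares = distribute(range(10), ["worker-a", "worker-b"])
    for worker, items in shares.items():
        print(worker, items)
```

With two workers that are both connected before sending starts, each one ends up with every other message. A real pipeline only behaves this way once all workers are connected.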

# producer.py
# Producers are created with the zmq.PUSH socket type. The producer binds to a well-known port that consumers connect to.
import time
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)    

    zmq_socket.bind("tcp://127.0.0.1:5557")
    # Start your result manager and workers before you start your producers
    for num in range(20000):
        work_message = { 'num' : num }
        zmq_socket.send_json(work_message)

producer()

# consumer.py
# Consumers use a zmq.PULL socket to pull work from the producer and a zmq.PUSH socket to connect to the result collector and push results to it.

import time
import zmq
import random

def consumer():
    consumer_id = random.randrange(1,10005)
    print("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    # receive work from the producer
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # send results to the result collector
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    
    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = { 'consumer' : consumer_id, 'num' : data}
        # only even numbers are forwarded to the result collector
        if data%2 == 0:
            consumer_sender.send_json(result)

consumer()

# resultcollector.py
# The result collector is created with the zmq.PULL socket type and acts as the consumer of results from the intermediate consumers. It is also bound to a well-known port so that the intermediate consumers can connect to it.

import time
import zmq
import pprint

def result_collector():
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)
    results_receiver.bind("tcp://127.0.0.1:5558")
    # number of results received so far, keyed by consumer id
    collecter_data = {}
    for x in range(1000):
        result = results_receiver.recv_json()
        if result['consumer'] in collecter_data:
            collecter_data[result['consumer']] = collecter_data[result['consumer']] + 1
        else:
            collecter_data[result['consumer']] = 1
        # print the tally once the 1000th result has arrived
        if x == 999:
            pprint.pprint(collecter_data)

result_collector()
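
The per-consumer tally kept by the collector can be sketched on its own with `collections.Counter` from the standard library. The helper name `tally_by_consumer` and the sample results are hypothetical and made up for illustration; they are not output from a real run.

```python
from collections import Counter


def tally_by_consumer(results):
    """Count how many result messages each consumer id contributed."""
    return Counter(result["consumer"] for result in results)


if __name__ == "__main__":
    # Made-up result messages in the same shape the consumers send
    sample = [
        {"consumer": 1, "num": 0},
        {"consumer": 2, "num": 2},
        {"consumer": 1, "num": 4},
    ]
    pprint_ready = dict(tally_by_consumer(sample))
    print(pprint_ready)
```

This replaces the explicit membership check with a single counting pass and yields the same kind of mapping from consumer id to count.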

# running it (each command in its own terminal, in this order):

python resultcollector.py
python consumer.py
python consumer.py
python producer.py

# The result collector prints how many results it received from each consumer (three separate runs shown):
(D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py
{5892: 1000}

(D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py
{1223: 1000}

(D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py
{5892: 1000}

# consumer-1
(D:\anaconda) C:\Users\admin\Desktop\opt>python consumer.py
I am consumer #5892

# consumer-2
(D:\anaconda) C:\Users\admin\Desktop\opt>python consumer.py
I am consumer #1223

# producer
(D:\anaconda) C:\Users\admin\Desktop\opt>python producer.py

