使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式

Posted

技术标签:

【中文标题】使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式【英文标题】:Implementing a Master/Worker pattern with PUB/SUB using ZeroMQ 【发布时间】:2021-11-27 12:41:01 【问题描述】:

我使用 ZeroMQ 编写了一个玩具 Master/Worker”或“task farm”。

这是我到目前为止所得到的 - 但我想添加 PUB/SUB,以便工作人员收听并响应主题(特定主题或通配符匹配)。

主人

#!/usr/bin/env python
from __future__ import print_function
import random
import time
from multiprocessing import Pool, Process

import zmq
from zmq.devices.basedevice import ProcessDevice

REQ_ADDRESS = 'tcp://127.0.0.1:6240'
REP_ADDRESS = 'tcp://127.0.0.1:6241'

if __name__ == '__main__':
    # Start queue
    context = zmq.Context()
    sock_in = context.socket(zmq.ROUTER)
    sock_in.bind(REQ_ADDRESS)
    sock_out = context.socket(zmq.DEALER)
    sock_out.bind(REP_ADDRESS)
    zmq.device(zmq.QUEUE, sock_in, sock_out)

工人

#!/usr/bin/env python
from __future__ import print_function
import random
import time

import zmq

REP_ADDRESS = 'tcp://127.0.0.1:6241'

def receive_tasks():
    """
    Client action: request tasks
    """
    # ID: just to show that we're getting the right replies
    my_id = random.randint(1, 1000000)
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect(REP_ADDRESS)
    while True:
        # Data is received here. Note that this blocks until
        # we get a job.
        job = socket.recv_json()
        # Do work here
        time.sleep(0.5)
        # Send the result back. Pass any JSON-serializable object.
        socket.send_json([my_id, job['id'], job['task_id']])

if __name__ == '__main__':
    receive_tasks()

客户

#!/usr/bin/env python
from __future__ import print_function
import random

import zmq
from zmq.core.poll import select

REQ_ADDRESS = 'tcp://127.0.0.1:6240'

def request_tasks():
    """
    Client action: request tasks
    """
    # ID: just to show that we're getting the right replies
    my_id = random.randint(1, 1000000)
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect(REQ_ADDRESS)
    for i in xrange(100):
        job = 'id': my_id, 'task_id': random.randint(1, 100)
        socket.send_json(job)
        # Selects the sockets that have READ, WRITE, and ERROR
        # events respectively within the lists, with timeout 5.
        # Same API as: http://docs.python.org/library/select.html
        (rlist, wlist, xlist) = select([socket], [], [], 5)
        if len(rlist) > 0:
            # This receives the reply and deserializes it from JSON.
            msg = socket.recv_json()
            print('Client 0, task #1: received work from 2 (for: 3)'.format(
                my_id, i+1, msg[0], msg[1]))
        else:
            print('Client 0, task#1: error, timeout reached.'.format(my_id,
                i+1))
            socket.close()
            socket = context.socket(zmq.REQ)
            socket.connect(REQ_ADDRESS)

if __name__ == '__main__':
    request_tasks()

我的问题是:如何修改 master 和 workers 以“了解 TOPIC” - 使用 PUB/SUB

注意:虽然我的示例代码是用 Python 编写的,并且图像插图指的是 Java - 我实际上是用 C++ 编写我的真实代码,所以请(如果可能)不要在你的回答中使用任何语言细节。

【问题讨论】:

您对此模式有任何经过身份验证的参考吗?如果是这样,请将其添加到标签的指导中。 @GertArnold 它被称为 Master/Worker(以前的 Master/Slave),我创建了标签 master-worker,因为它以前不存在 我知道你创造了它,这就是我问的原因。创建标签是有责任的。 【参考方案1】:

问: ...如何修改 master 和 worker 以“感知 TOPIC” - 使用 PUB/SUB?

欢迎,PUB/SUB-Scalable Formal Communication Pattern Archetype 有一个静默陷阱(如果未从 ZeroMQ API 读取完整详细信息,请注意)-SUB-side 必须主动订阅某些内容,否则在执行之前它什么也不会收到所以(就像真正的报纸一样,一个人永远不会在门口找到任何报纸,除非订阅既提高又付费 - 这里的成本在PUB-SUBContext()-engines之间分摊,在早期版本中进行主题过滤(所有)传递的消息 ALAP,即在传递到(所有)SUB-side 之后(是的,网络 I/O 和多次(分布式)CPU 负载是“卸载”PUB-的一种方式以极快的节奏和数量这样做的附带成本。更高版本,在 v3.4+ IIRC 之后,'已将主题管理 + 主题过滤移至 PUB 端(防止网络 I/O,但要求PUB 端的 RAM + CPU 资源需要为大规模部署进行充分调优。对性能和延迟范围非常满意,因此无需为此过早恐慌))

ZeroMQ PUB-archetype TOPIC-filtering 一直基于实际的消息负载字节流,所以给定一些 SUB-s 已经将他们的活动订阅设置为 "ABC"PUB 端将生成任何消息有效负载,从 "ABC....." 开始,确实如此放入各自的交付队列中。在 ZeroMQ 文档中对 TOPIC-filter 订阅管理进行了很好的定义,只是想注意默认状态,即根本不存在订阅并且省略提及 ""-string 订阅(接收所有内容) ,这将在 Master/Worker 群体案例中产生相当荒谬的结果,其中任何工作包都将由每个 Worker 处理(除了在某些最终但非常昂贵且低效的故障排除鲁棒性增加方法中)没有意义,这样做没有其他(性能、延迟或其他)好处。

也就是说,设计一个包含任何数量的额外信令和通信“套接字”原型的元平面网络没有其他限制,完全可以完成这项工作:


大师 可以

PUB. bind | setsockopt | send | close () 以适当的顺序和时尚,以一种廉价的方式 (... 延迟 + RAM + CPU 上面的相关评论仍然有效...) 分发工作任务只发给那些积极订阅的人(“群体”管理可以处理新来者、丢失的人、N 复制工作任务,所有这些都只需使用主题过滤技巧)

可以相应地使用PULL. bind | setsockopt | poll | receive | close (),以便有效地,最好在一些“软/温和”-实时驱动的控制循环中,为上述分布式工作包收集结果'结果,根据需要对(未)授权和/或(非)篡改的控制检查进行验证

还可以 关于(未)经过身份验证的工作人员的“软”信号,用于存在/健康状态/工作状态,如果需要,可以通过重新使用主要PUB/SUB 频道并通过主要@接收答案987654341@一。然而,在 Master/Workers 之间设置一个有意分离的、卸载的、辅助信号 PUB/SUB 通道是没有问题的,以便保持这种“软”信号流独立于主要工作负载(确实发展了一个更专业的 SIG/COMMs 元-平面架构自定义distributed-computing)

软信号通道是创建某种特定领域语言(具有“命令”语法)的一种典型方式,用于在此类定义的distributed-system 的整个生命周期中控制“群体”。

很酷,不是吗?


客户(做一组选择性工作类型) 可以SUB. connect | setsockopt | receive | close ()以适当的顺序和方式,以便自适应地设置、配置和接收来自PUB的订阅工作包,同时保持状态信令的任何其他复杂性& 与其他主要不受限制的对等点进行数据交互)

可以相应地使用PUSH. connect | setsockopt | send | close (),以匹配Master的方式如何交付(+ auth, ev. +protect against tampering )以上收到的工作包的任何和所有结果' 结果,将它们自我验证为“授权”实体来交付它们和/或根据需要提供任何篡改控制检查

还可以 接收和响应任何“软”信号请求或异步通知与存在/健康状态/状态相关的任何显式状态更改(隐式状态更改检测自然是主设备的任务,在没有收到任何响应等之后) - 工作等,如果需要,可以通过重新使用主要的PUB/SUB 通道并通过主要的PUSH/PULL 上游通道传递相应的响应。然而,在 Worker 本身和 Master 之间设置一个有意分离的、Master-offloading、辅助信令PUB 和/或其他通道是没有问题的,以保持任何类型的“软”信令流独立于主要工作负载(自定义的distributed-computing确实可以创建任何类型的“平行宇宙”,其中Master是(或不是)其中的一部分;o)

【讨论】:

以上是关于使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式的主要内容,如果未能解决你的问题,请参考以下文章

zeromq pub sub 上丢失的消息

ZeroMQ PUB/SUB 绑定订阅者

ZeroMq PUB/SUB 模式无法正常工作

消息队列、EventBus 和 Pub/Sub 之间的区别?

ZeroMQ/NanoMsg 发布/订阅与多播

zeromq使用模式实验总结