使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式
Posted
技术标签:
【中文标题】使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式【英文标题】:Implementing a Master/Worker pattern with PUB/SUB using ZeroMQ 【发布时间】:2021-11-27 12:41:01 【问题描述】:我使用 ZeroMQ 编写了一个玩具 Master/Worker”或“task farm”。
这是我到目前为止所得到的 - 但我想添加 PUB/SUB
,以便工作人员收听并响应主题(特定主题或通配符匹配)。
主人
#!/usr/bin/env python
from __future__ import print_function
import random
import time
from multiprocessing import Pool, Process
import zmq
from zmq.devices.basedevice import ProcessDevice
REQ_ADDRESS = 'tcp://127.0.0.1:6240'
REP_ADDRESS = 'tcp://127.0.0.1:6241'
if __name__ == '__main__':
# Start queue
context = zmq.Context()
sock_in = context.socket(zmq.ROUTER)
sock_in.bind(REQ_ADDRESS)
sock_out = context.socket(zmq.DEALER)
sock_out.bind(REP_ADDRESS)
zmq.device(zmq.QUEUE, sock_in, sock_out)
工人
#!/usr/bin/env python
from __future__ import print_function
import random
import time
import zmq
REP_ADDRESS = 'tcp://127.0.0.1:6241'
def receive_tasks():
"""
Client action: request tasks
"""
# ID: just to show that we're getting the right replies
my_id = random.randint(1, 1000000)
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect(REP_ADDRESS)
while True:
# Data is received here. Note that this blocks until
# we get a job.
job = socket.recv_json()
# Do work here
time.sleep(0.5)
# Send the result back. Pass any JSON-serializable object.
socket.send_json([my_id, job['id'], job['task_id']])
if __name__ == '__main__':
receive_tasks()
客户
#!/usr/bin/env python
from __future__ import print_function
import random
import zmq
from zmq.core.poll import select
REQ_ADDRESS = 'tcp://127.0.0.1:6240'
def request_tasks():
"""
Client action: request tasks
"""
# ID: just to show that we're getting the right replies
my_id = random.randint(1, 1000000)
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(REQ_ADDRESS)
for i in xrange(100):
job = 'id': my_id, 'task_id': random.randint(1, 100)
socket.send_json(job)
# Selects the sockets that have READ, WRITE, and ERROR
# events respectively within the lists, with timeout 5.
# Same API as: http://docs.python.org/library/select.html
(rlist, wlist, xlist) = select([socket], [], [], 5)
if len(rlist) > 0:
# This receives the reply and deserializes it from JSON.
msg = socket.recv_json()
print('Client 0, task #1: received work from 2 (for: 3)'.format(
my_id, i+1, msg[0], msg[1]))
else:
print('Client 0, task#1: error, timeout reached.'.format(my_id,
i+1))
socket.close()
socket = context.socket(zmq.REQ)
socket.connect(REQ_ADDRESS)
if __name__ == '__main__':
request_tasks()
我的问题是:如何修改 master 和 workers 以“了解 TOPIC” - 使用 PUB/SUB
?
注意:虽然我的示例代码是用 Python 编写的,并且图像插图指的是 Java - 我实际上是用 C++ 编写我的真实代码,所以请(如果可能)不要在你的回答中使用任何语言细节。
【问题讨论】:
您对此模式有任何经过身份验证的参考吗?如果是这样,请将其添加到标签的指导中。 @GertArnold 它被称为 Master/Worker(以前的 Master/Slave),我创建了标签master-worker
,因为它以前不存在
我知道你创造了它,这就是我问的原因。创建标签是有责任的。
【参考方案1】:
问: ...如何修改 master 和 worker 以“感知 TOPIC” - 使用 PUB/SUB?
欢迎,PUB/SUB
-Scalable Formal Communication Pattern Archetype 有一个静默陷阱(如果未从 ZeroMQ API 读取完整详细信息,请注意)-SUB
-side 必须主动订阅某些内容,否则在执行之前它什么也不会收到所以(就像真正的报纸一样,一个人永远不会在门口找到任何报纸,除非订阅既提高又付费 - 这里的成本在PUB-SUB
Context()
-engines之间分摊,在早期版本中进行主题过滤(所有)传递的消息 ALAP,即在传递到(所有)SUB
-side 之后(是的,网络 I/O 和多次(分布式)CPU 负载是“卸载”PUB
-的一种方式以极快的节奏和数量这样做的附带成本。更高版本,在 v3.4+ IIRC 之后,'已将主题管理 + 主题过滤移至 PUB
端(防止网络 I/O,但要求PUB
端的 RAM + CPU 资源需要为大规模部署进行充分调优。对性能和延迟范围非常满意,因此无需为此过早恐慌))
ZeroMQ PUB
-archetype TOPIC-filtering 一直基于实际的消息负载字节流,所以给定一些 SUB
-s 已经将他们的活动订阅设置为 "ABC"
,PUB
端将生成任何消息有效负载,从 "ABC
....." 开始,确实如此放入各自的交付队列中。在 ZeroMQ 文档中对 TOPIC-filter 订阅管理进行了很好的定义,只是想注意默认状态,即根本不存在订阅并且省略提及 ""
-string 订阅(接收所有内容) ,这将在 Master/Worker 群体案例中产生相当荒谬的结果,其中任何工作包都将由每个 Worker 处理(除了在某些最终但非常昂贵且低效的故障排除鲁棒性增加方法中)没有意义,这样做没有其他(性能、延迟或其他)好处。
也就是说,设计一个包含任何数量的额外信令和通信“套接字”原型的元平面网络没有其他限制,完全可以完成这项工作:
大师 可以
PUB. bind | setsockopt | send | close ()
以适当的顺序和时尚,以一种廉价的方式 (... 延迟 + RAM + CPU 上面的相关评论仍然有效...) 分发工作任务只发给那些积极订阅的人(“群体”管理可以处理新来者、丢失的人、N 复制工作任务,所有这些都只需使用主题过滤技巧)
可以相应地使用PULL. bind | setsockopt | poll | receive | close ()
,以便有效地,最好在一些“软/温和”-实时驱动的控制循环中,为上述分布式工作包收集结果'结果,根据需要对(未)授权和/或(非)篡改的控制检查进行验证
还可以
关于(未)经过身份验证的工作人员的“软”信号,用于存在/健康状态/工作状态,如果需要,可以通过重新使用主要PUB/SUB
频道并通过主要@接收答案987654341@一。然而,在 Master/Workers 之间设置一个有意分离的、卸载的、辅助信号 PUB/SUB
通道是没有问题的,以便保持这种“软”信号流独立于主要工作负载(确实发展了一个更专业的 SIG/COMMs 元-平面架构自定义distributed-computing)
软信号通道是创建某种特定领域语言(具有“命令”语法)的一种典型方式,用于在此类定义的distributed-system 的整个生命周期中控制“群体”。
很酷,不是吗?
客户(做一组选择性工作类型)
可以SUB. connect | setsockopt | receive | close ()
以适当的顺序和方式,以便自适应地设置、配置和接收来自PUB
的订阅工作包,同时保持状态信令的任何其他复杂性& 与其他主要不受限制的对等点进行数据交互)
可以相应地使用PUSH. connect | setsockopt | send | close ()
,以匹配Master的方式如何交付(+ auth, ev. +protect against tampering )以上收到的工作包的任何和所有结果' 结果,将它们自我验证为“授权”实体来交付它们和/或根据需要提供任何篡改控制检查
还可以
接收和响应任何“软”信号请求或异步通知与存在/健康状态/状态相关的任何显式状态更改(隐式状态更改检测自然是主设备的任务,在没有收到任何响应等之后) - 工作等,如果需要,可以通过重新使用主要的PUB/SUB
通道并通过主要的PUSH/PULL
上游通道传递相应的响应。然而,在 Worker 本身和 Master 之间设置一个有意分离的、Master-offloading、辅助信令PUB
和/或其他通道是没有问题的,以保持任何类型的“软”信令流独立于主要工作负载(自定义的distributed-computing确实可以创建任何类型的“平行宇宙”,其中Master是(或不是)其中的一部分;o))
【讨论】:
以上是关于使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式的主要内容,如果未能解决你的问题,请参考以下文章