python - 如何在一个套接字上使用传入数据流来处理Python中的多个并行进程?
Posted
技术标签:
【中文标题】python - 如何在一个套接字上使用传入数据流来处理Python中的多个并行进程?【英文标题】:How to use incoming data stream at a socket for multiple parallel processes in Python? 【发布时间】:2021-10-13 02:49:14 【问题描述】:import socket
ip_addr = '100.100.1.1'
port_num = 5000
socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
socket_obj.bind((ip_addr, port_num))
socket_obj.settimeout(2)
我已经创建了这个套接字对象,其中我有一个以特定速率传入的数据流。我想将这些数据实时用于并行运行的两个进程。
def process1():
while True:
Try:
new_data = socket_obj.recvfrom()
some_process(new_data)
except socket.timeout:
break
def process2():
while True:
Try:
new_data = socket_obj.recvfrom()
some_other_process(new_data)
except socket.timeout:
break
运行两个进程中的任何一个都可以完美运行,但是我如何确保我可以让两个进程并行运行,它们从同一个套接字读取,而两个流中的任何一个都没有任何明显的延迟或数据丢失?
传入数据的性质是非常确定的。正好 50 字节的数据以每秒 1000 次的速率传入。我设置了 2 秒的超时时间,这样一旦套接字在 2 秒内没有收到任何数据,进程就会结束。
此外,每个进程都需要访问到达套接字的每个数据包。
【问题讨论】:
如果some_other_process
与 some_process
不同,这对于哪个函数将处理哪些数据似乎有点不确定。
@Booboo 传入数据的性质是非常确定的。正好 50 字节的数据以每秒 1000 次的速率传入。我设置了 2 秒的超时,以便一旦套接字在 2 秒内没有收到任何数据,进程就会结束。此外,每个进程都需要访问到达套接字的每个数据包。
【参考方案1】:
我会通过创建两个 Process
实例来解决这个问题,其中每个实例都传递自己的 multiprocessing.Queue
实例,主进程从套接字读取,将读取消息放在每个进程的队列中加工。写入和读取这些队列会有一些开销,这可能会稍微减慢最大处理速率,所以现在处理是否能跟上传入数据的问题就成了问题。但是您显然不能让每个进程并行地从套接字读取。请参阅我在桌面上进行的以下模拟,它描述了该问题。
import socket
import multiprocessing
def some_process(q):
while True:
data = q.get()
if data is None:
break
# process data:
...
def some_other_process(q):
while True:
data = q.get()
if data is None:
break
# process data:
...
def main():
ip_addr = '100.100.1.1'
port_num = 5000
socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
socket_obj.bind((ip_addr, port_num))
socket_obj.settimeout(2)
q1 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=some_process, args=(q1,))
q2 = multiprocessing.Queue()
p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
p1.start()
p2.start()
while True:
try:
new_data = socket_obj.recvfrom()
except socket.timeout:
break
else:
q1.put(new_data)
q2.put(new_data)
# wait for outstanding tasks to complete:
q1.put(None)
q2.put(None)
p1.join()
p2.join()
# Required if running under Windows:
if __name__ == '__main__':
main()
我的桌面上的仿真
我在不那么快的桌面上运行了以下仿真,以查看由于将这些 50 字节数据项写入和读取到多处理队列的开销,我可以使用微不足道的处理功能维持多少速率:
import multiprocessing
def some_process(q):
while True:
data = q.get()
if data is None:
break
# process data:
...
def some_other_process(q):
while True:
data = q.get()
if data is None:
break
# process data:
...
def main():
import time
q1 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=some_process, args=(q1,))
q2 = multiprocessing.Queue()
p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
p1.start()
p2.start()
t1 = time.time()
for new_data in range(10_000):
# Next put will be in .001 seconds for a hoped-for rate of 1000/sec.
expiration = time.time() + .001
q1.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
q2.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
diff = expiration - time.time()
if diff > 0:
time.sleep(diff)
# wait for outstanding tasks to complete:
q1.put(None)
q2.put(None)
rate = 10_000 / (time.time() - t1)
print('Done:', rate)
p1.join()
p2.join()
# Required if running under Windows:
if __name__ == '__main__':
main()
打印:
Done: 614.8320395921962
我只能维持每秒 615 条消息的速度。如果您写入队列的速度快于消息的处理速度,则会耗尽内存。这可不是什么好事。
更新
上面的模拟对我来说似乎有些可疑。我在以下基准测试中确定我可以以极高的速率(208,317 条消息/秒)写入队列,并且可以以很高的速率(23,094 条消息/秒)。我必须得出结论,由于 time.sleep
函数相当不精确,我之前的模拟是不准确的。
import multiprocessing
def some_process(q):
while True:
data = q.get()
if data is None:
break
# process data:
...
def some_other_process(q):
while True:
data = q.get()
if data is None:
break
# process data:
...
def main():
import time
q1 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=some_process, args=(q1,))
q2 = multiprocessing.Queue()
p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
p1.start()
p2.start()
t1 = time.time()
for _ in range(10_000):
# Next put will be in .001 seconds for a hoped-for rate of 1000/sec.
q1.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
q2.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
# wait for outstanding tasks to complete:
q1.put(None)
q2.put(None)
rate = 10_000 / (time.time() - t1)
print('Done. Put Rate:', rate)
p1.join()
p2.join()
rate = 10_000 / (time.time() - t1)
print('Done. Processing Rate:', rate)
# Required if running under Windows:
if __name__ == '__main__':
main()
打印:
Done. Put Rate: 208317.3903110131
Done. Processing Rate: 23094.772557205524
【讨论】:
以上是关于python - 如何在一个套接字上使用传入数据流来处理Python中的多个并行进程?的主要内容,如果未能解决你的问题,请参考以下文章