python - 如何在一个套接字上使用传入数据流来处理Python中的多个并行进程?

Posted

技术标签:

【中文标题】python - 如何在一个套接字上使用传入数据流来处理Python中的多个并行进程?【英文标题】:How to use incoming data stream at a socket for multiple parallel processes in Python? 【发布时间】:2021-10-13 02:49:14 【问题描述】:
import socket 

ip_addr = '100.100.1.1'

port_num = 5000

socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
socket_obj.bind((ip_addr, port_num))
socket_obj.settimeout(2)

我已经创建了这个套接字对象,其中我有一个以特定速率传入的数据流。我想将这些数据实时用于并行运行的两个进程。

def process1():
    while True:
        Try:
            new_data = socket_obj.recvfrom()
            some_process(new_data)
        
        except socket.timeout:
            break

def process2():
    while True:
        Try:
            new_data = socket_obj.recvfrom()
            some_other_process(new_data)
        
        except socket.timeout:
            break

运行两个进程中的任何一个都可以完美运行,但是我如何确保我可以让两个进程并行运行,它们从同一个套接字读取,而两个流中的任何一个都没有任何明显的延迟或数据丢失?

传入数据的性质是非常确定的。正好 50 字节的数据以每秒 1000 次的速率传入。我设置了 2 秒的超时时间,这样一旦套接字在 2 秒内没有收到任何数据,进程就会结束。

此外,每个进程都需要访问到达套接字的每个数据包。

【问题讨论】:

如果 some_other_processsome_process 不同,这对于哪个函数将处理哪些数据似乎有点不确定。 @Booboo 传入数据的性质是非常确定的。正好 50 字节的数据以每秒 1000 次的速率传入。我设置了 2 秒的超时,以便一旦套接字在 2 秒内没有收到任何数据,进程就会结束。此外,每个进程都需要访问到达套接字的每个数据包。 【参考方案1】:

我会通过创建两个 Process 实例来解决这个问题,其中每个实例都传递自己的 multiprocessing.Queue 实例,主进程从套接字读取,将读取消息放在每个进程的队列中加工。写入和读取这些队列会有一些开销,这可能会稍微减慢最大处理速率,所以现在处理是否能跟上传入数据的问题就成了问题。但是您显然不能让每个进程并行地从套接字读取。请参阅我在桌面上进行的以下模拟,它描述了该问题。

import socket
import multiprocessing

def some_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def some_other_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def main():
    ip_addr = '100.100.1.1'
    port_num = 5000

    socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    socket_obj.bind((ip_addr, port_num))
    socket_obj.settimeout(2)

    q1 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=some_process, args=(q1,))
    q2 = multiprocessing.Queue()
    p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
    p1.start()
    p2.start()
    while True:
        try:
            new_data = socket_obj.recvfrom()
        except socket.timeout:
            break
        else:
            q1.put(new_data)
            q2.put(new_data)
    # wait for outstanding tasks to complete:
    q1.put(None)
    q2.put(None)
    p1.join()
    p2.join()

# Required if running under Windows:
if __name__ == '__main__':
    main()

我的桌面上的仿真

我在不那么快的桌面上运行了以下仿真,以查看由于将这些 50 字节数据项写入和读取到多处理队列的开销,我可以使用微不足道的处理功能维持多少速率:

import multiprocessing

def some_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def some_other_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def main():
    import time
    q1 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=some_process, args=(q1,))
    q2 = multiprocessing.Queue()
    p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
    p1.start()
    p2.start()
    t1 = time.time()
    for new_data in range(10_000):
        # Next put will be in .001 seconds for a hoped-for rate of 1000/sec.
        expiration = time.time() + .001
        q1.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
        q2.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
        diff = expiration - time.time()
        if diff > 0:
            time.sleep(diff)
    # wait for outstanding tasks to complete:
    q1.put(None)
    q2.put(None)
    rate = 10_000 / (time.time() - t1)
    print('Done:', rate)
    p1.join()
    p2.join()

# Required if running under Windows:
if __name__ == '__main__':
    main()

打印:

Done: 614.8320395921962

我只能维持每秒 615 条消息的速度。如果您写入队列的速度快于消息的处理速度,则会耗尽内存。这可不是什么好事。

更新

上面的模拟对我来说似乎有些可疑。我在以下基准测试中确定我可以以极高的速率(208,317 条消息/秒)写入队列,并且可以以很高的速率(23,094 条消息/秒)。我必须得出结论,由于 time.sleep 函数相当不精确,我之前的模拟是不准确的。

import multiprocessing

def some_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def some_other_process(q):
    while True:
        data = q.get()
        if data is None:
            break
        # process data:
        ...

def main():
    import time

    q1 = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=some_process, args=(q1,))
    q2 = multiprocessing.Queue()
    p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
    p1.start()
    p2.start()
    t1 = time.time()
    for _ in range(10_000):
        # Next put will be in .001 seconds for a hoped-for rate of 1000/sec.
        q1.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
        q2.put('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
    # wait for outstanding tasks to complete:
    q1.put(None)
    q2.put(None)
    rate = 10_000 / (time.time() - t1)
    print('Done. Put Rate:', rate)
    p1.join()
    p2.join()
    rate = 10_000 / (time.time() - t1)
    print('Done. Processing Rate:', rate)


# Required if running under Windows:
if __name__ == '__main__':
    main()

打印:

Done. Put Rate: 208317.3903110131
Done. Processing Rate: 23094.772557205524

【讨论】:

以上是关于python - 如何在一个套接字上使用传入数据流来处理Python中的多个并行进程?的主要内容,如果未能解决你的问题,请参考以下文章

Python套接字接收 - 传入的数据包总是有不同的大小

当目前没有 wsarecv 时,传入数据会发生啥

仅启用来自指定 IP 地址的传入连接

如何使用 Python 从 Web 套接字接收数据

poll() 在以 0 超时调用时接收传入数据

Python - 使用套接字设置源端口号