如何在进程运行时进行通信

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在进程运行时进行通信相关的知识,希望对你有一定的参考价值。

我想有两个并行运行的进程。一个从另一个获得输入,处理数据并将处理后的数据作为另一个的输出发回。另一个过程做同样的事情。显然需要有一个起点和终点。

如何在进程运行时进行通信?我只是设法在彼此之后运行这两个进程。

我试着用multiprocessing来解决它:

from multiprocessing import Process, Queue, Array
sentinel = -1

def process1(q, arr):
    # Receives data, modifies it and sends it back
    while True:
        data = q.get() # Receive data

        if data is sentinel:
            break
    data *= 2 # Data modification
    arr.append(data) # Data logging
    q.put(data) # Send data
    q.put(sentinel) # Send end signal

def process2(q, arr):
    # Receives data, increments it and sends data back
    if q.empty():
        data = 1
    else:
        while True:
            data = q.get() # Receive data
            if data == sentinel:
                break

    data += 1
    q.put(data) # Send data
    arr.append(data) # Data logging
    q.put(sentinel) # Send end signal

if __name__ == "__main__":
    q = Queue()
    logbook = Array('i', 0)
    counter = 0
    while counter < 10:
        process_1 = Process(target=process1, args=(q, logbook))
        process_2 = Process(target=process2, args=(q, logbook))
        process_1.start()
        process_2.start()
        q.close()
        q.join_thread()
        process_1.join()
        process_2.join()
        counter += 1
    print(logbook)
答案

我试图理解你的需求,但我并不完全清楚,因此我提出了代码的生产者 - 消费者版本,其中两个过程进行通信以达到一定量的迭代的最终结果。

首先,您需要两个队列,以避免将内容放入队列的相同进程在另一个队列之前读取它。其次,您需要一种机制来同意计算的结束,在这种情况下是无消息。

我提出的解决方案总结在以下代码中:

from multiprocessing import Process, Queue, Array

def process1(in_queue, out_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            out_queue.put(None)  # send feedback about END message
            out_queue.close()
            out_queue.join_thread()
            break

        data *= 2 # Data modification
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()

    process_1 = Process(target=process1, args=(q1, q2))
    process_2 = Process(target=process2, args=(q2, q1, 10))
    process_1.start()
    process_2.start()

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()
另一答案

您可以使用套接字来完成此任务,甚至可以使用微服务方法(例如,通过rest api调用)。

另一答案

@Roberto Trani

从您的解决方案开始,我甚至可以使用第三个队列记录两个进程之间的通信。

谢谢,我真的被卡住了,不知道如何解决这个问题。

from multiprocessing import Process, Queue

def process1(in_queue, out_queue, log_queue):
    # Receives data, modifies it and sends it back
    while True:
        data = in_queue.get() # Receive data
        if data is None:
            log_queue.put(None) # log END
            out_queue.put(None)  # send feedback about END message
            break

        data *= 2 # Data modification
        print("p1: {}".format(data))
        log_queue.put(data) # Log data
        out_queue.put(data) # Send data

def process2(in_queue, out_queue, how_many, log_queue):
    data = 0

    # Receives data, increments it and sends data back
    while how_many > 0:
        data += 1 # Data modification
        print("p2: {}".format(data))
        log_queue.put(data) # Log Data
        out_queue.put(data) # Send data
        how_many -= 1

        data = in_queue.get() # Receive data
        if data is None:
            break

    # send END message
    log_queue.put(None) # log END
    out_queue.put(None)
    out_queue.close()
    out_queue.join_thread()
    assert in_queue.get() is None


if __name__ == "__main__":
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    logbook = []

    process_1 = Process(target=process1, args=(q1, q2, q3))
    process_2 = Process(target=process2, args=(q2, q1, 10, q3))
    process_1.start()
    process_2.start()

    process_2.join()
    process_1.join()
    q1.close()
    q2.close()
    q1.join_thread()
    q2.join_thread()

    while True:
        data = q3.get()
        logbook.append(data)
        if data is None:
            break

    q3.close()
    q3.join_thread()

    print(logbook)
另一答案

你能用微服务方法进一步解释你的意思吗?我听说过REST,现在我想弄明白,我如何在Python中实现这个范例。 -

例如,像网络服务。您可以访问模块内的服务(函数,方法)。可以通过REST API访问此模块,例如使用自上而下的方法作为OpenApi规范(https://en.wikipedia.org/wiki/OpenAPI_Specification)。

我目前正在使用这种方法:设计高级接口(模块,每个模块的功能,层次结构和模块的互连);在openapi编辑器的yaml / json文件中使用CRUD(https://en.wikipedia.org/wiki/Create,_read,_update_and_delete)记下该设计以满足REST端点(在线:https://editor.swagger.io);使用编辑器功能生成python代码(flask);编辑锅炉板代码以实际实现后端功能;运行服务器以提供对API方法的访问。您甚至可以将模块转换为docker镜像以实现可伸缩性:我使用此基本图像:https://github.com/tiangolo/uwsgi-nginx-flask-docker/

以上是关于如何在进程运行时进行通信的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 多处理进程中运行较慢的 OpenCV 代码片段

在tablayout片段之间进行通信[重复]

java 简单的代码片段,展示如何将javaagent附加到运行JVM进程

如何在嵌套片段内的两个子片段之间进行通信

当有多个实例时如何与.net进程通信

如何在CEF JS与browser进程间异步通信