为Python脚本创建共享消息流的最佳方法是什么?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为Python脚本创建共享消息流的最佳方法是什么?相关的知识,希望对你有一定的参考价值。

我想做什么:我需要一个简单的消息流,因此一些脚本可以在那里发送结果,另一个脚本可以获取结果并以异步方式执行某些操作。

主要问题:我想看看发生了什么,所以如果出现问题 - 我可以快速解决。我尝试使用Celery + RabbitMQ(可以看到带有args的工人,使用Flower,但是调度过于复杂)和multiprocessing.Queue(简单,但是看不到带有args的工人)。


我做了什么:我尝试使用MongoDB上限集合构建类似的东西,并使用多个进程运行Popen来做出反应。有些脚本将smth写入集合,下面的脚本监视它,如果满足某些条件,则运行另一个脚本。

主要问题:subprocess.Popen()从multiprocessing.Process()内部使用看起来不自然(仍然工作),所以我试图找到更好或/和更稳定的解决方案:)


监听脚本:

from pymongo import MongoClient, CursorType
from time import sleep
from datetime import datetime

from multiprocessing import Process
import subprocess

def worker_email(keyword):
     subprocess.Popen(["python", "worker_email.py", str(keyword)])

def worker_checker(keyword):
     subprocess.Popen(["python", "worker_checker.py", str(keyword)])

if __name__ == '__main__':

    #DB connect
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    coll = db.my_collection
    cursor = coll.find(cursor_type = CursorType.TAILABLE_AWAIT)

    #Script start UTC time
    utc_run = datetime.utcnow()

    while cursor.alive:
        try:
            doc = cursor.next()
            #Print doc name/args to see in command line, while Listener runs
            print(doc)
            #Filter docs without 'created' data
            if 'created' in doc.keys():
                #Ignore docs older than script
                if doc['created'] > utc_run:
                    #Filter docs without 'type' data
                    if 'type' in doc.keys():
                        #Check type
                        if doc['type'] == 'send_email':
                            #Create process and run external script
                            p = Process(target=worker_email, args=(doc['message'],))
                            p.start()
                            p.join()
                        #Check type
                        elif doc['type'] == 'check_data':
                            #Create process and run external script
                            p = Process(target=worker_checker, args=(doc['message'],))
                            p.start()
                            p.join()
        except StopIteration:
            sleep(1)
答案

只要您可以控制worker_emailworker_checker逻辑,就不需要在单独的解释器中执行。

只需在两个模块中公开一个入口点,然后通过multiprocessing.Process运行它们。

worker_email.朋友

def email_job(message):
    # start processing the message here

worker_checker.朋友

def check_job(message):
    # start checking the message here

listener_script.朋友

# you are not going to pollute the listener namespace
# as the only names you import are the entry points of the scripts
# therefore, encapsulation is preserved
from worker_email import email_job
from worker_checker import check_job

email_process = Process(target=email_job, args=[message])
check_process = Process(target=check_job, args=[message])

如果您无法从工作模块中公开入口点,则只需运行subprocess.Popen。将它们包裹在Process中没有任何好处。

以上是关于为Python脚本创建共享消息流的最佳方法是什么?的主要内容,如果未能解决你的问题,请参考以下文章

将 Python 脚本作为可执行文件的最佳方法?

使用 angularjs 播放 rtmp 流的最佳方法是啥?

创建简单的python Web服务的最佳方法[关闭]

Python 套接字 - 创建消息格式

防止令牌共享的最佳方法是啥?

检查 NFS 共享是不是安装在 python 脚本中