为Python脚本创建共享消息流的最佳方法是什么?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为Python脚本创建共享消息流的最佳方法是什么?相关的知识,希望对你有一定的参考价值。
我想做什么:我需要一个简单的消息流,因此一些脚本可以在那里发送结果,另一个脚本可以获取结果并以异步方式执行某些操作。
主要问题:我想看看发生了什么,所以如果出现问题 - 我可以快速解决。我尝试使用Celery + RabbitMQ(可以看到带有args的工人,使用Flower,但是调度过于复杂)和multiprocessing.Queue(简单,但是看不到带有args的工人)。
我做了什么:我尝试使用MongoDB上限集合构建类似的东西,并使用多个进程运行Popen来做出反应。有些脚本将smth写入集合,下面的脚本监视它,如果满足某些条件,则运行另一个脚本。
主要问题:subprocess.Popen()从multiprocessing.Process()内部使用看起来不自然(仍然工作),所以我试图找到更好或/和更稳定的解决方案:)
监听脚本:
from pymongo import MongoClient, CursorType
from time import sleep
from datetime import datetime
from multiprocessing import Process
import subprocess
def worker_email(keyword):
subprocess.Popen(["python", "worker_email.py", str(keyword)])
def worker_checker(keyword):
subprocess.Popen(["python", "worker_checker.py", str(keyword)])
if __name__ == '__main__':
#DB connect
client = MongoClient('mongodb://localhost:27017/')
db = client.admetric
coll = db.my_collection
cursor = coll.find(cursor_type = CursorType.TAILABLE_AWAIT)
#Script start UTC time
utc_run = datetime.utcnow()
while cursor.alive:
try:
doc = cursor.next()
#Print doc name/args to see in command line, while Listener runs
print(doc)
#Filter docs without 'created' data
if 'created' in doc.keys():
#Ignore docs older than script
if doc['created'] > utc_run:
#Filter docs without 'type' data
if 'type' in doc.keys():
#Check type
if doc['type'] == 'send_email':
#Create process and run external script
p = Process(target=worker_email, args=(doc['message'],))
p.start()
p.join()
#Check type
elif doc['type'] == 'check_data':
#Create process and run external script
p = Process(target=worker_checker, args=(doc['message'],))
p.start()
p.join()
except StopIteration:
sleep(1)
答案
只要您可以控制worker_email
和worker_checker
逻辑,就不需要在单独的解释器中执行。
只需在两个模块中公开一个入口点,然后通过multiprocessing.Process
运行它们。
worker_email.朋友
def email_job(message):
# start processing the message here
worker_checker.朋友
def check_job(message):
# start checking the message here
listener_script.朋友
# you are not going to pollute the listener namespace
# as the only names you import are the entry points of the scripts
# therefore, encapsulation is preserved
from worker_email import email_job
from worker_checker import check_job
email_process = Process(target=email_job, args=[message])
check_process = Process(target=check_job, args=[message])
如果您无法从工作模块中公开入口点,则只需运行subprocess.Popen
。将它们包裹在Process
中没有任何好处。
以上是关于为Python脚本创建共享消息流的最佳方法是什么?的主要内容,如果未能解决你的问题,请参考以下文章