如何从脚本中获得戏剧工作者的工作?
Posted
技术标签:
【中文标题】如何从脚本中获得戏剧工作者的工作?【英文标题】:How to get a dramatiq worker work from within a script? 【发布时间】:2021-08-18 04:24:39 【问题描述】:我是 Dramatiq 的新手,实验了几天,我没有让工人在(简单的)脚本中工作。
test.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results
from dramatiq.worker import Worker
redis_broker = RedisBroker(host="127.0.0.1", port=6379)
results_backend = RedisBackend(url="redis://127.0.0.1:6379")
redis_broker.add_middleware(Results(backend=results_backend))
dramatiq.set_broker(redis_broker)
worker = Worker(
broker=redis_broker
)
worker.start()
@dramatiq.actor(queue_name="default", max_retries=1, store_results=True)
def print_hello_world():
print("Hello World!")
print_hello_world.send()
结果(Redis):
127.0.0.1:6379> keys *
1) "dramatiq:default.msgs"
2) "dramatiq:default"
3) "dramatiq:__heartbeats__"
但是当从带有 test.py 的文件夹中启动一个 Dramatiq 守护进程时:
$ dramatiq test
结果如我所愿
结果(Redis)
127.0.0.1:6379> keys *
1) "3f11649148820d957b2945da46b3c2b7"
2) "dramatiq:__heartbeats__"
工人似乎没有以这种方式接收消息。在互联网上很难找到从脚本中设置工作人员的示例。
有没有人可以帮助我完成这项工作?
【问题讨论】:
【参考方案1】:你可以参考这个链接https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311
"""
Refer from https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311
"""
import os
import sys
from dramatiq.cli import (
CPUS,
HAS_WATCHDOG,
main as dramatiq_worker,
make_argument_parser as dramatiq_argument_parser,
)
from app.core.config import get_settings
from app.tasks import rabbitmq_broker
from app.tasks.manual_selenium import demo
from app.utils.logger import logger
def list_managed_actors(broker, queues):
queues = set(queues)
all_actors = broker.actors.values()
if not queues:
return all_actors
else:
return [a for a in all_actors if a.queue_name in queues]
def format_actor(actor):
return "%s@%s" % (actor.actor_name, actor.queue_name)
def guess_code_directory(broker):
actor = next(iter(broker.actors.values()))
modname, *_ = actor.fn.__module__.partition('.')
mod = sys.modules[modname]
return os.path.dirname(mod.__file__)
def worker(verbose=0, processes=CPUS, threads=8, queues=None, broker=rabbitmq_broker):
"""Run dramatiq workers.
Setup Dramatiq with broker and task modules from Flask app.
\b
examples:
# Run dramatiq with 1 thread per process.
$ flask worker --threads 1
\b
# Listen only to the "foo" and "bar" queues.
$ flask worker -Q foo,bar
\b
# Consuming from a specific broker
$ flask worker mybroker
"""
# Plugin for flask.commands entrypoint.
#
# Wraps dramatiq worker CLI in a Flask command. This is private API of
# dramatiq.
# TODO Plugin for fastapi
parser = dramatiq_argument_parser()
command = [
"--processes", str(processes),
"--threads", str(threads),
# This module does not have broker local. Thus dramatiq fallbacks to
# global broker.
__name__,
]
if get_settings().DEBUG:
verbose = max(1, verbose)
if HAS_WATCHDOG:
command += ["--watch", guess_code_directory(broker)]
queues = queues.split(",") if queues else []
if queues:
command += ["--queues"] + queues
command += verbose * ['-v']
args = parser.parse_args(command)
logger.info("Able to execute the following actors:")
for actor in list_managed_actors(broker, queues):
logger.info("\t%s." % format_actor(actor))
dramatiq_worker(args)
if '__main__' == __name__:
worker()
【讨论】:
以上是关于如何从脚本中获得戏剧工作者的工作?的主要内容,如果未能解决你的问题,请参考以下文章