如何从脚本中获得戏剧工作者的工作?

Posted

技术标签:

【中文标题】如何从脚本中获得戏剧工作者的工作?【英文标题】:How to get a dramatiq worker work from within a script? 【发布时间】:2021-08-18 04:24:39 【问题描述】:

我是 Dramatiq 的新手,实验了几天,我没有让工人在(简单的)脚本中工作。

test.py


import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results
from dramatiq.worker import Worker

redis_broker = RedisBroker(host="127.0.0.1", port=6379)

results_backend = RedisBackend(url="redis://127.0.0.1:6379")
redis_broker.add_middleware(Results(backend=results_backend))

dramatiq.set_broker(redis_broker)

worker = Worker(
    broker=redis_broker
)
worker.start()

@dramatiq.actor(queue_name="default", max_retries=1, store_results=True)
def print_hello_world():
    print("Hello World!")

print_hello_world.send()

结果(Redis):

127.0.0.1:6379> keys *
1) "dramatiq:default.msgs"
2) "dramatiq:default"
3) "dramatiq:__heartbeats__"

但是当从带有 test.py 的文件夹中启动一个 Dramatiq 守护进程时:

$ dramatiq test

结果如我所愿

结果(Redis)

127.0.0.1:6379> keys *
1) "3f11649148820d957b2945da46b3c2b7"
2) "dramatiq:__heartbeats__"

工人似乎没有以这种方式接收消息。在互联网上很难找到从脚本中设置工作人员的示例。

有没有人可以帮助我完成这项工作?

【问题讨论】:

【参考方案1】:

你可以参考这个链接https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311

"""
Refer from https://gitlab.com/bersace/flask-dramatiq/-/blob/master/flask_dramatiq.py#L311
"""
import os
import sys

from dramatiq.cli import (
    CPUS,
    HAS_WATCHDOG,
    main as dramatiq_worker,
    make_argument_parser as dramatiq_argument_parser,
)

from app.core.config import get_settings
from app.tasks import rabbitmq_broker
from app.tasks.manual_selenium import demo
from app.utils.logger import logger


def list_managed_actors(broker, queues):
    queues = set(queues)
    all_actors = broker.actors.values()
    if not queues:
        return all_actors
    else:
        return [a for a in all_actors if a.queue_name in queues]


def format_actor(actor):
    return "%s@%s" % (actor.actor_name, actor.queue_name)


def guess_code_directory(broker):
    actor = next(iter(broker.actors.values()))
    modname, *_ = actor.fn.__module__.partition('.')
    mod = sys.modules[modname]
    return os.path.dirname(mod.__file__)


def worker(verbose=0, processes=CPUS, threads=8, queues=None, broker=rabbitmq_broker):
    """Run dramatiq workers.

    Setup Dramatiq with broker and task modules from Flask app.

    \b
    examples:
      # Run dramatiq with 1 thread per process.
      $ flask worker --threads 1

    \b
      # Listen only to the "foo" and "bar" queues.
      $ flask worker -Q foo,bar

    \b
      # Consuming from a specific broker
      $ flask worker mybroker
    """
    # Plugin for flask.commands entrypoint.
    #
    # Wraps dramatiq worker CLI in a Flask command. This is private API of
    # dramatiq.
    # TODO Plugin for fastapi

    parser = dramatiq_argument_parser()

    command = [
        "--processes", str(processes),
        "--threads", str(threads),
        # This module does not have broker local. Thus dramatiq fallbacks to
        # global broker.
        __name__,
    ]

    if get_settings().DEBUG:
        verbose = max(1, verbose)
        if HAS_WATCHDOG:
            command += ["--watch", guess_code_directory(broker)]

    queues = queues.split(",") if queues else []
    if queues:
        command += ["--queues"] + queues
    command += verbose * ['-v']
    args = parser.parse_args(command)

    logger.info("Able to execute the following actors:")
    for actor in list_managed_actors(broker, queues):
        logger.info("\t%s." % format_actor(actor))

    dramatiq_worker(args)


if '__main__' == __name__:
    worker()


【讨论】:

以上是关于如何从脚本中获得戏剧工作者的工作?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用用户脚本加载共享网络工作者?

将工作区从作者实例发布到公共

Chrome 扩展后台脚本如何与网页服务工作者通信?

自动填充从工作表到文档的日期并获得不同的时区(谷歌脚本)

脚本和剧本有啥区别,脚本怎么写

构建之法阅读笔记05