Networkx 作为任务队列?

Posted

技术标签:

【中文标题】Networkx 作为任务队列?【英文标题】:Networkx as a task queue? 【发布时间】:2016-07-06 21:41:29 【问题描述】:

我在networkx 中有一个有向无环图。每个节点代表一个任务,节点的前任是任务依赖项(给定任务在其依赖项执行之前无法执行)。

我想在异步任务队列中“执行”图表,类似于 celery 提供的内容(以便我可以轮询作业的状态、检索结果等)。 Celery 不提供创建 DAG 的能力(据我所知),并且在所有依赖项完成后能够继续使用 task 将是至关重要的(DAG 可能有多个路径,即使一个任务是缓慢/阻塞,可能会继续执行其他任务等)。

是否有任何简单的示例说明我如何实现这一点,或者甚至可以将networkxcelery 集成?

【问题讨论】:

您可能正在搜索的内容称为 dask:dask.pydata.org/en/latest/custom-graphs.html?highlight=graph 【参考方案1】:

我认为这个功能可能会有所帮助:

  # The graph G is represened by a dictionnary following this pattern:
  # G =  vertex: [ (successor1: weight1), (successor2: weight2),...   ]  
  def progress ( G, start ):
     Q = [ start ] # contain tasks to execute
     done = [ ]    # contain executed tasks
     while len (Q) > 0: # still there tasks to execute ?
        task = Q.pop(0) # pick up the oldest one 
        ready = True
        for T in G:     # make sure all predecessors are executed
           for S, w in G[T]:
              if S == task and and S not in done:# found not executed predecessor 
                 ready = False
                 break
           if not ready : break
        if not ready:
           Q.appen(task) # the task is not ready for execution
        else:
           execute(task)
           done.appen(task) # execute the task
           for S, w in G[task]:# and explore all its successors
              Q.append(S)

【讨论】:

你永远不会执行任何任务。【参考方案2】:

您可以为此使用的一个库是taskgraph。它允许您定义任务图,然后以多线程/多进程的方式执行这些任务。它避免重新运行结果已经是最新的任务,类似于 make 程序。

要执行您的 networkx 图,您将迭代 topological order 中的所有节点,收集每个节点的即时依赖项,然后调用 task_graph.add_task。该函数将返回一个新添加任务的句柄,让您可以将其用作后续添加任务的依赖项(这就是节点迭代顺序很重要的原因)

有关替代解决方案,另请参阅this question。

【讨论】:

【参考方案3】:

我参加聚会有点晚了,但一种可能性是使用 dask 构建自定义 DAG,然后执行它们,请参阅 https://docs.dask.org/en/stable/graphs.html。

【讨论】:

以上是关于Networkx 作为任务队列?的主要内容,如果未能解决你的问题,请参考以下文章

Redis入门 - 消息通知

RabbitMQ使用场景_002_工作队列

UCOSIII消息队列

UCOSIII消息队列

Apache Kafka 是不是适合用作无序任务队列?

Celery 超时后没有将任务放回 RabbitMQ 队列