可以在 Apache Apex 中的 DAG 中间使用输入运算符吗

Posted

技术标签:

【中文标题】可以在 Apache Apex 中的 DAG 中间使用输入运算符吗【英文标题】:Can an Input Operator be used in the middle of a DAG in Apache Apex 【发布时间】:2017-02-28 07:40:20 【问题描述】:

Apex 的所有示例都表明 DAG 的第一个运算符应该是输入运算符。这个算子能否出现在 DAG 中间某处。

考虑这样一种情况,其中我有数据要从数据库中获取,基于刚刚由前一个运算符处理的一些数据,这意味着输入运算符将出现在 DAG 的中间某处。

根据输入运算符的定义,它是一个没有任何输入流的运算符。但如果使用连接器,它也可以完成获取数据的工作。那么,如果我在 DAG 之间的某个地方获取数据,它会起作用吗?

【问题讨论】:

【参考方案1】:

这是一个有趣的用例。您应该能够扩展输入运算符(例如 JdbcInputOperator,因为您想从数据库中读取)并向其添加输入端口。此输入端口从您的 DAG 接收来自另一个运算符的数据(元组)并更新 JdbcInputOperator 的“where”子句,以便它根据该子句读取数据。希望这就是您想要的。

【讨论】:

嗨 Sanjay,考虑到 InputOperator 和通用运算符的处理方式不同,这实际上是否可行。 apex.apache.org/docs/apex/operator_development/… 我刚看到弗拉德的回答。我在之前评论中的查询已得到澄清。【参考方案2】:

是的,这是可能的。您可以扩展现有的 InputOperator 并向其添加 InputPort(s)。在这种情况下,Apex 平台会将您的操作员作为通用操作员处理,而不是调用InputOperator.emitTuples()。调用super.emitTuples() 或直接在输出端口上发射将是您的扩展操作员责任。

【讨论】:

【参考方案3】:

不,输入运算符不能在 DAG 之间使用。 正如您已经指出的那样,由于没有输入流,您将无法从先前的运算符获取数据以用于此运算符。

对于您指出的示例,最好使用输入流编写您自己的通用运算符,该输入流实际上具有与输入运算符相似的功能,其中它可以根据输入流中的数据从外部源读取数据.

另外,还有一点需要注意: 如果查询太繁重,最好有一个异步线程来查询数据库。该线程可以将数据写入队列,主线程可以从中读取记录并将它们发送到输出流上。这样可以保证主算子线程不被阻塞,算子不会失败。

【讨论】:

以上是关于可以在 Apache Apex 中的 DAG 中间使用输入运算符吗的主要内容,如果未能解决你的问题,请参考以下文章

如何在 DAG 中使用 Apache Apex Malhar RabbitMQ 运算符

如何从 Apache Apex 应用程序内部获取 ApplicationID?

将输入运算符动态添加到正在运行的 Apache Apex 应用程序

IBM Websphere MQ 到 Apache Apex Operator Stream?

DAG 如何让 Apache Spark 容错?

如何将参数传递给 Apache Apex 中的 application.java 类?