如何在Apache NIFI中应用机器学习来处理流数据?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Apache NIFI中应用机器学习来处理流数据?相关的知识,希望对你有一定的参考价值。

我有一个处理器,可以生成JSON格式的时间序列数据。根据接收到的数据,我需要在python上使用机器学习算法进行预测。然后将新的预测值写入另一个流程文件中。

问题是:当你运行这样一个python脚本时,它必须执行许多大规模的预处理操作:对数据库进行查询、创建复杂的数据结构、初始化预测模型等。

如果你使用ExecuteStreamCommand,那么对于每个流文件,脚本每次都会被运行。这是真的吗?

我是否可以在NIFI中制作一个python脚本,一次启动,多次接收流文件,存储之前接收的数据历史。还是我需要做一个HTTP服务,从NIFI中接收数据?

答案

你有几个选择。

  1. 构建一个自定义的处理器. 这是我建议的方法。代码需要用Java(或Groovy,它提供了更类似Python的体验),但不会有Python的依赖性等。然而,我已经看到了这种方法在ML模型应用中的例子(见 Tim Spann的例子),这一般来说是非常有效的。初始化和各个流文件触发逻辑干净利落地分离,性能良好。
  2. 使用 InvokeScriptedProcessor. 这将使你能够用Python和 分离初始化 (预处理、DB连接等。onScheduled 用NiFi处理器的说法)与执行阶段(onTrigger). 一些 例子 存在,但我个人并没有专门用Python去研究这个问题。你可以使用Python依赖,但不能使用 "原生模块"(即编译的C代码),因为执行引擎仍然是Jython。
  3. 使用 ExecuteStreamCommand. 不强烈建议。正如你所提到的,每次调用都需要预处理步骤,除非你在设计外部应用程序时,让它运行一个寿命很长的 "服务器 "组件,并且每个 ESC 命令向它发送数据并返回一个单独的响应。我不知道你现有的Python应用是什么样子的,但这很可能会涉及到复杂的改动。Tim 有另一个例子 使用CDSW来托管和部署模型,NiFi通过HTTP向其发送数据进行评估。

以上是关于如何在Apache NIFI中应用机器学习来处理流数据?的主要内容,如果未能解决你的问题,请参考以下文章

Apache NiFi 中处理器属性和流文件属性之间的区别

Apache NiFi 实例挂在“计算流文件沿袭...”窗口

如何在 Apache NiFi 中管理、排序和过滤大量流?

Apache NiFi - 数据网络服务

apache nifi 总执行时间

如何在 Apache Nifi 中合并分区的 Json