如何使用 ExecuteScript 和 python 从 nifi 中的一个传入流文件创建多个流文件

Posted

技术标签:

【中文标题】如何使用 ExecuteScript 和 python 从 nifi 中的一个传入流文件创建多个流文件【英文标题】:How to create multiple flow files from one incoming flow files in nifi using ExecuteScript with python 【发布时间】:2021-02-03 14:38:25 【问题描述】:

在本地运行,这完全符合我的要求(在位置 7-10 有一个带有许多不同代码的传入流文件,并且每个唯一代码输出 1 个文件)例如,如果记录 1-5 在位置 7- 有 1234 10,记录 6 在位置 7-10 有 2345,记录 7 在位置 7-10 有 1234,然后会有一个名为 1234_file.txt 的文件,其中第 1-5 行和第 7 行,第二个文件 2345_file.txt 将有输入文件的第 6 行:

f = open("test_comp.txt", "r")
for x in f:
    comp = x[6:10]
    print(comp)
    n = open(comp+"_file.txt","a")
    n.write(x)
    n.close()
f.close()

在 nifi 中,我正在尝试这个:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    f = open(inputStream, 'r')
    for x in f:
        comp = x[6:10]
        print("comp: ",comp)
        newFile = open(comp+"_file.txt","a")
        newFile.write(x)


flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
session.commit()

似乎正在获取输入并将 comp 正确存储为预期的位置 7-10,但我没有得到多个流文件(对于 x[6:10] 中的每个唯一字符串。流文件即将到来out 是 1 个零字节文件。

对我缺少什么有什么想法吗??

【问题讨论】:

【参考方案1】:

您正在直接写入文件系统上的文件,而不是 flowfiles,后者是 NiFi 生态系统中的对象。我建议阅读 Apache NiFi Developer's Guide 以了解这些模式的上下文,并查看一些 Python ExecuteScript examples 以查看相关的 Python 代码。

您需要创建多个流文件对象,将数据映射到它们,然后将它们全部转移到各自的关系中,而不是写出单个流文件。

您是否有理由需要使用自定义 Python 代码而不是 SplitRecord 和/或 PartitionRecord 处理器来执行此操作?我认为PartitionRecord 可以很容易地解决您描述的问题。

【讨论】:

谢谢,我会阅读您推荐的文章。至于PartitionRecord,由于我的文件只是一个平面文件(不是csv),什么控制器服务可以工作? 你能提供一些示例行吗? CSVReader 可以配置为解析许多分隔文件,即使它们不是逗号、制表符或管道 我同意马特的观点,如果传入的数据是“扁平的”,CSVReader 可能是使用可配置分隔符的最佳解决方案。如果 OOTB 控制器服务都不适合您,还有一个 Scripted 选项。 470120123829 2000004590 00000051 3212 057 YVKB 20125 00 999999999999901 HG04070316 123主要街道,US 99999 470220123729 2000091230 00000051 2012 056 Y2HF 27258 00 999999999999901 HG04070316 123主要街道,US 99999 跨度> 有两个示例行。这两个都打算在不同的流文件中结束,因为一个在位置 7-10 有 1238,另一个在位置 7-10 有 1237。

以上是关于如何使用 ExecuteScript 和 python 从 nifi 中的一个传入流文件创建多个流文件的主要内容,如果未能解决你的问题,请参考以下文章

chrome.tabs.executeScript():如何获取内容脚本的结果?

WebDriver executeAsyncScript 与 executeScript

通过 Selenium WebDriver 从 JavascriptExecutor 接口使用 executeScript 方法时,arguments[0] 和 arguments[1] 是啥意思?

为啥我不能在我的 selenium 脚本中使用 executeScript?

InAppBrowser 注入脚本(使用 executeScript)

求助:CATScriptUtilities::ExecuteScript 的用法