How to use ExecuteScript (with python as a script engine) for an exercise to add numbers? [Novice user trying to learn NiFi]
Posted: 2018-12-28 00:46:39
Question: I am relatively new to NiFi and am not sure how to do the following correctly. I would like to use the ExecuteScript processor (script engine: python) to do the following (in python only, please):
1) There is a CSV file containing the following information (the first row is the header):
first,second,third
1,4,9
7,5,2
3,8,7
2) I would like to find the sum of each row and produce a final file with a modified header. The final file should look like this:
first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18
For the python script, I wrote:
def summation(first,second,third):
    numbers = first + second + third
    return numbers

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, summation())
But it does not work, and I am not sure how to fix it. Can anyone tell me how to approach this problem?
NiFi flow: [image not available]
Thanks
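Answer 1: Your script does not do what you intend: summation() is called with no arguments, and session.write expects a callback object rather than a computed value. There are a couple of ways to approach this:
- Operate on the entire flowfile at once, using a script that iterates over the lines in the CSV content
- Treat the lines in the CSV content as "records" and operate on each record with a script that handles a single line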
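I will change your script so that it handles the entire flowfile content at once; you can read more about the Record* processors here, here, and here.
Here is a script that performs the operation you expect. Compare it with yours to see where I made changes (this script could certainly be made more efficient and concise; it is verbose to demonstrate what is happening, and I am not a Python expert).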
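For the second option, the per-record logic is much smaller than a whole-file script. A minimal sketch in plain Python, assuming each record arrives as a dict of column names to string values (the function name add_total and this dict shape are illustrative assumptions, not the interface NiFi's record processors expose to scripts):

```python
def add_total(record):
    """Return a copy of one record with an extra 'total' field.

    `record` is assumed to map column names to numeric strings,
    e.g. {"first": "1", "second": "4", "third": "9"}.
    """
    updated = dict(record)  # copy so the caller's record is left untouched
    updated["total"] = sum(int(value) for value in record.values())
    return updated


if __name__ == "__main__":
    print(add_total({"first": "1", "second": "4", "third": "9"}))
```

The function only sees one row at a time, so header handling and line splitting would be left to whatever reads and writes the records.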
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processor.io import StreamCallback

# This PyStreamCallback class is what the processor will use to ingest and output the flowfile content
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            # Get the provided inputStream into a format where you can read lines
            reader = BufferedReader(InputStreamReader(inputStream))
            # Set a marker for the first line to be the header
            isHeader = True
            try:
                # A holding variable for the lines
                lines = []
                # Loop indefinitely
                while True:
                    # Get the next line
                    line = reader.readLine()
                    # If there is no more content, break out of the loop
                    if line is None:
                        break
                    # If this is the first line, add the new column
                    if isHeader:
                        header = line + ",total"
                        # Write the header line and the new column
                        lines.append(header)
                        # Set the header flag to false now that it has been processed
                        isHeader = False
                    else:
                        # Split the line (a string) into individual elements by the ',' delimiter
                        elements = self.extract_elements(line)
                        # Get the sum (this method is unnecessary but shows where your "summation" method would go)
                        total = self.summation(elements)
                        # Write the output of this line
                        newLine = ",".join([line, str(total)])
                        lines.append(newLine)
                # Now out of the loop, write the output to the outputStream
                output = "\n".join([str(l) for l in lines])
                outputStream.write(bytearray(output.encode('utf-8')))
            finally:
                if reader is not None:
                    reader.close()
        except Exception as e:
            log.warn("Exception in Reader")
            log.warn('-' * 60)
            log.warn(str(e))
            log.warn('-' * 60)
            # Re-raise so the failure propagates out of session.write();
            # a transfer placed after this line would never run
            raise

    def extract_elements(self, line):
        # This splits the line on the ',' delimiter and converts each element to an integer, and puts them in a list
        return [int(x) for x in line.split(',')]

    # This method replaces your "summation" method and can accept any number of inputs, not just 3
    def summation(self, values):
        # This returns the sum of all items in the list
        return sum(values)

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
The result of my flow (using your input in a GenerateFlowFile processor):
2018-07-20 13:54:06,772 INFO [Timer-Driven Process Thread-5] o.a.n.processors.standard.LogAttribute LogAttribute[id=b87f0c01-0164-1000-920e-799647cb9b48] logging for flow file StandardFlowFileRecord[uuid=de888571-2947-4ae1-b646-09e61c85538b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1532106928567-1, container=default, section=1], offset=2499, length=51],offset=0,name=470063203212609,size=51]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'lineageStartDate'
Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'fileSize'
Value: '51'
FlowFile Attribute Map Content
Key: 'filename'
Value: '470063203212609'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'de888571-2947-4ae1-b646-09e61c85538b'
--------------------------------------------------
first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18
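The line-handling logic can also be checked without a NiFi instance. The following is a rough standalone sketch in plain Python, not the ExecuteScript script itself: the function name add_total_column is hypothetical, and it works on a string instead of the input and output streams NiFi provides. Skipping blank lines is an added assumption that the script above does not make.

```python
def add_total_column(csv_text):
    """Append a 'total' column holding the sum of each data row."""
    lines = csv_text.splitlines()
    if not lines:
        return csv_text
    # The first line is the header; it only gains the new column name
    output = [lines[0] + ",total"]
    for line in lines[1:]:
        if not line.strip():
            continue  # assumption: blank lines are dropped rather than summed
        values = [int(x) for x in line.split(",")]
        output.append(line + "," + str(sum(values)))
    return "\n".join(output)


if __name__ == "__main__":
    sample = "first,second,third\n1,4,9\n7,5,2\n3,8,7"
    print(add_total_column(sample))
```

Run against the sample input from the question, this prints the same four CSV lines shown at the end of the log output above.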