使用python传输文件以执行脚本NiFi

Posted

技术标签:

【中文标题】使用python传输文件以执行脚本NiFi【英文标题】:Transfer File to executescript NiFi using python 【发布时间】:2020-09-12 17:54:38 【问题描述】:

我正在尝试使用 NiFi ExecuteScript 将消息发布到使用 Python 的 irc 聊天室。我是否使用正确的语法将流文件从队列传递到处理器? NameError 导致处理器错误:未定义全局名称“服务器”,但我不确定是什么原因造成的。在我添加 session.get() 之前,一切似乎都有效。

import socket
from org.apache.nifi.processor.io import StreamCallback, InputStreamCallback

class PyStreamCallback(InputStreamCallback):
    def __init__(self):
       pass
       self.ircsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       self.server = "irc.freenode.net"
       self.channel = ""
       self.botnick = ""
       self.ircsock.connect((server, 6667))
       self.ircsock.send(bytes("USER "+ botnick +" "+ botnick +" "+ botnick + " " + botnick + "\n"))
       self.ircsock.send(bytes("NICK "+ botnick +"\n"))

    def joinchan(self, chan):
      self.ircsock.send(bytes("JOIN "+ chan +"\n"))
      ircmsg = ""
      while ircmsg.find("End of /NAMES list.") == -1:
        self.ircmsg = ircsock.recv(2048)
        self.ircmsg = ircmsg.strip('\n\r')
        print(ircmsg)

    def sendmsg(self, msg, target=channel):
      self.ircsock.send(bytes("PRIVMSG "+ target +" :"+ msg +"\n"))



flowFile = session.get()
if (flowFile != None):
  flowFile = session.read(flowFile,PyStreamCallback())
session.commit()

【问题讨论】:

【参考方案1】:
self.ircsock.connect((server, 6667))

应该是

self.ircsock.connect((self.server, 6667))

【讨论】:

队列中的流文件仍未正确传输。我是否正确读取流文件? 抱歉,不知道流文件​​或会话。但是“flowFile = session.read(flowFile,PyStreamCallback())”这一行可能需要修改为“session.read(flowFile,PyStreamCallback())”。不要将 flowfile 设置为 session.read 的返回值。

以上是关于使用python传输文件以执行脚本NiFi的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Nifi 的虚拟环境中运行具有依赖关系的 python 脚本?

如何在Apache NIFI中应用机器学习来处理流数据?

expect批量同步或执行命令工具

NiFi-1.0.0 - 加载 lua 脚本

python 从NiFi ExecuteScript处理器使用的Python脚本示例,它从传入的流文件中读取第一行。

怎么执行一个自己写的脚本文件