会话未在nifi自定义处理器内关闭异常

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了会话未在nifi自定义处理器内关闭异常相关的知识,希望对你有一定的参考价值。

想要获取文件并使用它的xml内容,然后更新其中一个标签,但更明显错误ffstream = session(flowfile)后来关闭ffstream.close()实际上没有关闭并抛出异常,这里是我的代码我应该改变什么?

flowFile = session.putAttribute(flowFile,"filename",file.getName() + ".xml");
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = builderFactory.newDocumentBuilder();
Document xmlDocument = builder.parse(ffStream);
ffStream.close();
XPath xPath = XPathFactory.newInstance().newXPath();
NodeList myNodeList = (NodeList) xPath.compile("//runAs/text()")
    .evaluate(flowFile, XPathConstants.NODESET);myNodeList.item(0).setNodeValue("false");
FlowFile flowFile2=session.create();
flowFile = session.putAttribute(flowFile2,"filename","config" + ".xml");
session.write(flowFile2, new OutputStreamCallback() 
    @Override
    public void process(final OutputStream out) throws IOException 
        TransformerFactory transformerFactory = TransformerFactory.newInstance();
        Transformer transformer = null;
        try 
            transformer = transformerFactory.newTransformer();
         catch (TransformerConfigurationException e) 
            e.printStackTrace();
        
        DOMSource source = new DOMSource(xmlDocument);
        ByteArrayOutputStream bos=new ByteArrayOutputStream();
        StreamResult result=new StreamResult(bos);
        try 
            transformer.transform(source, result);
         catch (TransformerException e) 
            e.printStackTrace();
        
        byte []array=bos.toByteArray();
        out.write(array);
    
);
答案

您的代码没有显示ffStream的创建或初始化,您使用的是ffStream = session.read(flowFile)吗?如果是这样,一旦调用putAttribute(),您为之前版本的流文件打开的输入流就不再有效,因为您通过调用putAttribute()创建了一个较新的版本。

之后你也有一些问题。例如,您最终创建了一个新的流文件,但是在session.write()之后使用原始流文件的指针/变量来引用它。这肯定会引起问题。如果要覆盖传入的流文件,请将session.write()与StreamCallback而不是OutputStreamCallback一起使用。如果要保留传入的流文件并创建一个新文件,我建议为每个流文件(flowFile和flowFile2)维护一个变量,并在每次更新该文件时更新该变量(通过session.write,session.putAttribute,等等。)

另一答案

我已经删除了.putAttribute(),并在session.write()函数中读取xml文档后关闭了ffstream会话,并且我工作得很好。

InputStream ffStream=session.read(flowFile);
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = builderFactory.newDocumentBuilder();
Document xmlDocument = builder.parse(ffStream);

XPath xPath = XPathFactory.newInstance().newXPath();
XPathExpression myNodeList = (XPathExpression) xPath.compile("/localAttributes");

Node nodeGettingChanged = (Node) myNodeList.evaluate(xmlDocument, XPathConstants.NODE);
NodeList childNodes = nodeGettingChanged.getChildNodes();
for (int i = 0; i != childNodes.getLength(); ++i)

    Node child = childNodes.item(i);
    if (!(child instanceof Element))
        continue;

    if (child.getNodeName().equals("runAs"))
        child.getFirstChild().setNodeValue("false") ;


session.write(flowFile, new StreamCallback() 
    @Override
    public void process(InputStream inputStream, OutputStream outputStream) throws IOException 
        TransformerFactory transformerFactory = TransformerFactory.newInstance();
        Transformer transformer = null;
        try 
            transformer = transformerFactory.newTransformer();
         catch (TransformerConfigurationException e) 
            e.printStackTrace();
        
        DOMSource source = new DOMSource(xmlDocument);
        ffStream.close();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        StreamResult result = new StreamResult(bos);
        try 
            transformer.transform(source, result);
         catch (TransformerException e) 
            e.printStackTrace();
        
        byte[] array = bos.toByteArray();
        outputStream.write(array);
    
);

以上是关于会话未在nifi自定义处理器内关闭异常的主要内容,如果未能解决你的问题,请参考以下文章

Airbnb Airflow vs Apache Nifi [关闭]

Django - 异常处理最佳实践和发送自定义错误消息 [关闭]

大数据NiFi:NiFi Processors(处理器)

大数据NiFi:NiFi Processors(处理器)

Nifi自定义processor

DRF框架中的异常处理程序