用Nifi合并二个API计算并生成新的API

Posted 疯吻IT

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Nifi合并二个API计算并生成新的API相关的知识,希望对你有一定的参考价值。

1. 全景图

NewImage

NewImage

NewImage

 

2. 合并

根据attribute合并flowfile:

NewImage

 

合并 json, 并增加code,message等:

NewImage

 

3. 计算方差:

在ExecuteScript里只能用纯python, 很多第三方包都不能用;并把计算的值插入到json里,输出。

 

import simplejson as json 
#from scipy.stats import f_oneway
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback


class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    jsonData = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    data = json.loads(jsonData)
    values = [float(i[\'fltValue\']) for i in data["data"]]
    firsts = [float(i[\'first\']) for i in data["data"]]
    seconds = [float(i[\'second\']) for i in data["data"]]

    def stdDeviation(a):
        count = len(a)
        if count < 2: return 0
        avg = sum(a)/count
        result = 0.0
        for i in a: result += (i - avg)**2
        return (result/(count - 1))**0.5


    v = stdDeviation(values)
    f = stdDeviation(firsts)
    s = stdDeviation(seconds)
    
    data["valueDev"] = v
    data["firstDev"] = f
    data["secondDev"] = s
              
    outputStream.write(bytearray(json.dumps(data, indent=4).encode(\'utf-8\')))

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  session.transfer(flowFile, REL_SUCCESS)

 

4. 最终效果:

第一个API:

NewImage

第二个API:

NewImage

 

最后合并生成的API:

NewImage

NewImage

可视化图:

NewImage

 

 

NIFI 中国社区 QQ群:595034369

以上是关于用Nifi合并二个API计算并生成新的API的主要内容,如果未能解决你的问题,请参考以下文章

NIFI同步API接口数据

Nifi在HDFS路径上移动前一天合并的json文件的文件

带有物联网传感器的 Apache Nifi

NiFi源码整理

智能生成并盘活API研发资产

axios 调用从第一个 api 获取响应并传递到第二个 api