用Nifi 从web api 取数据到HDFS

Posted 疯吻IT

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Nifi 从web api 取数据到HDFS相关的知识,希望对你有一定的参考价值。

1. 全景图

NewImage
 

2. 用ExecuteScript生成动态日期参数

 
为了只生成一个flowfile:
NewImage
 
 
 
Groovy 代码:

import org.apache.commons.io.IOUtils
import java.nio.charset.*
import java.text.SimpleDateFormat;
import java.lang.StringBuilder;
import java.util.Calendar;

def flowFile = session.create()

def days = 10000

flowFile = session.write(flowFile, {inputStream, outputStream ->
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
Calendar cal = Calendar.getInstance();
StringBuilder sb = new StringBuilder();

cal.add(Calendar.DATE,1)

for(int i = 0; i < days; i++) {
  cal.add(Calendar.DATE, -1);
  sb.append(sdf.format(cal.getTime()) + "\\n" );
 }

//println(sb);

outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

//flowFile = session.putAttribute(flowFile, \'filename\', \'get_date\')
session.transfer(flowFile, REL_SUCCESS)

 

3. 用SplitText生成每行一个的日期

Line Split Count    1

 

4. 用ExtractText 取到日期参数

NewImage

 

5. 用UpdateAttribute生成url及filename

NewImage

这里一定要设置filename,不然,所有的文件名都一样,最后只能成功插入一个记录到HDFS。

 

6.  用InvokeHttp获取数据

NewImage

NewImage

 

7. 添加一个 RouteOnContent来过滤空数据

NewImage

 

8. 用PutHDFS把数据插入到HDFS

NewImage

注意这里的Directory 要加上/, 不然就插入到user/root/nifi下了,而不是files下在的nifi了。

 

9. 每天更新数据

NewImage

每天20点更新数据

代码小改下:

def count = 1



 

 

NIFI 中国社区 QQ群:595034369


以上是关于用Nifi 从web api 取数据到HDFS的主要内容,如果未能解决你的问题,请参考以下文章

大数据NiFi(十八):离线同步MySQL数据到HDFS

大数据NiFi(二十二):Kafka中数据实时导入到HDFS中

NIFI同步API接口数据

Nifi01概念

Nifi01概念

Apache Nifi 将文件移动到新的 hdfs 文件夹以获取小于当前日期的文件