nifi生产环境使用

Posted muzhongjiang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nifi生产环境使用相关的知识,希望对你有一定的参考价值。

Nifi生产环境使用

 

1、服务器日志目录内的 log 文件中,我们使用 Apache Flume 这个工具将原始数据抽取出来 kafka sink ,

2、Nifi接入kafka数据。

首先做验证,然后过滤格式错误记录,然后路由不同的日志类型. nifi能做到这些的关键在于它的 flowfile 这个概念. 每一条数据记录进入到nifi中就叫flowfile. 每一个flowfile 由两部分组成,一个是content, 文件内容. 一个是 attributes ,文件属性.  在 nifi 中, 我们可以对文件属性进行增删改查等操作,甚至我们可以使用 nifi 提供的DSL,特定领域语言 对 attributes 进行编程. 这样的设定使得可以对数据记录进行任何想要的逻辑处理. 所以,一般是先把日志记录的内容转换到 flow file 的属性值当中去,然后进行后续的不同处理.  如下图:

技术图片

 

接着继续判断,对于通过网络获取手机号码成功的日志,将原始的日志记录保存到 hbase 中,之后供业务方做即席查询. 如下图:

技术图片

 

 对于获取手机号码失败的日志,手动去查用户的地址和所属运营商信息.这里是强业务相关的,因为属于其他运营商,所以是获取不到号码的.这里的处理真正体现了nifi的强大和灵活.

因为对于失败的日志,实际上是缺少必备字段的.缺少了字段, 这在日志文件批量处理中是多么坑爹的事情.然而使用 nifi 却能很轻松的解决这个问题.直接拿这条记录的用户ip 去调 一个内部的服务化查询接口,把字段查出来.并把值赋给flow file 相应的attribute. 把这样的日志记录变成正常的日志记录后,再汇入到处理的主流程当中,接着流动下去.

 

技术图片

 

技术图片

 

 接下的处理流程主要就是分发及转存了.对于同一条日志记录,一份数据要提取字段,将之转成 hive表结构对应的 csv 格式 ,保存到 hdfs 中, 也就是将所有处理好的数据落地到hdp集群环境中去.这是数据清洗后一大终点,也是结果.之后可以拿来直接做其他处理了,比如做批量查询,供其他工具使用(比如 kylin 等),也可以用来做模型训练.因为这里就算是干净的数据了.

 

技术图片

 

另一份数据(假定历史数据都已经处理完成,现在都是实时的数据)要提取业务需要的字段,导入到kafka的topic 中. 供业务需要的即时统计分析使用.

 

技术图片

 

至此,整个 nifi 的数据处理流程都走完了. 归功了 nifi 的强大 ,数据从起点到终点,虽然处理流程多,但流向非常清晰.用 nifi 拖拽几下,一套特定业务日志的处理系统就完成了. 右键点击 start ,系统就跑起来,你可以在界面看到数据的流动,可以监控,可以暂停,可以调试某段,可以查看中间结果.这些都是可以在界面上完成的.  用很简单的使用方式,去做很复杂的事情,最牛逼的工具莫过于此了.当然, nifi 也有很多高级的用法. 甚至可以 搭一个 nifi 的集群 ,来处理更加海量的数据,这里就不细说了.

当然,nifi中一个很长的数据处理流程是需要花时间观察,调试及验证的.从一个处理器到另一个处理器是否通畅.数据是否阻塞在了流程图中的某一段等问题.需要调试单个处理器的并发量及run schedule等信息.以及处理器之间的缓存队列容量和大小等信息.这些需要耐心的调试,就像一个流动的人群一样,慢慢疏导.

 

最后,附上实时数据统计的代码,这是这个项目写的唯一代码了,使用的是 spark 技术栈 ,spark streaming 和 spark sql . 关键是这个group by 函数.把聚合后的数据保存mysql 里面,供业务应用查询.这里后来把时间窗口改为了一小时,因为当时聚合后的数据量也还蛮大,2天存了35万条到mysql里面,这样的量放mysql 里面不太合适.

可能一开始的方案就不正确,后来也没继续跟进了.

技术图片

 

以上,就是这个项目的一个完整记录,做得有点粗糙,最后也没有继续跟进.很多细节没有考虑到 ,比如由于网络的问题,在整个流程中,是否会有数据的丢失?当由于某种原因导致流程中断,是否会有数据重复.最后的结果,怎样去校验数据的完整性.这些问题理论上是聚焦于各个组件,比如sqoop, kafka ,nifi的机制上的,但实际跑起来之后,会有什么样的问题.当时并没有过细的研究.

 

以上是关于nifi生产环境使用的主要内容,如果未能解决你的问题,请参考以下文章

NiFi 1.16.3 生产使用的更新及BUG。

NiFi 1.15.3↑ 集群HTTP搭建

NiFi 1.15.3↑ 集群HTTP搭建

大数据NiFi(二十一):监控日志文件生产到Kafka

生产环境为啥尽量不用join

生产环境,测试环境中,Docker 可以做啥