Flume高级组件之Source Interceptors及项目实践

Posted ccql

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume高级组件之Source Interceptors及项目实践相关的知识,希望对你有一定的参考价值。

文章目录

1. 写在前面

       Flume的核心组件包括:Source、Channel和Sink;高级组件包括Source Interceptors、Channel Selectors和Sink Processors,具体如下:

  • Source Interceptors:Source可以指定一个或者多个拦截器按先后顺序依次对采集到的数据进行处理;
  • Channel Selectors:Source发往多个Channel的策略设置,如果Source后面接了多个Channel,那么到底是给所有的Channel都发,还是根据规则发送到不同Channel,这些是由Channel Selectors来控制的;
  • Sink Processors:Sink 发送数据的策略设置,一个Channel后面可以接多个Sink,Channel中的数据是被哪个Sink获取,这个是由Sink Processors控制的。

       这篇博客主要研究Source Interceptors,并且在后文会实现一个对采集到的数据使用该组件进行分类存储的实例。在这之前要先了解一个概念:Event。Event是Flume传输数据的基本单位,也是事务的基本单位,在文本文件中,通常一行记录就是一个Event,其中包含header和body。body是采集到的那一行记录的原始内容;header类型为Map<String, String>,里面可以存储一些属性信息,方便后面使用。我们可以在Source中给每一条数据的header中增加key-value,在Channel和Sink中使用header中的值进行判别并实现分类。

2. 组件简介

       如核心组件一样,系统中已经内置提供了很多种的Source Interceptors,常见的类型有:Timestamp Interceptor、Host Interceptor、Search and Replace Interceptor 、Static Interceptor、Regex Extractor Interceptor 等。

  • Timestamp Interceptor:向Event中的header里面添加timestamp时间戳信息;
  • Host Interceptor:向Event中的header里面添加host属性,host的值为当前机器的主机名或者ip;
  • Search and Replace Interceptor:根据指定的规则查询Event中body里面的数据,然后进行替换,这个拦截器会修改Event中body的值,也就是会修改原始采集到的数据内容;
  • Static Interceptor:向Event中的header里面添加固定的key和value;
  • Regex Extractor Interceptor:根据指定的规则从Event中的body里面抽取数据,生成key和value,再把key和value添加到header中;

       注意:Search and Replace Interceptor是会根据规则修改event中body里面的原始数据内容,对header没有任何影响,使用这个拦截器需要特别小心,因为他会修改原始数据内容!

3. 项目实践

3.1 需求

       网站会实时产生这些日志数据,我们希望把这些数据采集到HDFS上进行存储,并且要按照数据类型(视频数据、用户数据和礼物数据)进行分目录存储。针对这个需求配置Agent的话,Source使用基于文件的Exec Source;为了保证数据的完整性和准确性,Channle使用基于文件的Channle;Sink使用HDFS Sink。但是HDFS Sink中的Path不能写死,需要动态获取日期,且不同类型的数据要存储到不同的目录中,那也就意味着Path路径中肯定要是有变量,除了日期变量还要有数据类型变量,实例数据如下:

video_info
"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"

user_info
"uid":"861848974414839801","nickname":"mick","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"abc@qq.com","mobile":"","reg_type":"102","last_login_time":"1494344580","reg_time":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"

gift_record
"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"

       这里的数据类型的格式都是单词中间有一个下划线,但是我们的要求是目录中的单词不要出现下划线,使用驼峰的命名格式。所以最终在hdfs中需要生成的目录大致是这样的:

hdfs://192.168.182.100:9000/moreType/20200101/videoInfo
hdfs://192.168.182.100:9000/moreType/20200101/userInfo
hdfs://192.168.182.100:9000/moreType/20200101/giftRecord

3.2 配置

       原始文件为Json数据格式,其中包含三种数据,因此使用三个Search and Replace Interceptor将这三种数据标注,并使用Regex Extractor Interceptor正则匹配这三种数据,抽取类别信息添加到header中。

# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置source组件
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/moreType.log

# 配置拦截器 [多个拦截器按照顺序依次执行]
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info"
a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo"

a1.sources.r1.interceptors.i2.type = search_replace
a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info"
a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo"

a1.sources.r1.interceptors.i3.type = search_replace
a1.sources.r1.interceptors.i3.searchPattern = "type":"gift_record"
a1.sources.r1.interceptors.i3.replaceString = "type":"giftRecord"


a1.sources.r1.interceptors.i4.type = regex_extractor
a1.sources.r1.interceptors.i4.regex = "type":"(\\\\w+)"
a1.sources.r1.interceptors.i4.serializers = s1
a1.sources.r1.interceptors.i4.serializers.s1.name = logType


# 配置channel组件
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/moreType/checkpoint
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/moreType/data


# 配置sink组件
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://(ip地址):9000/moreType/%Y%m%d/%logType
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#增加文件前缀和后缀
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log

# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.3 运行及结果

输入运行命令:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/moreType.conf -Dflume.root.logger=INFO,console

查看HDFS存储结果:

[root@bigData01 data]# hdfs dfs -ls /moreType/20230208
Found 3 items
drwxr-xr-x   - root supergroup          0 2023-02-08 13:17 /moreType/20230208/giftRecord
drwxr-xr-x   - root supergroup          0 2023-02-08 13:17 /moreType/20230208/userInfo
drwxr-xr-x   - root supergroup          0 2023-02-08 13:17 /moreType/20230208/videoInfo

以上是关于Flume高级组件之Source Interceptors及项目实践的主要内容,如果未能解决你的问题,请参考以下文章

打怪升级之小白的大数据之旅(七十三)<Flume高级>

打怪升级之小白的大数据之旅(七十三)<Flume高级>

打怪升级之小白的大数据之旅(七十三)<Flume高级>

打怪升级之小白的大数据之旅(七十三)<Flume高级>

Flume之核心架构深入解析

Flume之核心架构深入解析