flume快速入门及常用案例整理
Posted Z-hhhhh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume快速入门及常用案例整理相关的知识,希望对你有一定的参考价值。
flume快速入门及常用案例整理
flume概述
1.1flume定义
flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,flume基于流式架构,灵活简单
Flume最主要的作用就是,实时读取服务器本次磁盘的数据,讲数据写入到HDFS。
1.2flume基础架构
Agent
Agent是一个JVM进程,它是以事件的形式讲数据从源头送至目的。
Agent主要有三个组成部分:Source,Channel,Sink
Source
Source是负责接受数据到flume agent的组件。source组件可以处理各种类型、各种格式的日志数据,包括avro,thrift,exec,jms,spooling directory,netcat,sequence generator,syslog,http,legacy
Sink
Sink不断的轮询Channel中的事件并且批量的移除他们,并将这些事件批量写入储存或索引系统、或者被发送到另一个flume agent
sink组件目的地包括hdfs,logger,avro,thrift,ipc,file,Hbase,solr,自定义
Channel
Channel是位于Source和sink之间的缓冲区,因此channel允许source和sink运作在不同的速率上。channel是线程安全的,可以同时处理几个source的写入操作和几个sink的读取操作。
flume自带两种channel:Memory Channel 和 File Channel 以及Kafka Channel
Memory Channel 是内存中的队列,Memory Channel 在不需要关系数据丢失的情况下使用。Memory Channerl在程序死亡、机器或者重启都会导致数据的丢失
File Channel 将所有的事件写到磁盘,因此在程序关闭或者机器宕机的情况下不会丢失数据
Event
传输单元,flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放event的一些属性,为K-V结构,Body 用来存放改条数据,形式为字节数组。
flume快速安装部署
将apache-flume-1.9.0解压并重命名为flume190
将 flume/conf 下的 flume env.sh.template 文件修改为 flume env.sh ,并 配置 flume
env.sh 文件
vim flume env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
添加环境变量
vim /etc/profile.d/myenv.sh
#flume
export FLUME_HOME=/opt/software/flume190
export PATH=$FLUME_HOME/bin:$PATH
入门案例(端口监听)
1、首先需要安装netcat工具,用于监听端口
yum -install -y nc
2、检测端口是否被占用
nc 192.168.71.200 6666
3、创建.conf文件,并添加如下内容
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type =netcat
a1.sources.s1.bind = 192.168.71.200
a1.sources.s1.port = 6666
#初始化通道
a1.channels.c1.type = memory
a1.channels.capacity = 100
a1.channels.transactionCapacity = 10
#初始化数据槽
a1.sinks.k1.type = logger
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
配置文件解析如下
#组件声明 a1表示agent的名称
a1.sources = s1 s1:表示a1的source名称
a1.channels= c1 c1:表示a1的channel名称
a1.sinks = k1 k1:表示a1的sink名称
# 初始化数据源
a1.sources.s1.type =netcat a1的输入源类型为netcat端口类型
a1.sources.s1.bind = 192.168.71.200 表示a1的监听主机
a1.sources.s1.port = 6666 表示a1的监听的端口号
#初始化通道
a1.channels.c1.type = memory 表示a1的channel类型是memory内存型
a1.channels.capacity = 100 表示a1的channel总容量100个event
a1.channels.transactionCapacity = 10 表示a1的channel传输时收集到了100条event以后再去提交事务
#初始化数据槽
a1.sinks.k1.type = logger 表示a1的储出目的地的控制台logger类型
#关联组件
a1.sources.s1.channels = c1 表示将a1和s1连接起来
a1.sinks.k1.channel = c1 表示将a1和k1连接起来
开启命令
flume-ng agent --conf conf/ --name a1 --conf-file flume01.conf -Dflume.root.logger=INFO,console
## 第二种写法
flume-ng agent -n a1 -c conf/ -f /opt/flume_job/flume01.conf -Dflume.root.logger=INFO,console
我这里使用了第二种方式,执行命令后,去另外一个窗口进行侦听,我的配置文件里写的端口是6666
nc 192.168.71.200 6666
在这里输入任何内容,另外一个窗口都会侦听到
案例一:监听日志Spooling Directory Source
具体需求:实时监控整个日志目录下的所有文件,并上传到HDFS 中
采集静态⽬录下,新增⽂本⽂件,采集完成后会修改⽂件后缀,但是不会删除采集的源⽂件,如果⽤户只想采集⼀次,可以修改该source默认⾏为
Spooling Directory Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。 在新文件被完全读入Channel之后默认会重命名该文件以示完成(也可以配置成读完后立即删除、也可以配置trackerDir来跟踪已经收集过的文件)。
提示:使用trackerDir跟踪收集的文件是通过1.9新增加了一个参数trackingPolicy,跟原有的参数组合后新增了一个使用的场景:标记已经被收集完成的文件,但是又不想对原文件做任何改动。
与 Exec Source 不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的被收集的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:
- 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
- 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type =spooldir
# 监听的目标文件夹
a1.sources.s1.spoolDir = /opt/practice/flume
# 监听时忽略的文件类型(支持正则)
a1.sources.s1.ignorePattern = ^(.)*\\\\.bak$
# 监听结束后的文件后缀
a1.sources.s1.fileSuffix = .bak
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/chechpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/fakeorder/%Y-%m-%d/%H
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 10
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = minute
# 上传的文件前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217728
# 设置滚动与event数量无关
a1.sinks.k1.hdfs.rollCount = 0
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 0
# 设置积攒多少个event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
案例二:实时监控目录下的多个追加文件taildir
需求:监控多个目录下的不同文档,上传到HDFS中,
实时监测动态⽂本⾏的追加,并且记录采集的⽂件读取的位置了偏移量,即使下⼀次再次采集,可以实现增量采集。
Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。
Taildir Source是可靠的,即使发生文件轮换也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。
Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。
文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。
Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type = taildir
# 多个f1 f2用空格分开,并且分别写其具体路径
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /opt/practice/flume/tail01/prolog.*\\\\.log
a1.sources.s1.filegroups.f2 = /opt/practice/flume/tail02/prolog.*\\\\.log
a1.sources.s1.positionFile = /opt/software/flume190/mydata/data/taildir/taildir_position.json
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/tailevent/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
案例三:avro
监听Avro端口来接受外部avro客户端的事件流,和netcat不同的是,avro-source接收到的是
经过avro序列化之后的数据,然后反序列化数据继续传输,所以,如果avro-source的话,源
数据必须是经过avro序列化之后的数据。而netcat接收的是字符串格式的数据。
简单来说,就是服务器端侦听客户端的文件,然后上传。
服务器端的配置文件如下
test03_avro_flie_hdfs.conf
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type =avro
a1.sources.s1.bind=192.168.71.200
a1.sources.s1.port=7777
a1.sources.s1.threads=5
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/avror/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H%M
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
执行这个文件:
flume-ng agent -n a1 -c conf/ -f /opt/flume_job/flume01.conf -Dflume.root.logger=INFO,console
出现上图的情况时,说明正在处于侦听状态,然后去另外一个窗口启动客户端服务,我这里提前准备了一个文件prolog.log
flume-ng avro-client -H 192.168.71.200 -p 7777 -c /conf -F /opt/practice/flume/prolog.log
当再次回到服务器端时,就会发生变化,如下图,就是正在上传阶段
案例四:hive sink
事前准备
一、hive建表要求
1、hive的表必须是事务表
2、表必须是分区表,分桶表
3、表stored as orc
二、flume中配置要求
Flume配置的Hive 列名必须都为小写字母,否则会报错。
三、需要为flume准备hive-hcatalog-streaming的jar包,拷贝到flume的lib下
cp /opt/software/hive312/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar ./
详细案例
一、设置hive表
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
二、创建hive表
create table familyinfo(
family_id int,
family_name string,
family_age int,
family_gender string
)
partitioned by(intime string)
clustered by(family_gender) into 2 buckets
row format delimited
fields terminated by ','
lines terminated by '\\n'
stored as orc
tblproperties('transactional'='true');
三、添加分区,这里是根据日期进行分区
alter table familyinfo add partition(intime='21-07-05-30');
四、写配置文件
这里已经提前在/opt/practice/flume/tail03/ 目录下准备好了文件。
#initialize
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups=f1
a1.sources.s1.filegroups.f1=/opt/practice/flume/tail03/.*.log
a1.sources.s1.positionFile=/opt/software/flume190/mydata/data/taildir/taildir_position.json
a1.sources.s1.batchSize=10
#file channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity=100
a1.channels.c1.transactionCapacity=10
#hive sink
a1.sinks.k1.type=hive
a1.sinks.k1.hive.metastore=thrift://192.168.71.200:9083
a1.sinks.k1.hive.database=test
a1.sinks.k1.hive.table=familyinfo
a1.sinks.k1.hive.partition=%y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp=true
a1.sinks.k1.autoCreatePartitions=false
a1.sinks.k1.round=true
a1.sinks.k1.batchSize=10
a1.sinks.k1.roundValue=10
a1.sinks.k1.roundUnit=minute
a1.sinks.k1.serializer=DELIMITED
a1.sinks.k1.serializer.delimiter=","
a1.sinks.k1.serializer.serdeSeparator=','
a1.sinks.k1.serializer.fieldnames=family_id,family_name,family_age,family_gender
#关联
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
执行配置文件,就可以在hive里查询到表了。
一定要注意提前拷贝jar包
HBase sink 采用RegexHBase2EventSerializer序列化
需要提前在hbase中建好namespace和table
#initialize
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /opt/practice/flume/tail04/.*.log
a1.sources.s1.positionFile=/opt/software/flume190/mydata/data/taildir/taildir_position.json
a1.sources.s1.batchSize=10
#file channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs =/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10
#hbase sink
a1.sinks.k1.type = hbase2
#指定namespace 和 table
a1.sinks.k1.table=test:stuflumehbasesink
# 指定columnFamily
a1.sinks.k1.columnFamily = base
# 正则提取
a1.sinks.k1.serializer.regex=(.*),(.*),(.*),(.*)
a1.sinks.k1.serializer=org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
#确定colnames
a1.sinks.k1.serializer.colNames=ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize=10
#connect to channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
以上是关于flume快速入门及常用案例整理的主要内容,如果未能解决你的问题,请参考以下文章