Flume快速入门
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume快速入门相关的知识,希望对你有一定的参考价值。
参考技术AFlume是开源日志系统。是一个分布式、可靠性和高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据;同时,FLume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力。
Flume是流式日志采集工具,FLume提供对数据进行简单处理并且写到各种数据接收方(可定制)的能力,Flume提供从本地文件(spooling directory source)、实时日志(taildir、exec)、REST消息、Thift、Avro、Syslog、Kafka等数据源上收集数据的能力。
Flume是收集、聚合事件流数据的分布式框架。
Flume分布式系统中最核心的角色是 agent ,Flume采集系统就是由一个个agent所连接起来形成
每一个agent相当于一个数据传递员 ,内部有三个组件:
Source 到Channel 到Sink之间传递数据的形式是Event事件; Event事件是一个数据流单元 。
Flume基础架构:Flume可以单节点直接采集数据,主要应用于集群内数据。
Flume多agent架构:Flume可以将多个节点连接起来,将最初的数据源经过收集,存储到最终的存储系统中。主要应用于集群外的数据导入到集群内。
各组件具体介绍如下:
Source负责接收events或通过特殊机制产生events,并将events批量放到一个或多个Channels。有驱动和轮询2中类型的Source。
Source必须至少和一个channel关联。
Source的类型如下:
Channel位于Source和Sink之间,Channel的作用类似队列,用于临时缓存进来的events,当Sink成功地将events发送到下一跳的channel或最终目的,events从Channel移除。
不同的Channel提供的持久化水平也是不一样的:
Channels支持事物,提供较弱的顺序保证,可以连接任何数量的Source和Sink。
Sink负责将events传输到下一跳或最终目的,成功完成后将events从channel移除。
必须作用于一个确切的channel。
Sink类型:
Flume支持将集群外的日志文件采集并归档到HDFS、HBase、Kafka上,供上层应用对数据分析、清洗数据使用。
Flume支持将多个Flume级联起来,同时级联节点内部支持数据复制。
这个场景主要应用于:收集FusionInsight集群外上的节点上的日志,并通过多个Flume节点,最终汇聚到集群当中。
Flume级联节点之间的数据传输支持压缩和加密,提升数据传输效率和安全性。
在同一个Flume内部进行传输时,不需要加密,为进程内部的数据交换。
Source接收的数据量,Channel缓存的数据量,Sink写入的数据量,这些都可以通过Manager图形化界面呈现出来。
Flume在传输数据过程中,采用事物管理方式,保证数据传输过程中数据不会丢失,增强了数据传输的可靠性,同时缓存在channel中的数据如果采用了file channel,进程或者节点重启数据不会丢失。
Flume在传输数据过程中,如果下一跳的Flume节点故障或者数据接收异常时,可以自动切换到另外一路上继续传输。
Flume在传输数据过程中,可以见到的对数据简单过滤、清洗,可以去掉不关心的数据,同时如果需要对复杂的数据过滤,需要用户根据自己的数据特殊性,开发过滤插件,Flume支持第三方过滤插件调用
flume快速入门及常用案例整理
flume快速入门及常用案例整理
flume概述
1.1flume定义
flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,flume基于流式架构,灵活简单
Flume最主要的作用就是,实时读取服务器本次磁盘的数据,讲数据写入到HDFS。
1.2flume基础架构
Agent
Agent是一个JVM进程,它是以事件的形式讲数据从源头送至目的。
Agent主要有三个组成部分:Source,Channel,Sink
Source
Source是负责接受数据到flume agent的组件。source组件可以处理各种类型、各种格式的日志数据,包括avro,thrift,exec,jms,spooling directory,netcat,sequence generator,syslog,http,legacy
Sink
Sink不断的轮询Channel中的事件并且批量的移除他们,并将这些事件批量写入储存或索引系统、或者被发送到另一个flume agent
sink组件目的地包括hdfs,logger,avro,thrift,ipc,file,Hbase,solr,自定义
Channel
Channel是位于Source和sink之间的缓冲区,因此channel允许source和sink运作在不同的速率上。channel是线程安全的,可以同时处理几个source的写入操作和几个sink的读取操作。
flume自带两种channel:Memory Channel 和 File Channel 以及Kafka Channel
Memory Channel 是内存中的队列,Memory Channel 在不需要关系数据丢失的情况下使用。Memory Channerl在程序死亡、机器或者重启都会导致数据的丢失
File Channel 将所有的事件写到磁盘,因此在程序关闭或者机器宕机的情况下不会丢失数据
Event
传输单元,flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放event的一些属性,为K-V结构,Body 用来存放改条数据,形式为字节数组。
flume快速安装部署
将apache-flume-1.9.0解压并重命名为flume190
将 flume/conf 下的 flume env.sh.template 文件修改为 flume env.sh ,并 配置 flume
env.sh 文件
vim flume env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
添加环境变量
vim /etc/profile.d/myenv.sh
#flume
export FLUME_HOME=/opt/software/flume190
export PATH=$FLUME_HOME/bin:$PATH
入门案例(端口监听)
1、首先需要安装netcat工具,用于监听端口
yum -install -y nc
2、检测端口是否被占用
nc 192.168.71.200 6666
3、创建.conf文件,并添加如下内容
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type =netcat
a1.sources.s1.bind = 192.168.71.200
a1.sources.s1.port = 6666
#初始化通道
a1.channels.c1.type = memory
a1.channels.capacity = 100
a1.channels.transactionCapacity = 10
#初始化数据槽
a1.sinks.k1.type = logger
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
配置文件解析如下
#组件声明 a1表示agent的名称
a1.sources = s1 s1:表示a1的source名称
a1.channels= c1 c1:表示a1的channel名称
a1.sinks = k1 k1:表示a1的sink名称
# 初始化数据源
a1.sources.s1.type =netcat a1的输入源类型为netcat端口类型
a1.sources.s1.bind = 192.168.71.200 表示a1的监听主机
a1.sources.s1.port = 6666 表示a1的监听的端口号
#初始化通道
a1.channels.c1.type = memory 表示a1的channel类型是memory内存型
a1.channels.capacity = 100 表示a1的channel总容量100个event
a1.channels.transactionCapacity = 10 表示a1的channel传输时收集到了100条event以后再去提交事务
#初始化数据槽
a1.sinks.k1.type = logger 表示a1的储出目的地的控制台logger类型
#关联组件
a1.sources.s1.channels = c1 表示将a1和s1连接起来
a1.sinks.k1.channel = c1 表示将a1和k1连接起来
开启命令
flume-ng agent --conf conf/ --name a1 --conf-file flume01.conf -Dflume.root.logger=INFO,console
## 第二种写法
flume-ng agent -n a1 -c conf/ -f /opt/flume_job/flume01.conf -Dflume.root.logger=INFO,console
我这里使用了第二种方式,执行命令后,去另外一个窗口进行侦听,我的配置文件里写的端口是6666
nc 192.168.71.200 6666
在这里输入任何内容,另外一个窗口都会侦听到
案例一:监听日志Spooling Directory Source
具体需求:实时监控整个日志目录下的所有文件,并上传到HDFS 中
采集静态⽬录下,新增⽂本⽂件,采集完成后会修改⽂件后缀,但是不会删除采集的源⽂件,如果⽤户只想采集⼀次,可以修改该source默认⾏为
Spooling Directory Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。 在新文件被完全读入Channel之后默认会重命名该文件以示完成(也可以配置成读完后立即删除、也可以配置trackerDir来跟踪已经收集过的文件)。
提示:使用trackerDir跟踪收集的文件是通过1.9新增加了一个参数trackingPolicy,跟原有的参数组合后新增了一个使用的场景:标记已经被收集完成的文件,但是又不想对原文件做任何改动。
与 Exec Source 不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的被收集的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:
- 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
- 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type =spooldir
# 监听的目标文件夹
a1.sources.s1.spoolDir = /opt/practice/flume
# 监听时忽略的文件类型(支持正则)
a1.sources.s1.ignorePattern = ^(.)*\\\\.bak$
# 监听结束后的文件后缀
a1.sources.s1.fileSuffix = .bak
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/chechpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/fakeorder/%Y-%m-%d/%H
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 10
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = minute
# 上传的文件前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217728
# 设置滚动与event数量无关
a1.sinks.k1.hdfs.rollCount = 0
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 0
# 设置积攒多少个event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
案例二:实时监控目录下的多个追加文件taildir
需求:监控多个目录下的不同文档,上传到HDFS中,
实时监测动态⽂本⾏的追加,并且记录采集的⽂件读取的位置了偏移量,即使下⼀次再次采集,可以实现增量采集。
Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。
Taildir Source是可靠的,即使发生文件轮换也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。
Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。
文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。
Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件。
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type = taildir
# 多个f1 f2用空格分开,并且分别写其具体路径
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /opt/practice/flume/tail01/prolog.*\\\\.log
a1.sources.s1.filegroups.f2 = /opt/practice/flume/tail02/prolog.*\\\\.log
a1.sources.s1.positionFile = /opt/software/flume190/mydata/data/taildir/taildir_position.json
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/tailevent/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
案例三:avro
监听Avro端口来接受外部avro客户端的事件流,和netcat不同的是,avro-source接收到的是
经过avro序列化之后的数据,然后反序列化数据继续传输,所以,如果avro-source的话,源
数据必须是经过avro序列化之后的数据。而netcat接收的是字符串格式的数据。
简单来说,就是服务器端侦听客户端的文件,然后上传。
服务器端的配置文件如下
test03_avro_flie_hdfs.conf
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1
# 初始化数据源
a1.sources.s1.type =avro
a1.sources.s1.bind=192.168.71.200
a1.sources.s1.port=7777
a1.sources.s1.threads=5
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/avror/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H%M
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
执行这个文件:
flume-ng agent -n a1 -c conf/ -f /opt/flume_job/flume01.conf -Dflume.root.logger=INFO,console
出现上图的情况时,说明正在处于侦听状态,然后去另外一个窗口启动客户端服务,我这里提前准备了一个文件prolog.log
flume-ng avro-client -H 192.168.71.200 -p 7777 -c /conf -F /opt/practice/flume/prolog.log
当再次回到服务器端时,就会发生变化,如下图,就是正在上传阶段
案例四:hive sink
事前准备
一、hive建表要求
1、hive的表必须是事务表
2、表必须是分区表,分桶表
3、表stored as orc
二、flume中配置要求
Flume配置的Hive 列名必须都为小写字母,否则会报错。
三、需要为flume准备hive-hcatalog-streaming的jar包,拷贝到flume的lib下
cp /opt/software/hive312/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar ./
详细案例
一、设置hive表
SET hive.support.concurrency = true;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on = true;
SET hive.compactor.worker.threads = 1;
二、创建hive表
create table familyinfo(
family_id int,
family_name string,
family_age int,
family_gender string
)
partitioned by(intime string)
clustered by(family_gender) into 2 buckets
row format delimited
fields terminated by ','
lines terminated by '\\n'
stored as orc
tblproperties('transactional'='true');
三、添加分区,这里是根据日期进行分区
alter table familyinfo add partition(intime='21-07-05-30');
四、写配置文件
这里已经提前在/opt/practice/flume/tail03/ 目录下准备好了文件。
#initialize
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups=f1
a1.sources.s1.filegroups.f1=/opt/practice/flume/tail03/.*.log
a1.sources.s1.positionFile=/opt/software/flume190/mydata/data/taildir/taildir_position.json
a1.sources.s1.batchSize=10
#file channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity=100
a1.channels.c1.transactionCapacity=10
#hive sink
a1.sinks.k1.type=hive
a1.sinks.k1.hive.metastore=thrift://192.168.71.200:9083
a1.sinks.k1.hive.database=test
a1.sinks.k1.hive.table=familyinfo
a1.sinks.k1.hive.partition=%y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp=true
a1.sinks.k1.autoCreatePartitions=false
a1.sinks.k1.round=true
a1.sinks.k1.batchSize=10
a1.sinks.k1.roundValue=10
a1.sinks.k1.roundUnit=minute
a1.sinks.k1.serializer=DELIMITED
a1.sinks.k1.serializer.delimiter=","
a1.sinks.k1.serializer.serdeSeparator=','
a1.sinks.k1.serializer.fieldnames=family_id,family_name,family_age,family_gender
#关联
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
执行配置文件,就可以在hive里查询到表了。
一定要注意提前拷贝jar包
HBase sink 采用RegexHBase2EventSerializer序列化
需要提前在hbase中建好namespace和table
#initialize
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /opt/practice/flume/tail04/.*.log
a1.sources.s1.positionFile=/opt/software/flume190/mydata/data/taildir/taildir_position.json
a1.sources.s1.batchSize=10
#file channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs =/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10
#hbase sink
a1.sinks.k1.type = hbase2
#指定namespace 和 table
a1.sinks.k1.table=test:stuflumehbasesink
# 指定columnFamily
a1.sinks.k1.columnFamily = base
# 正则提取
a1.sinks.k1.serializer.regex=(.*),(.*),(.*),(.*)
a1.sinks.k1.serializer=org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
#确定colnames
a1.sinks.k1.serializer.colNames=ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize=10
#connect to channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
以上是关于Flume快速入门的主要内容,如果未能解决你的问题,请参考以下文章