flume快速入门及常用案例整理

Posted Z-hhhhh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume快速入门及常用案例整理相关的知识,希望对你有一定的参考价值。

flume快速入门及常用案例整理

flume概述

1.1flume定义

flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,flume基于流式架构,灵活简单

Flume最主要的作用就是,实时读取服务器本次磁盘的数据,讲数据写入到HDFS。

1.2flume基础架构

Agent

​ Agent是一个JVM进程,它是以事件的形式讲数据从源头送至目的。

​ Agent主要有三个组成部分:Source,Channel,Sink

Source

Source是负责接受数据到flume agent的组件。source组件可以处理各种类型、各种格式的日志数据,包括avro,thrift,exec,jms,spooling directory,netcat,sequence generator,syslog,http,legacy

Sink

Sink不断的轮询Channel中的事件并且批量的移除他们,并将这些事件批量写入储存或索引系统、或者被发送到另一个flume agent

sink组件目的地包括hdfs,logger,avro,thrift,ipc,file,Hbase,solr,自定义

Channel

Channel是位于Source和sink之间的缓冲区,因此channel允许source和sink运作在不同的速率上。channel是线程安全的,可以同时处理几个source的写入操作和几个sink的读取操作。

flume自带两种channel:Memory Channel 和 File Channel 以及Kafka Channel

Memory Channel 是内存中的队列,Memory Channel 在不需要关系数据丢失的情况下使用。Memory Channerl在程序死亡、机器或者重启都会导致数据的丢失

File Channel 将所有的事件写到磁盘,因此在程序关闭或者机器宕机的情况下不会丢失数据

Event

传输单元,flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放event的一些属性,为K-V结构,Body 用来存放改条数据,形式为字节数组。

flume快速安装部署

将apache-flume-1.9.0解压并重命名为flume190

将 flume/conf 下的 flume env.sh.template 文件修改为 flume env.sh ,并 配置 flume
env.sh 文件

vim flume env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144

添加环境变量

vim /etc/profile.d/myenv.sh

#flume
export FLUME_HOME=/opt/software/flume190
export PATH=$FLUME_HOME/bin:$PATH

入门案例(端口监听)

1、首先需要安装netcat工具,用于监听端口

yum -install -y nc

2、检测端口是否被占用

nc 192.168.71.200 6666

3、创建.conf文件,并添加如下内容

#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1

# 初始化数据源
a1.sources.s1.type =netcat
a1.sources.s1.bind = 192.168.71.200
a1.sources.s1.port = 6666
#初始化通道
a1.channels.c1.type = memory
a1.channels.capacity = 100
a1.channels.transactionCapacity = 10
#初始化数据槽
a1.sinks.k1.type = logger
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

配置文件解析如下

#组件声明     										a1表示agent的名称
a1.sources = s1									s1:表示a1的source名称
a1.channels= c1									c1:表示a1的channel名称
a1.sinks = k1									k1:表示a1的sink名称

# 初始化数据源
a1.sources.s1.type =netcat						a1的输入源类型为netcat端口类型
a1.sources.s1.bind = 192.168.71.200				表示a1的监听主机
a1.sources.s1.port = 6666						表示a1的监听的端口号
#初始化通道
a1.channels.c1.type = memory					表示a1的channel类型是memory内存型
a1.channels.capacity = 100						表示a1的channel总容量100个event
a1.channels.transactionCapacity = 10			表示a1的channel传输时收集到了100条event以后再去提交事务
#初始化数据槽
a1.sinks.k1.type = logger						表示a1的储出目的地的控制台logger类型
#关联组件
a1.sources.s1.channels = c1						表示将a1和s1连接起来
a1.sinks.k1.channel = c1						表示将a1和k1连接起来

开启命令

flume-ng agent --conf conf/ --name a1  --conf-file flume01.conf -Dflume.root.logger=INFO,console

## 第二种写法

flume-ng agent -n a1 -c conf/ -f  /opt/flume_job/flume01.conf -Dflume.root.logger=INFO,console

我这里使用了第二种方式,执行命令后,去另外一个窗口进行侦听,我的配置文件里写的端口是6666

nc 192.168.71.200 6666

在这里输入任何内容,另外一个窗口都会侦听到

案例一:监听日志Spooling Directory Source

具体需求:实时监控整个日志目录下的所有文件,并上传到HDFS 中

采集静态⽬录下,新增⽂本⽂件,采集完成后会修改⽂件后缀,但是不会删除采集的源⽂件,如果⽤户只想采集⼀次,可以修改该source默认⾏为

Spooling Directory Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。 在新文件被完全读入Channel之后默认会重命名该文件以示完成(也可以配置成读完后立即删除、也可以配置trackerDir来跟踪已经收集过的文件)。

提示:使用trackerDir跟踪收集的文件是通过1.9新增加了一个参数trackingPolicy,跟原有的参数组合后新增了一个使用的场景:标记已经被收集完成的文件,但是又不想对原文件做任何改动。

Exec Source 不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的被收集的文件必须是不可变的、唯一命名的。Flume会自动检测避免这种情况发生,如果发现问题,则会抛出异常:

  1. 如果文件在写入完成后又被再次写入新内容,Flume将向其日志文件(这是指Flume自己logs目录下的日志文件)打印错误并停止处理。
  2. 如果在以后重新使用以前的文件名,Flume将向其日志文件打印错误并停止处理。
#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1

# 初始化数据源
a1.sources.s1.type =spooldir
# 监听的目标文件夹
a1.sources.s1.spoolDir = /opt/practice/flume
# 监听时忽略的文件类型(支持正则)
a1.sources.s1.ignorePattern = ^(.)*\\\\.bak$
# 监听结束后的文件后缀
a1.sources.s1.fileSuffix = .bak
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/chechpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/fakeorder/%Y-%m-%d/%H
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 10
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = minute
# 上传的文件前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
a1.sinks.k1.hdfs.fileSuffix = .log
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217728
# 设置滚动与event数量无关
a1.sinks.k1.hdfs.rollCount = 0
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 0
# 设置积攒多少个event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

案例二:实时监控目录下的多个追加文件taildir

​ 需求:监控多个目录下的不同文档,上传到HDFS中,

实时监测动态⽂本⾏的追加,并且记录采集的⽂件读取的位置了偏移量,即使下⼀次再次采集,可以实现增量采集。

Taildir Source监控指定的一些文件,并在检测到新的一行数据产生的时候几乎实时地读取它们,如果新的一行数据还没写完,Taildir Source会等到这行写完后再读取。

Taildir Source是可靠的,即使发生文件轮换也不会丢失数据。它会定期地以JSON格式在一个专门用于定位的文件上记录每个文件的最后读取位置。如果Flume由于某种原因停止或挂掉,它可以从文件的标记位置重新开始读取。

Taildir Source还可以从任意指定的位置开始读取文件。默认情况下,它将从每个文件的第一行开始读取。

文件按照修改时间的顺序来读取。修改时间最早的文件将最先被读取(简单记成:先来先走)。

Taildir Source不重命名、删除或修改它监控的文件。当前不支持读取二进制文件。只能逐行读取文本文件

#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1

# 初始化数据源
a1.sources.s1.type = taildir
# 多个f1 f2用空格分开,并且分别写其具体路径
a1.sources.s1.filegroups = f1 f2
a1.sources.s1.filegroups.f1 = /opt/practice/flume/tail01/prolog.*\\\\.log
a1.sources.s1.filegroups.f2 = /opt/practice/flume/tail02/prolog.*\\\\.log
a1.sources.s1.positionFile = /opt/software/flume190/mydata/data/taildir/taildir_position.json

#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/tailevent/%Y-%m-%d/%H
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.threadsPoolSize = 4
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

案例三:avro

监听Avro端口来接受外部avro客户端的事件流,和netcat不同的是,avro-source接收到的是
经过avro序列化之后的数据,然后反序列化数据继续传输,所以,如果avro-source的话,源
数据必须是经过avro序列化之后的数据。而netcat接收的是字符串格式的数据。

简单来说,就是服务器端侦听客户端的文件,然后上传。

服务器端的配置文件如下

test03_avro_flie_hdfs.conf

#组件声明
a1.sources = s1
a1.channels= c1
a1.sinks = k1

# 初始化数据源
a1.sources.s1.type =avro
a1.sources.s1.bind=192.168.71.200
a1.sources.s1.port=7777
a1.sources.s1.threads=5
#初始化通道
a1.channels.c1.type = file
a1.channels.c1.checkpointDir =/opt/software/flume190/mydata/checkpoint
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
#初始化数据槽
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.71.200:9820/flume/events/avror/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H%M
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
#关联组件
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

执行这个文件:

flume-ng agent -n a1 -c conf/ -f  /opt/flume_job/flume01.conf -Dflume.root.logger=INFO,console

出现上图的情况时,说明正在处于侦听状态,然后去另外一个窗口启动客户端服务,我这里提前准备了一个文件prolog.log

flume-ng avro-client -H 192.168.71.200 -p 7777 -c /conf -F /opt/practice/flume/prolog.log 

当再次回到服务器端时,就会发生变化,如下图,就是正在上传阶段

案例四:hive sink

事前准备

一、hive建表要求

1、hive的表必须是事务表

2、表必须是分区表,分桶表

3、表stored as orc

二、flume中配置要求

Flume配置的Hive 列名必须都为小写字母,否则会报错。

三、需要为flume准备hive-hcatalog-streaming的jar包,拷贝到flume的lib下

cp /opt/software/hive312/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar ./

详细案例

一、设置hive表

SET hive.support.concurrency = true;

SET hive.enforce.bucketing = true;

SET hive.exec.dynamic.partition.mode = nonstrict;

SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

SET hive.compactor.initiator.on = true;

SET hive.compactor.worker.threads = 1;

二、创建hive表

create  table familyinfo(
family_id int,
family_name string,
family_age int,
family_gender string
)
partitioned by(intime string)
clustered by(family_gender) into 2 buckets
row format delimited
fields terminated by ','
lines terminated by '\\n'
stored as orc
tblproperties('transactional'='true');

三、添加分区,这里是根据日期进行分区

alter table familyinfo add partition(intime='21-07-05-30');

四、写配置文件

这里已经提前在/opt/practice/flume/tail03/ 目录下准备好了文件。

#initialize
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups=f1
a1.sources.s1.filegroups.f1=/opt/practice/flume/tail03/.*.log
a1.sources.s1.positionFile=/opt/software/flume190/mydata/data/taildir/taildir_position.json
a1.sources.s1.batchSize=10

#file channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs=/opt/software/flume190/mydata/data
a1.channels.c1.capacity=100
a1.channels.c1.transactionCapacity=10

#hive sink
a1.sinks.k1.type=hive
a1.sinks.k1.hive.metastore=thrift://192.168.71.200:9083
a1.sinks.k1.hive.database=test
a1.sinks.k1.hive.table=familyinfo
a1.sinks.k1.hive.partition=%y-%m-%d-%H
a1.sinks.k1.useLocalTimeStamp=true
a1.sinks.k1.autoCreatePartitions=false
a1.sinks.k1.round=true
a1.sinks.k1.batchSize=10 
a1.sinks.k1.roundValue=10 
a1.sinks.k1.roundUnit=minute
a1.sinks.k1.serializer=DELIMITED 
a1.sinks.k1.serializer.delimiter=","
a1.sinks.k1.serializer.serdeSeparator=',' 
a1.sinks.k1.serializer.fieldnames=family_id,family_name,family_age,family_gender

#关联
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1

执行配置文件,就可以在hive里查询到表了。

一定要注意提前拷贝jar包

HBase sink 采用RegexHBase2EventSerializer序列化

需要提前在hbase中建好namespace和table

#initialize
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#taildir source
a1.sources.s1.type=taildir
a1.sources.s1.filegroups = f1
a1.sources.s1.filegroups.f1 = /opt/practice/flume/tail04/.*.log
a1.sources.s1.positionFile=/opt/software/flume190/mydata/data/taildir/taildir_position.json
a1.sources.s1.batchSize=10

#file channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir=/opt/software/flume190/mydata/checkpoint02
a1.channels.c1.dataDirs =/opt/software/flume190/mydata/data
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 10

#hbase sink
a1.sinks.k1.type = hbase2
#指定namespace 和 table
a1.sinks.k1.table=test:stuflumehbasesink
# 指定columnFamily
a1.sinks.k1.columnFamily = base
# 正则提取
a1.sinks.k1.serializer.regex=(.*),(.*),(.*),(.*)
a1.sinks.k1.serializer=org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
#确定colnames
a1.sinks.k1.serializer.colNames=ROW_KEY,name,age,gender
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize=10


#connect to channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

以上是关于flume快速入门及常用案例整理的主要内容,如果未能解决你的问题,请参考以下文章

大数据技术之FlumeFlume概述Flume快速入门

大数据技术之FlumeFlume概述Flume快速入门

大数据技术之FlumeFlume概述Flume快速入门

Flume

Flume

Flume