Flumn

Posted mr-yl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flumn相关的知识,希望对你有一定的参考价值。

2.4.6.2.1 概论

数据发生器产生的数据被被单个的运行在数据发生器所在服务器上的agent所收集,之后数据收容器从各个agent上汇集数据并将采集到的数据存入到HDFS或者HBase

 

2.4.6.2.2 Flumn的一些核心概念
2.4.6.2.2.1 Event

一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。)

2.4.6.2.2.2 Agent

Flume 运行的核心是 Agent。Flumeagent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是 sourcechannelsink。通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示。

 

 

2.4.6.2.2.2.1 source

 从数据发生器接收数据并将接收的数据以Flumeevent格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式比如Avro,Thrift,twitter1%

2.4.6.2.2.2.2 Channel

channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在sourcesink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性。 并且它可以和任意数量的sourcesink链接。支持的类型有: JDBC channelFile System channelMemort channel等。

2.4.6.2.2.2.3 sink

sink将数据存储到集中存储器比如HbaseHDFS它从channals消费数据(events)并将其传递给目标地目标地可能是另一个sink,也可能HDFS、HBase。

2.4.6.2.2.2.4 source支持的数据接收方式、sink支持的写出方式、Channel支持的类型

参考官方API

urlhttp://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

目录如下:

 

2.4.6.2.3 Flumn的配置和启动
2.4.6.2.3.1 Flumn配置

(1) 配置文件命名需要以conf作为后缀名

(2) 监控文件写入HDFS配置如下:

#配置一个agentagent的名称可以自定义(如a1

#指定agentsources(如r1r2)、sinks(如k1k2)、channels(如c1c2

tier1.sources=r1 r2

tier1.sinks=k1 k2

tier1.channels=c1 c2

 

#描述source r1

#配置目录scource

#配置监控的目录,当目录出现新文件时会进行写入

tier1.sources.r1.type = spooldir

tier1.sources.r1.spoolDir =/var/flumefile

tier1.sources.r1.channels=c1

 

#配置sink

#输出方式为hdfs,并提供目录

tier1.sinks.k1.type=hdfs

tier1.sinks.k1.hdfs.path=hdfs://192.168.1.222:8020/pd

tier1.sinks.k1.channel=c1

 

 

#描述source r2

#配置目录scource

#配置监控的目录,当目录出现新文件时会进行写入

tier1.sources.r2.type = spooldir

tier1.sources.r2.spoolDir =/var/flumefile2

tier1.sources.r2.channels=c1

 

#配置sink

#输出方式为hdfs,并提供目录

tier1.sinks.k2.type=hdfs

tier1.sinks.k2.hdfs.path=hdfs://192.168.1.222:8020/flumefile

tier1.sinks.k2.channel=c1

 

#配置channels类型为 File

tier1.channels.c1.type=file

tier1.channels.c2.type=file

(3) 监控文件作为生产者提供数据到Kafak配置如下:

#配置一个agentagent的名称可以自定义(如a1

#指定agentsources(如r1r2)、sinks(如k1k2)、channels(如c1c2

tier1.sources=r1 r2

tier1.sinks=k1 k2

tier1.channels=c1 c2

 

#描述source r1

#配置目录scource

#配置监控的目录,当目录出现新文件时会进行写入

tier1.sources.r1.type = spooldir

tier1.sources.r1.spoolDir =/var/flumefile

tier1.sources.r1.channels=c1

 

 

#配置数据源输出

#设置Kafka接收器,此处最坑,注意版本,此处为Flume 1.6.0的输出槽类型

tier1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink

#设置Kafkabroker地址和端口号

tier1.sinks.k1.brokerList=master:9092

#设置KafkaTopic

tier1.sinks.k1.topic=topic-test

#设置序列化方式

tier1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

#将三者级联

tier1.sinks.k1.channel=c1

 

 

#配置channels类型为 File

tier1.channels.c1.type=memory

tier1.channels.c1.capacity=10000

tier1.channels.c1.transactionCapacity=100

 

 

#描述source r1

#配置目录scource

#配置监控的目录,当目录出现新文件时会进行写入

tier1.sources.r2.type = spooldir

tier1.sources.r2.spoolDir =/var/flumefile2

tier1.sources.r2.channels=c2

 

 

#配置数据源输出

#设置Kafka接收器,此处最坑,注意版本,此处为Flume 1.6.0的输出槽类型

tier1.sinks.k2.type= org.apache.flume.sink.kafka.KafkaSink

#设置Kafkabroker地址和端口号

tier1.sinks.k2.brokerList=master:9092

#设置KafkaTopic

tier1.sinks.k2.topic=test

#设置序列化方式

tier1.sinks.k2.serializer.class=kafka.serializer.StringEncoder

#将三者级联

tier1.sinks.k2.channel=c2

 

 

#配置channels类型为 File

tier1.channels.c2.type=memory

tier1.channels.c2.capacity=10000

tier1.channels.c2.transactionCapacity=100

 

2.4.6.2.3.1 Flumn启动

(1) 将上述配置文件放在Flumn安装路径的根目录下conf.empty文件夹中,目前192.168.1.222服务器下的安装地址为/opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/etc/flume-ng/

(2) 启动命令

登陆之后在根目录执行启动,并且登陆角色拥有安装目录文件的操作权限。

#表示启动agent

flume-ng agent

 

#启动的agent的名字为tier1 对应配置文件中配置的agent名字

--name tier1

 

#配置文件所在的路径

--conf /opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/etc/flume-ng/conf.empty

 

#配置文件所在的全路径

--conf-file /opt/cloudera/parcels/CDH-5.14.2-1.cdh5.14.2.p0.3/etc/flume-ng/conf.empty/flume-config.conf

 

#打印执行信息到控制台上

-Dflume.root.logger=INFO,console

以上是关于Flumn的主要内容,如果未能解决你的问题,请参考以下文章