flume与kafka集成配置

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume与kafka集成配置相关的知识,希望对你有一定的参考价值。

参考技术A 简介

Flume代理配置存储在本地配置文件中。这是遵循Javaproperties文件格式的文本文件。可以在同一配置文件中指定一个或多个代理的配置。配置文件包括代理中每个source,sink和channel的属性,以及它们如何连接在一起以形成数据流。

流中的每个组件(source,sink和channel)都有一个名称,类型和特定于该类型和实例化的属性集。例如,一个Avro源需要一个主机名(或IP地址)和一个端口号来接收数据。内存通道可以具有最大队列大小(“capacity”),并且HDFS的sink需要知道文件系统URI,创建文件的路径,文件rotation的frequency(“hdfs.rollInterval”)等。组件的所有此类属性需要在hosting Flume代理的属性文件中进行设置。

代理需要知道要加载哪些单个组件以及如何连接它们才能构成流程。通过列出代理中每个source,sink和channel的名称,然后为每个sink和source指定channel来完成此操作。例如,代理通过称为文件通道的文件通道将事件从名为avroWeb的Avro源流到HDFS接收器hdfs-cluster1。配置文件将包含这些组件的名称和文件通道,作为avroWebsource和hdfs-cluster1sink的共享通道。

使用称为flume-ng的shell脚本启动代理,该脚本位于Flume发行版的bin目录中。您需要在命令行上指定代理名称,配置目录和配置文件:

$ bin/flume-ng agent -n $agent_name -c conf-f conf/flume-conf.properties.template

然后,代理将开始运行在给定属性文件中配置的source,sink和channel。

示例

在这里,我们提供了一个示例配置文件,描述了单节点Flume部署。通过此配置,用户可以生成事件,然后将其记录到控制台。

#example.conf:单节点Flume配置

#在此代理上命名组件

a1.sources   =  r1

a1.sinks   =  k1

a1.channels   =  c1

#描述/配置源

a1.sources.r1.type   =  netcat

a1.sources.r1.bind   =  localhost

a1.sources.r1.port   =  44444

#描述接收器

a1.sinks.k1.type   =  logger

#使用通道将事件缓存在内存

a1.channels.c1.type   =  memory

a1.channels中.c1.capacity   =  1000

a1.channels.c1.transactionCapacity   = 100

#将源和接收器绑定到通道

a1.sources.r1.channels   =  c1

a1.sinks.k1.channel   =  c1

此配置定义了一个名为a1的代理。a1具有侦听端口44444上的数据的source,在内存中缓冲事件数据的通道以及将事件数据记录到控制台的sink。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的Flume进程时,会传递一个标志,告诉它要显示哪个命名的代理。

有了这个配置文件,我们可以如下启动Flume:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

请注意,在完整部署中,我们通常会包含一个选项:-- conf

在.bash_profile中加入flume环境变量

PATH=/usr/flume/bin:$PATH:$HOME/bin

source .bash_profile刷新

使用shell将大量文件分发到不同sources中

定时任务* * * * * sh cp.sh

cp.sh

#!/bin/bash

source ~/.bash_profile

time=`date +%Y%m%d%H%M -d -2min`

echo `date '+%Y-%m-%d %H:%M:%S'`":$time cp start"

for file in `ls *xxxx*`

do

 file_name=`basename $file`

 cp $file /data/1/$file_name.tmp

 mv /data/1/$file_name.tmp  /data/1/$file_name

done &

创建.conf文件

例:

agent1.sources = s1

agent1.channels = c1

agent1.sinks = k1 k1_1

agent1.sources.s1.type = spooldir

agent1.sources.s1.fileSuffix = .comp

agent1.sources.s1.deletePolicy = immediate

agent1.sources.s1.spoolDir=/data/1/

agent1.sources.s1.fileHeader= false

agent1.sources.s1.channels = c1

agent1.sources.s1.trackerDir = /data/flumespool/s1

agent1.sources.s1.ignorePattern = (.)*.\.tmp

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 10

agent1.channels.c1.capacity = 5000

agent1.channels.c1.transactionCapacity = 1000

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = topic

agent1.sinks.k1.brokerList = kafka_1:9092,kafka_2:9092,kafka_3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 500

agent1.sinks.k1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1.channel = c1

agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1_1.topic = topic

agent1.sinks.k1_1.brokerList =kafka_1:9092,kafka_2:9092,kafka_3:9092

agent1.sinks.k1_1.requiredAcks = 1

agent1.sinks.k1_1.batchSize = 500

agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1_1.channel = c1

sinks中k1与k1_1实现双线程

使用1个flume连接2个kafka,即同时向2个kafka中录入数据可以在同一agent下配置2个channels和2个sinks,source共用一个

示例:

xxx.conf

agent1.sources = s1

agent1.channels = c1 cx1

agent1.sinks = k1 k1_1 kx1 kx1_1

agent1.sources.s1.type = spooldir

agent1.sources.s1.fileSuffix = .comp

agent1.sources.s1.deletePolicy = immediate

agent1.sources.s1.spoolDir=/data/1/

agent1.sources.s1.fileHeader= false

agent1.sources.s1.channels = c1 cx1

agent1.sources.s1.trackerDir = /data/flumespool/s1

agent1.sources.s1.ignorePattern = (.)*.\.tmp

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 10

agent1.channels.c1.capacity = 5000

agent1.channels.c1.transactionCapacity = 1000

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = topic1

agent1.sinks.k1.brokerList =kafka1_1:9092,kafka1_2:9092,kafka1_3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 500

agent1.sinks.k1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1.channel = c1

agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1_1.topic = topic1

agent1.sinks.k1_1.brokerList = kafka1_1:9092,kafka1_2:9092,kafka1_3:9092

agent1.sinks.k1_1.requiredAcks = 1

agent1.sinks.k1_1.batchSize = 500

agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1_1.channel = c1

agent1.channels.cx1.type = memory

agent1.channels.cx1.keep-alive = 10

agent1.channels.cx1.capacity = 5000

agent1.channels.cx1.transactionCapacity = 1000

agent1.sinks.kx1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kx1.topic = topic2

agent1.sinks.kx1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092

agent1.sinks.kx1.requiredAcks = 1

agent1.sinks.kx1.batchSize = 500

agent1.sinks.kx1.kafka.receive.buffer.bytes = 200000

agent1.sinks.kx1.kafka.send.buffer.bytes = 300000

agent1.sinks.kx1.channel = cx1

agent1.sinks.kx1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kx1_1.topic = topic2

agent1.sinks.kx1_1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092

agent1.sinks.kx1_1.requiredAcks = 1

agent1.sinks.kx1_1.batchSize = 500

agent1.sinks.kx1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.kx1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.kx1_1.channel = cx1

启动命令如下:

flume-ng agent -c /usr/flume/conf/ -f xxx.conf -n agent1 -Dflume.root.logger=INFO,console >log/1`date +%Y%m%d`.log 2>&1 &

新闻网大数据实时分析可视化系统项目——9Flume+HBase+Kafka集成与开发

1.下载Flume源码并导入Idea开发工具

1)将apache-flume-1.7.0-src.tar.gz源码下载到本地解压

2)通过idea导入flume源码

打开idea开发工具,选择File——》Open

技术图片

然后找到flume源码解压文件,选中flume-ng-hbase-sink,点击ok加载相应模块的源码。

技术图片

2.官方flume与hbase集成的参数介绍

技术图片

3.下载日志数据并分析

到搜狗实验室下载用户查询日志

1)介绍

搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。为进行中文搜索引擎用户行为分析的研究者提供基准研究语料

2)格式说明

数据格式为:访问时间\\t用户ID\\t[查询词]\\t该URL在返回结果中的排名\\t用户点击的顺序号\\t用户点击的URL

其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID

技术图片

4.flume agent-3聚合节点与HBase集成的配置

vi flume-conf.properties

agent1.sources = r1

agent1.channels = kafkaC hbaseC

agent1.sinks = kafkaSink hbaseSink

 

agent1.sources.r1.type = avro

agent1.sources.r1.channels = hbaseC

agent1.sources.r1.bind = bigdata-pro01.kfk.com

agent1.sources.r1.port = 5555

agent1.sources.r1.threads = 5

 

agent1.channels.hbaseC.type = memory

agent1.channels.hbaseC.capacity = 100000

agent1.channels.hbaseC.transactionCapacity = 100000

agent1.channels.hbaseC.keep-alive = 20

 

agent1.sinks.hbaseSink.type = asynchbase

agent1.sinks.hbaseSink.table = weblogs

agent1.sinks.hbaseSink.columnFamily = info

agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer

agent1.sinks.hbaseSink.channel = hbaseC

agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

5.对日志数据进行格式处理

1)将文件中的tab更换成逗号

cat weblog.log|tr "\\t" "," > weblog2.log

2)将文件中的空格更换成逗号

cat weblog2.log|tr " " "," > weblog3.log

6.自定义SinkHBase程序设计与开发

1)模仿SimpleAsyncHbaseEventSerializer自定义KfkAsyncHbaseEventSerializer实现类,修改一下代码即可。

 @Override

    public List getActions() {

        List actions = new ArrayList();

        if (payloadColumn != null) {

            byte[] rowKey;

            try {

                /*---------------------------代码修改开始---------------------------------*/

                //解析列字段

                String[] columns = new String(this.payloadColumn).split(",");

                //解析flume采集过来的每行的值

                String[] values = new String(this.payload).split(",");

                for(int i=0;i < columns.length;i++){

                    byte[] colColumn = columns[i].getBytes();

                    byte[] colValue = values[i].getBytes(Charsets.UTF_8);

 

                    //数据校验:字段和值是否对应

                    if(colColumn.length != colValue.length) break;

 

                    //时间

                    String datetime = values[0].toString();

                    //用户id

                    String userid = values[1].toString();

                    //根据业务自定义Rowkey

                    rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);

                    //插入数据

                    PutRequest putRequest =  new PutRequest(table, rowKey, cf,

                            colColumn, colValue);

                    actions.add(putRequest);

                /*---------------------------代码修改结束---------------------------------*/

                }

 

            } catch (Exception e) {

                throw new FlumeException("Could not get row key!", e);

            }

        }

        return actions;

    }

2)在SimpleRowKeyGenerator类中,根据具体业务自定义Rowkey生成方法

/**

   * 自定义Rowkey

   * @param userid

   * @param datetime

   * @return

   * @throws UnsupportedEncodingException

   */

  public static byte[] getKfkRowKey(String userid,String datetime)throws UnsupportedEncodingException {

    return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");

  }

7.自定义编译程序打jar包

1)在idea工具中,选择File——》ProjectStructrue

技术图片

2)左侧选中Artifacts,然后点击右侧的+号,最后选择JAR——》From modules with dependencies

技术图片

3)然后直接点击ok

技术图片

4)删除其他依赖包,只把flume-ng-hbase-sink打成jar包就可以了。

技术图片

技术图片

5)然后依次点击apply,ok

 技术图片

6)点击build进行编译,会自动打成jar包

 技术图片

技术图片

7)到项目的apache-flume-1.7.0-src\\flume-ng-sinks\\flume-ng-hbase-sink\\classes\\artifacts\\flume_ng_hbase_sink_jar目录下找到刚刚打的jar包

技术图片

8)将打包名字替换为flume自带的包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至flume/lib目录下,覆盖原有的jar包即可。

8.flume聚合节点与Kafka集成的配置

vi flume-conf.properties

agent1.sources = r1

agent1.channels = kafkaC hbaseC

agent1.sinks = kafkaSink hbaseSink

 

agent1.sources.r1.type = avro

agent1.sources.r1.channels = hbaseC kafkaC

agent1.sources.r1.bind = bigdata-pro01.kfk.com

agent1.sources.r1.port = 5555

agent1.sources.r1.threads = 5

 

agent1.channels.hbaseC.type = memory

agent1.channels.hbaseC.capacity = 100000

agent1.channels.hbaseC.transactionCapacity = 100000

agent1.channels.hbaseC.keep-alive = 20

 

agent1.sinks.hbaseSink.type = asynchbase

agent1.sinks.hbaseSink.table = weblogs

agent1.sinks.hbaseSink.columnFamily = info

agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer

agent1.sinks.hbaseSink.channel = hbaseC

agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

#*****************flume+Kafka***********************

agent1.channels.kafkaC.type = memory

agent1.channels.kafkaC.capacity = 100000

agent1.channels.kafkaC.transactionCapacity = 100000

agent1.channels.kafkaC.keep-alive = 20

 

agent1.sinks.kafkaSink.channel = kafkaC

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kafkaSink.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092

agent1.sinks.kafkaSink.topic = test

agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181

agent1.sinks.kafkaSink.requiredAcks = 1

agent1.sinks.kafkaSink.batchSize = 1

agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

 

以上是关于flume与kafka集成配置的主要内容,如果未能解决你的问题,请参考以下文章

新闻网大数据实时分析可视化系统项目——9Flume+HBase+Kafka集成与开发

kafka 集成整合外部插件(springboot,flume,flink,spark)

kafka怎么收集到flume的日志

spring boot怎么启动kafka

CDH-Kerberos环境下使用flume消费带Sentry认证的kafka数据保存到hdfs中

flume-kafka-storm-hdfs-hadoop-hbase