Flume和Kafka完成实时数据的采集

Posted liuge36

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume和Kafka完成实时数据的采集相关的知识,希望对你有一定的参考价值。

Flume和Kafka完成实时数据的采集

写在前面
Flume和Kafka在生产环境中,一般都是结合起来使用的。可以使用它们两者结合起来收集实时产生日志信息,这一点是很重要的。如果,你不了解flume和kafka,你可以先查看我写的关于那两部分的知识。再来学习,这部分的操作,也是可以的。

实时数据的采集,就面临一个问题。我们的实时数据源,怎么产生呢?因为我们可能想直接获取实时的数据流不是那么的方便。我前面写过一篇文章,关于实时数据流的python产生器,文章地址:http://blog.csdn.net/liuge36/article/details/78596876
你可以先看一下,如何生成一个实时的数据...

思路??如何开始呢??

分析:我们可以从数据的流向着手,数据一开始是在webserver的,我们的访问日志是被nginx服务器实时收集到了指定的文件,我们就是从这个文件中把日志数据收集起来,即:webserver=>flume=>kafka

webserver日志存放文件位置
这个文件的位置,一般是我们自己设置的

我们的web日志存放的目录是在:
/home/hadoop/data/project/logs/access.log下面

[[email protected] logs]$ pwd
/home/hadoop/data/project/logs
[[email protected] logs]$ ls
access.log
[[email protected] logs]$ 

Flume

做flume,其实就是写conf文件,就面临选型的问题
source选型?channel选型?sink选型?

这里我们选择 exec source memory channel kafka sink

怎么写呢?
按照之前说的那样1234步骤

从官网中,我们可以找到我们的选型应该如何书写:
1) 配置Source
exec source

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c

2) 配置Channel
memory channel

a1.channels.c1.type = memory

3) 配置Sink
kafka sink
flume1.6版本可以参照http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html#kafka-sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

4) 把以上三个组件串起来

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

我们new一个文件叫做test3.conf
把我们自己分析的代码贴进去:

[[email protected] conf]$ vim test3.conf 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c


a1.channels.c1.type = memory


a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里我们先不启动,因为其中涉及到kafka的东西,必须先把kafka部署起来,,

Kafka的部署

kafka如何部署呢??
参照官网的说法,我们首先启动一个zookeeper进程,接着,才能够启动kafka的server

Step 1: Start the zookeeper

[[email protected] ~]$ 
[[email protected] ~]$ jps
29147 Jps
[[email protected] ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] ~]$ jps
29172 QuorumPeerMain
29189 Jps
[[email protected] ~]$ 

Step 2: Start the server

[[email protected] ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外开一个窗口,查看jps
[[email protected] ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[[email protected] ~]$ 

如果,这部分不是很熟悉,可以参考http://blog.csdn.net/liuge36/article/details/78592169

Step 3: Create a topic

[[email protected] ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period (‘.‘) or underscore (‘_‘) could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[[email protected] ~]$ 

Step 4: 开启之前的agent

  [[email protected] conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console

Step 5: Start a consumer

kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka

上面的第五步执行之后,就会收到刷屏的结果,哈哈哈!!
技术分享图片

上面的消费者会一直一直的刷屏,还是很有意思的!!!
这里的消费者是把接收到的数据数据到屏幕上

后面,我们会介绍,使用SparkStreaming作为消费者实时接收数据,并且接收到的数据做简单数据清洗的开发,从随机产生的日志中筛选出我们需要的数据.....
















以上是关于Flume和Kafka完成实时数据的采集的主要内容,如果未能解决你的问题,请参考以下文章

Flume整合Kafka完成实时数据采集

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

日志采集系统flume和kafka有啥区别及联系,它们分别在啥时候

转:大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

flume kafka和sparkstreaming整合