Flume+kakfa+sparkStream实时处理数据测试

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume+kakfa+sparkStream实时处理数据测试相关的知识,希望对你有一定的参考价值。

flume:从数据源拉取数据

kafka:主要起到缓冲从flume拉取多了的数据
sparkStream:对数据进行处理
 
一.flume拉取数据
 
1.源数据文件读取配置
 
在flume目录的conf目录下配置读取数据源的配置,配置一个test.properties文件,内容如下:
 
a1.sources = r1
a1.channels = c1 
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
a1.sources.r1.restartThrottle = 1000
a1.sources.r1.logStdErr = true
#a1.sources.r1.restart = true
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.keepalive = 100
a1.sinks.k1.type =org.apache.flume.plugins.KafkaSink
a1.sinks.k1.metadata.broker.list=192.168.22.7:9092,192.168.22.8:9092,192.168.22.9:9092
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.request.required.acks=1
a1.sinks.k1.max.message.size=1000000
a1.sinks.k1.producer.type=sync
a1.sinks.k1.custom.encoding=UTF-8
a1.sinks.k1.custom.topic.name=test
a1.sinks.k1.channel=c1
a1.sinks.k1.product.source.name=6
配置读取源文件的读取路径如下:
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
读取的数据传到kafka的哪个topic下:
a1.sinks.k1.custom.topic.name=test
2.启动flume读取数据
bin/flume-ng  agent -c conf -f conf/test.properties -n a1 -Dflume.root.logger=INFO,console
 
二.kafka缓冲数据
 
   1.启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
   2.启动kafka服务
bin/kafka-server-start.sh config/server.properties &
3.创建一个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
集群情况下,localhost换成集群的master地址
4.查看kafka的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
三.SparkStream处理数据
1.用spark中自带例子进行测试
进入spark目录
bin/run-example org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 my-consumer-group test 1
zoo01,zoo02,zoo03替换为集群的zookeeper地址
2.往源文件中加入数据
echo "test test" >> test.txt
sparkStream会统计源数据中单词的数量并输出
 
 

以上是关于Flume+kakfa+sparkStream实时处理数据测试的主要内容,如果未能解决你的问题,请参考以下文章

flume+sparkStreaming实例 实时监控文件demo

使用Flume+Kafka+SparkStreaming进行实时日志分析

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

flume kafka和sparkstreaming整合

[Flume][Kafka]Flume 与 Kakfa结合例子(Kakfa 作为flume 的sink 输出到 Kafka topic)

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示