spark小案例——sparkstreaming消费Kafka
Posted Z-hhhhh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark小案例——sparkstreaming消费Kafka相关的知识,希望对你有一定的参考价值。
使用sparkstreaming消费Kafka的数据,实现word count
依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
实现wordcount代码
val conf = new SparkConf().setAppName("StreamWordCount").setMaster("local[*]")
val sc = new StreamingContext(conf, Seconds(3))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hadoop01:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group1",
/**
* earliest
* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
* latest
* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
* none
* topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
*/
"auto.offset.reset" -> "earliest"
)
/**
* LocationStrategies.PreferBrokers() 仅仅在你 spark 的 executor 在相同的节点上,优先分配到存在 kafka broker 的机器上;
* LocationStrategies.PreferConsistent(); 大多数情况下使用,一致性的方式分配分区所有 executor 上。(主要是为了分布均匀)
* 新的Kafka使用者API将预先获取消息到缓冲区。因此,出于性能原因,Spark集成将缓存的消费者保留在执行程序上(而不是为每个批处理重新创建它们),并且更喜欢在具有适当使用者的主机位置上安排分区,这一点很重要。
* 在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区。如果您的执行程序与Kafka代理在同一主机上,请使用PreferBrokers,它更愿意为该分区安排Kafka领导者的分区。
*/
val topics = Array("topic01")
val stream = KafkaUtils.createDirectStream[String, String](
sc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
/* val kafkaStream = stream.map(record => (record.key, record.value))
val words = kafkaStream.map(_._2)
val pairs = words.map x => (x,1)
val wordCounts = pairs.reduceByKey(_+_)
*/
val value: DStream[String] = stream.map(record => record.value())
value.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()
sc.start()
sc.awaitTermination()
启动Kafka,创建Kafka producer
kafka-console-producer.sh --broker-list hadoop01:9092 --to topic01
以上是关于spark小案例——sparkstreaming消费Kafka的主要内容,如果未能解决你的问题,请参考以下文章
spark小案例——sparkstreaming消费Kafka
sparkstreaming+socket workCount 小案例
通过spark-submit,本地测试SparkStreaming