spark小案例——sparkstreaming消费Kafka

Posted Z-hhhhh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark小案例——sparkstreaming消费Kafka相关的知识,希望对你有一定的参考价值。

使用sparkstreaming消费Kafka的数据,实现word count

依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-core</artifactId>
     <version>2.10.1</version>
</dependency>

实现wordcount代码

val conf = new SparkConf().setAppName("StreamWordCount").setMaster("local[*]")
    val sc = new StreamingContext(conf, Seconds(3))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
    /**
     * earliest
     * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
     * latest
     * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
     * none
     * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
     */
      "auto.offset.reset" -> "earliest"
    )

    /**
     * LocationStrategies.PreferBrokers() 仅仅在你 spark 的 executor 在相同的节点上,优先分配到存在  kafka broker 的机器上;
     * LocationStrategies.PreferConsistent(); 大多数情况下使用,一致性的方式分配分区所有 executor 上。(主要是为了分布均匀)
     * 新的Kafka使用者API将预先获取消息到缓冲区。因此,出于性能原因,Spark集成将缓存的消费者保留在执行程序上(而不是为每个批处理重新创建它们),并且更喜欢在具有适当使用者的主机位置上安排分区,这一点很重要。
     * 在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区。如果您的执行程序与Kafka代理在同一主机上,请使用PreferBrokers,它更愿意为该分区安排Kafka领导者的分区。
     */
    val topics = Array("topic01")
    val stream = KafkaUtils.createDirectStream[String, String](
      sc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )


    /*  val kafkaStream = stream.map(record => (record.key, record.value))
        val words = kafkaStream.map(_._2)
        val pairs = words.map  x => (x,1) 
        val wordCounts = pairs.reduceByKey(_+_)
    */
    val value: DStream[String] = stream.map(record => record.value())
    value.flatMap(_.split(" "))
        .map((_,1))
        .reduceByKey(_+_)
        .print()

    sc.start()
    sc.awaitTermination()

启动Kafka,创建Kafka producer

kafka-console-producer.sh --broker-list hadoop01:9092 --to topic01

以上是关于spark小案例——sparkstreaming消费Kafka的主要内容,如果未能解决你的问题,请参考以下文章

spark小案例——sparkstreaming消费Kafka

sparkstreaming+socket workCount 小案例

Spark之SparkStreaming案例

通过spark-submit,本地测试SparkStreaming

第1课:通过案例对SparkStreaming 透彻理解三板斧之一

原创 Hadoop&Spark 动手实践 11Spark Streaming 应用与动手实践