Spark-Streaming DirectKafka count example

Posted by hanwen1014

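The Spark-Streaming DirectKafka count works much like the ordinary Kafka count, except that it uses the Direct approach. The Direct approach uses Kafka's low-level API, and the main difference in the code is the call to createDirectStream.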
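
As a rough illustration of where the two approaches differ, the sketch below builds only the inputs each call expects, using plain Scala values and no Spark dependency. It assumes the ordinary count used the receiver-based KafkaUtils.createStream, which takes a ZooKeeper quorum, a consumer group and a topic-to-thread-count map, whereas createDirectStream takes a Kafka parameter map and a set of topics. The object name, the helper names, and the sample hosts and topics are hypothetical.

```scala
object StreamInputsSketch {
  // Receiver-based approach (assumed): KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
  // expects a map from topic name to the number of consumer threads.
  def receiverTopicMap(topics: String, numThreads: Int): Map[String, Int] =
    topics.split(",").map(t => t.trim -> numThreads).toMap

  // Direct approach: KafkaUtils.createDirectStream(ssc, kafkaParams, topicsSet)
  // expects a plain set of topic names.
  def directTopicsSet(topics: String): Set[String] =
    topics.split(",").map(_.trim).filter(_.nonEmpty).toSet

  // Direct approach: brokers are addressed directly instead of going through ZooKeeper.
  def directKafkaParams(brokers: String): Map[String, String] =
    Map(
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest"
    )

  def main(args: Array[String]): Unit = {
    // Sample values, made up for illustration
    println(receiverTopicMap("topicA,topicB", 1))
    println(directTopicsSet("topicA,topicB"))
    println(directKafkaParams("host1:9092,host2:9092"))
  }
}
```

With the Direct approach there is no per-topic thread count to choose, which is why only a Set of topic names is needed.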

The counting code is as follows:

package com.hw.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(s"""
                            |Usage: DirectKafkaWordCount <brokers> <topics>
                            |  <brokers> is a list of one or more Kafka brokers
                            |  <topics> is a list of one or more kafka topics to consume from
                            |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with a 60 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    // "smallest" is the same as reading from the beginning
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest"
    )
    // Create the DStream
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    // Get the lines, take the second comma-separated field of each, count and print
    val lines = messages.map(_._2)
    // map rather than flatMap: flatMap over the extracted String would emit its characters
    val words = lines.map(_.split(",")).filter(_.length > 1).map(_(1))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

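To see what the counting steps compute for a single batch without starting a cluster, the following sketch applies them to an in-memory list using plain Scala collections. The object name, countSecondField, charsOfSecondField and the sample lines are made up for illustration, and groupBy stands in for reduceByKey, which plain collections do not have. It also shows why extracting the field calls for map: flatMap with a function that returns a String flattens that String into its characters.

```scala
object BatchCountSketch {
  // Count occurrences of the second comma-separated field.
  // Lines with fewer than two fields are skipped.
  def countSecondField(lines: Seq[String]): Map[String, Long] =
    lines
      .map(_.split(","))
      .filter(_.length > 1)
      .map(fields => (fields(1), 1L))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  // Pitfall: the function returns a String, so flatMap treats it as a
  // sequence of Char and the result contains single characters.
  def charsOfSecondField(lines: Seq[String]): Seq[Char] =
    lines.flatMap(_.split(",")(1))

  def main(args: Array[String]): Unit = {
    // Hypothetical batch; the real message layout depends on the Flume source
    val batch = Seq("1,spark,x", "2,kafka,y", "3,spark,z", "malformed")
    println(countSecondField(batch))
    println(charsOfSecondField(Seq("1,ab")))
  }
}
```

In the streaming job the same counting runs once per batch interval, so each printed result covers only the messages that arrived in that interval.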

To start the related Flume and Kafka services, see:

https://www.cnblogs.com/hanwen1014/p/11260456.html
