Spark-Streaming DirectKafka Count Example
Posted by hanwen1014
The Spark-Streaming DirectKafka count is similar to the receiver-based Kafka word count, except that it uses the Direct approach. Direct mode reads from Kafka with the low-level (simple) consumer API instead of a receiver, and the main difference in the code is the call to createDirectStream.
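For contrast, here is a minimal sketch of the two ways to create the input stream under the Spark 1.x spark-streaming-kafka (0.8) API. The ZooKeeper quorum, group id, broker address, and topic names are hypothetical placeholders, not values from this post:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamCreationSketch {
  def createStreams(ssc: StreamingContext): Unit = {
    // Receiver-based approach: the high-level consumer tracks offsets in
    // ZooKeeper; "zkhost:2181" and "wordcount-group" are placeholders
    val receiverStream = KafkaUtils.createStream(
      ssc, "zkhost:2181", "wordcount-group", Map("mytopic" -> 1))

    // Direct approach: no receiver; Spark itself tracks offsets per batch
    // and reads each Kafka partition with the low-level consumer API
    val directStream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker:9092"), Set("mytopic"))
  }
}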
The full word count code is as follows:
package com.hw.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with a 60 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    // "smallest" means consuming from the beginning, like --from-beginning
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest"
    )

    // Create the DStream
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    // Take the message values, extract the second comma-separated field,
    // count occurrences of each value, and print the counts.
    // Note: the original used flatMap(_.split(",")(1)), which flattens the
    // selected field into individual characters; map is what is intended.
    val lines = messages.map(_._2)
    val words = lines.map(_.split(",")(1))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
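One practical benefit of the Direct approach is that each batch's Kafka offsets are available on the driver, which makes it possible to log or persist them yourself. This is not part of the original post; it is a sketch assuming the same messages stream created above:

import org.apache.spark.streaming.kafka.HasOffsetRanges

// The cast must be done on the RDD produced directly by createDirectStream,
// before any transformation that changes partitioning (e.g. reduceByKey)
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} " +
      s"from=${o.fromOffset} until=${o.untilOffset}")
  }
}

To run the job, submit it with spark-submit and pass the broker list and topic names as the two program arguments, as described in the Usage string above.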
To start the related Flume and Kafka services, see:
https://www.cnblogs.com/hanwen1014/p/11260456.html