1、在 pom.xml 中添加以下依赖（加载所需 jar 包）
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<!-- Scala binary version must match the _2.11 Spark artifacts above -->
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.0</version>
</dependency>
2、代码
/**
 * Direct-stream Kafka word count using the Spark Streaming kafka-0-8 connector.
 *
 * Reads messages from the `test` topic on broker hadoop01:9092 in 3-second
 * batches, counts words per batch, writes each batch to HDFS and prints the
 * first results to stdout. Runs until externally terminated.
 */
object Demo01 {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiver-less direct stream, one for processing.
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
    sparkConf.setMaster("local[2]")
    // Batch interval of 3 seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val brokers = "hadoop01:9092"
    val topics = "test"
    // Comma-separated topic list -> Set required by createDirectStream.
    val topicSet = topics.split(",").toSet
    // Direct API reads offsets itself, so only the broker list is needed.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Each record is (key, value); we only care about the message value.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
    val lines = messages.map(_._2)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)

    // First argument is a *prefix*, not a file name: Spark writes one directory
    // per batch named <prefix>-<batchTime>.<suffix>. Passing ".txt" as the
    // suffix (instead of embedding it in the prefix) yields wordcount-<time>.txt.
    wordCounts.saveAsTextFiles("hdfs://hadoop01:9000/spark/wordcount", "txt")
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}