Scala Spark (2.10) Reading from Kafka (2.10): An Example

Posted by Runner_Jack


1. Add the dependencies to pom.xml. Note that every artifact's Scala suffix must match: spark-streaming_2.11 and spark-streaming-kafka-0-8_2.11 are Scala 2.11 builds, so the standalone kafka artifact must use the _2.11 suffix as well.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
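If the project is built with sbt instead of Maven, the equivalent dependencies can be declared as follows (a sketch mirroring the coordinates above; the `%%` operator appends the project's Scala binary version to each artifact name automatically):

```scala
// build.sbt fragment (sketch); versions mirror the Maven coordinates above
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"           % "2.1.0",
  "org.apache.kafka" %% "kafka"                     % "0.8.2.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0"
)
```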

2. Code
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Demo01 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
    sparkConf.setMaster("local[2]")
    // One micro-batch every 3 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val brokers = "hadoop01:9092"
    val topics = "test"
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Direct (receiver-less) stream; each record arrives as a (key, value) pair
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)

    val lines = messages.map(_._2)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)

    // saveAsTextFiles writes one output directory per batch, using this path as the prefix
    wordCounts.saveAsTextFiles("hdfs://hadoop01:9000/spark/wordcount.txt")
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
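The transformation chain above (flatMap to split lines into words, then a per-word count) can be checked on a plain Scala collection without a Spark or Kafka cluster. This sketch uses groupBy plus size as a local stand-in for reduceByKey; the object and method names are illustrative only:

```scala
// Local sketch of the word-count logic from the DStream pipeline above.
// groupBy + size stands in for reduceByKey; no Spark cluster needed.
object WordCountLogic {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                       // split each line into words
      .groupBy(identity)                           // group equal words together
      .map { case (word, ws) => (word, ws.size) }  // count each group
}
```

For example, `WordCountLogic.count(Seq("a b a", "b c"))` yields `Map("a" -> 2, "b" -> 2, "c" -> 1)`, matching what reduceByKey would produce for the same batch.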
