Spark Study Notes: Streaming and Kafka Direct

Posted by AK47Sonic


Streaming and Kafka Direct:

 

Spark version: 2.2.0

Scala version: 2.11

Kafka version: 0.11.0.0

 

Note: The APIs in recent versions have changed quite a bit and the parameters have been reworked, so for today here is just a Streaming word count example; I will study the details more deeply later.

build.sbt:

 

name := "SparkProjects"

version := "0.1"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
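
This layout is fine for running inside the IDE. If you later submit the job with spark-submit, the Spark artifacts are usually marked provided so they are not bundled into the assembly jar, while the Kafka connector still has to be packaged. A hedged variant for one of the lines above would look like:

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0" % "provided"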

 

Word Count:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaDirect {
  def main(args: Array[String]): Unit = {
    // The direct stream has no long-running receiver, so local[1] is enough
    // for this demo; give it more cores for any real workload.
    val conf = new SparkConf().setAppName("KafkaDirect").setMaster("local[1]")
    // Micro-batch interval of 10 seconds.
    val ssc = new StreamingContext(conf, Seconds(10))
    // Standard Kafka consumer configuration; offsets are auto-committed here.
    val kafkaMapParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topicsSet = Set("ScalaTopic")
    // Direct (receiverless) stream: each batch reads its offset range straight
    // from the brokers; PreferConsistent spreads partitions evenly over executors.
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaMapParams)
    )
    // Classic word count over the message values of each batch.
    kafkaStream.flatMap(row => row.value().split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
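
One caveat about the configuration above: with enable.auto.commit set to true, the consumer commits offsets on its own schedule, independent of whether the batch was actually processed, so records can be lost or replayed after a failure. The kafka-0-10 connector exposes each batch's offset ranges, and the usual pattern is to disable auto-commit and commit them yourself once processing is done. A minimal sketch, with the processing step left as a placeholder:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Assumes "enable.auto.commit" -> (false: java.lang.Boolean) in kafkaMapParams.
kafkaStream.foreachRDD { rdd =>
  // RDDs coming directly from the direct stream carry their Kafka offset ranges.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Commit only after the batch has been handled; commitAsync is not
  // transactional, so the output should ideally be written idempotently.
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}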

 

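To test this end to end, anything that writes space-separated strings to ScalaTopic will do. Below is a minimal sketch of a producer using the kafka-clients API pulled in by the kafka_2.11 dependency above; the object name is made up for this example, the broker list matches the consumer config, and the topic is assumed to exist already:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ScalaTopicProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    try {
      // Send one line per second so several lines land in each 10-second batch.
      for (i <- 1 to 60) {
        producer.send(new ProducerRecord[String, String]("ScalaTopic", s"hello spark streaming $i"))
        Thread.sleep(1000)
      }
    } finally {
      producer.close()
    }
  }
}

Run the producer and the KafkaDirect job side by side; every 10 seconds the streaming job should print the word counts for the lines received in that batch.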