Spark Streaming + Kafka

Posted by 伊米伊念

Preface: this post introduces Spark Streaming's integration with Kafka through a minimal word-count pipeline: a Kafka producer that publishes a few random digits every second, and a Spark Streaming consumer that tallies them. Hopefully it is a useful reference.
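
Both programs target the legacy receiver-based Kafka 0.8 integration that shipped as the spark-streaming-kafka artifact. A minimal build.sbt sketch, assuming Spark 1.6.x on Scala 2.10 (the versions here are assumptions; align them with your cluster):

libraryDependencies ++= Seq(
  // Versions are assumptions; match them to your Spark and Kafka deployment
  "org.apache.spark" %% "spark-core" % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"  // receiver-based 0.8 integration
)

The spark-streaming-kafka artifact should also transitively pull in the Kafka client classes the producer uses (org.apache.kafka.clients.producer).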

Producer

import java.util.HashMap

import org.apache.kafka.clients.producer._

object spark_kafka_wordcount_producer {
    def main(args: Array[String]) {
        // Broker list, target topic, and number of random words per message
        val Array(brokers, topic, wordsPerMessage) = Array("localhost:9092", "sun_first", "3")

        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)

        // Send one space-separated string of random digits per second
        while (true) {
            val str = (1 to wordsPerMessage.toInt)
                .map(_ => scala.util.Random.nextInt(10).toString)
                .mkString(" ")
            val message = new ProducerRecord[String, String](topic, null, str)
            producer.send(message)

            Thread.sleep(1000)
        }
    }
}
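
With older Kafka releases the topic can be created and inspected with the stock shell tools. A usage sketch (paths, partition count, and replication factor are assumptions; newer Kafka versions replace the --zookeeper flag with --bootstrap-server):

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 2 --topic sun_first

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sun_first

Once the producer is running, the console consumer should print one line of three random digits per second.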

Consumer

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object spark_kafka_wordcount_consumer {
    def main(args: Array[String]) {
        // ZooKeeper quorum, consumer group id, and comma-separated topic list
        val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_first")

        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint("checkpoint")

        // Map each topic to the number of receiver threads consuming it
        val topicMap = topics.split(",").map((_, 2)).toMap

        // Receiver-based stream: each record is a (key, value) pair; keep the value
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

        // Classic streaming word count over 1-second batches
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
    }
}
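
The consumer above uses the legacy receiver-based API. Since Spark 1.3 the same spark-streaming-kafka artifact also offers a receiver-less direct stream that reads offsets from the brokers instead of ZooKeeper. A minimal sketch of the same word count on that API (not the original author's code, just an alternative under the same assumed topic and broker):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object spark_kafka_wordcount_direct {
    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("KafkaWordCountDirect").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(1))

        // Talk to the brokers directly; no ZooKeeper quorum and no receiver threads
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
        val topicsSet = Set("sun_first")

        val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicsSet).map(_._2)

        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

Because the direct stream tracks offsets itself rather than through ZooKeeper, it maps Kafka partitions one-to-one onto RDD partitions and avoids the receiver's separate write-ahead log.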

 
