程序实现kafka 生产和消费

Posted brentboys

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了程序实现kafka 生产和消费相关的知识,希望对你有一定的参考价值。

生产端程序

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.util.Random
import java.util
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    //kafka-console-producer.sh --broker-list master:9092,master:9093 -topic mykafka2
    val brokers="master:9092,master:9093"
    val topic = "mykafka2"

    val props = new util.HashMap[String,Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")

    val msgPerSec = 2
    val wordgPerMsg = 3
    val producer = new KafkaProducer[String,String](props)
    while(true){
      for(i<- 1 to msgPerSec){
        val str = (1 to wordgPerMsg).map(x=>Random.nextInt(100).toString).mkString(" ")
        println(str)
        val msg = new ProducerRecord[String,String](topic,null,str)
        producer.send(msg)
      }
      Thread.sleep(1000)
    }

  }
}

 

生产端程序运行结果

技术图片

 

消费端程序

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

object kafkaWorldCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaWorldCount")
    val ssc = new StreamingContext(conf,Seconds(10))
    ssc.sparkContext.setLogLevel("warn")

    val zkQurom = "master:12181/kafka0.11"
    val group = "888"
    val topics ="mykafka2"
    val numThreads = 3
    val topMap = topics.split(" ").map((_,numThreads)).toMap

    val lines = KafkaUtils.createStream(ssc,zkQurom,group,topMap)
    val words = lines.map(_._2).flatMap(_.split(" "))
    val pairs = words.map((_,1))
    val wordcounts = pairs.reduceByKey(_+_)
    wordcounts.print()
    ssc.start()
    ssc.awaitTermination()

  }
}

消费端程序运行结果

技术图片

 

以上是关于程序实现kafka 生产和消费的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 生产者和消费者实例

Java程序创建Kafka Topic,以及数据生产消费,常用的命令

请教一个关于使用spark 读取kafka只能读取一个分区数据的问题

Java实现Kafka的生产者和消费者例子

Java程序员面试必备——kafka的专业术语

kafka生产者多种实现方式