每日一题 为了工作 2020 0502 第六十一题

Posted walxt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了每日一题 为了工作 2020 0502 第六十一题相关的知识,希望对你有一定的参考价值。

//使用kafka+sparkStreaming进行数据处理

//从kafka拉取数据

package com.swust.predict


import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}


object GetDataFromKafka {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GetDataFromKafka").setMaster("local[*]")
    conf.set("spark.streaming.kafka.consumer.cache.enabled","false")

    val ssc = new StreamingContext(conf,Seconds(5))
    val topics = Set("car_events")
    val brokers = "data001:9092,data003:9092,data004:9092"
    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "predictGroup",//
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
    )
    val index = 1
    //创建Dstream
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val events: DStream[String] = kafkaDstream.map(line => {
      val value: String = line.value().toString
      value
    })
    val show: Unit = events.foreachRDD(rdd => {
      rdd.foreachPartition(data => {
        //data.take(200)
        data.foreach(one => {
          println(one)
        })

      })
    })
//    events.foreachRDD(rdd =>{
//      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//      events.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
//    })
    ssc.start()
    ssc.awaitTermination()
  }

}

  

//向kafka推送数据

package com.traffic.streaming

import java.util.Properties

import net.sf.json.JSONObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}

//向kafka car_events中生产数据
object KafkaEventProducer {
  def main(args: Array[String]): Unit = {

    //设置需要写入数据的消息队列
    val topic = "car_events"
    //设置配置属性信息
    val props = new Properties()
    props.put("bootstrap.servers", "data001:9092,data003:9092,data004:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    //创建kafka消息队列的生产者对象
    val producer = new KafkaProducer[String,String](props)

    val sparkConf = new SparkConf().setAppName("traffic data").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    val records: Array[Array[String]] = sc.textFile("F:codeAnyMavendatacarFlow_all_column_test.txt")
      .filter(!_.startsWith(";")) //过滤掉不以;开头的数据
      .map(_.split(",")).collect()


    for (i <- 1 to 1000) {
      for (record <- records) {
        // prepare event data
        val event = new JSONObject()
        event.put("camera_id", record(0))
        event.put("car_id", record(2))
        event.put("event_time", record(4))
        event.put("speed", record(6))
        event.put("road_id", record(13))
        // produce event message
        //向kafka中输入数据
        producer.send(new ProducerRecord[String, String](topic, event.toString))
//        println("Message sent: " + event)
        Thread.sleep(200)
      }
    }
    sc.stop
  }
}

  

//运行结果

//向kafka集群拖送数据

技术图片

 

 //从kafka集群拉取数据

技术图片

 

 

以上是关于每日一题 为了工作 2020 0502 第六十一题的主要内容,如果未能解决你的问题,请参考以下文章

每日一题 为了工作 2020 0510 第六十八题

每日一题 为了工作 2020 0508 第六十六题

每日一题 为了工作 2020 0508 第六十六题

每日一题 为了工作 2020 056 第六十四题

每日一题 为了工作 2020 056 第六十四题

每日一题 为了工作 2020 0501 第六十题