spark_to_kafka

Posted by yin-fei

This post walks through a small Spark job that reads yesterday's results from a Hive table and pushes each row to Kafka as a JSON message; it may be useful as a reference.

package kafka

import java.io.InputStream
import java.text.SimpleDateFormat
import java.util.{Date, HashMap, Properties}

import com.google.gson.JsonObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ExpandTimes {
  // Load the Kafka broker list from /conf.properties on the classpath.
  val prop = new Properties()
  val is: InputStream = this.getClass.getResourceAsStream("/conf.properties")
  prop.load(is)
  val ENVIRONMENT_SETTING = "expandtimes_brokers_prd"
  private val brokers = prop.getProperty(ENVIRONMENT_SETTING)
  // Kafka producer connection properties
  private val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  private val producer = new KafkaProducer[String, String](this.props)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ExpandTimes")
    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    // Compute yesterday's date in yyyyMMdd format for the partition filter.
    val date = new Date(new Date().getTime - 86400000L)
    val dateFormat = new SimpleDateFormat("yyyyMMdd")
    val statisDate = dateFormat.format(date)
    val querySql1 = "select member_id,times from sospdm.tdm_rpt_sign_expand_great_seven_d where statis_date = '" + statisDate + "'"
    val resultDF1 = spark.sql(querySql1)
    // Show a sample of the result only when not pointed at the production brokers.
    if (!ENVIRONMENT_SETTING.contains("prd")) {
      resultDF1.show(10)
    }
    // Wrap a JSON payload in a ProducerRecord and send it to the target topic.
    def kafkaProducerSend(args: String): Unit = {
      if (args != null) {
        val topic = "sign_status_count"
        val message = new ProducerRecord[String, String](topic, null, args)
        producer.send(message)
      }
    }

    // Serialize each row to JSON and send it to Kafka.
    resultDF1.rdd.foreach(row => {
      val member_id: String = row.getAs[String]("member_id")
      val times: Int = row.getAs[Int]("times")
      val json = new JsonObject()
      json.addProperty("memberId", member_id)
      json.addProperty("times", times)
      kafkaProducerSend(json.toString)
    })
  }
}
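
Two notes on the code above.

First, the job expects a /conf.properties file on the classpath that maps the key expandtimes_brokers_prd to the Kafka broker list. A minimal sketch of that file, with placeholder host names rather than values from the original post:

# conf.properties -- broker addresses are placeholders
expandtimes_brokers_prd=kafka-prd-01:9092,kafka-prd-02:9092,kafka-prd-03:9092

Second, producer.send is called inside rdd.foreach, so records go through the KafkaProducer held by the ExpandTimes object (created on each executor JVM when the object initializes there), and nothing is explicitly flushed or closed. A common alternative pattern is to create one short-lived producer per partition with foreachPartition, flush it, and close it. The sketch below assumes the same imports and the brokers value from the code above; everything else (per-partition producer, the try/finally cleanup) is illustrative rather than part of the original job:

    // Sketch: one producer per partition, flushed and closed explicitly.
    resultDF1.rdd.foreachPartition { rows =>
      val partitionProps = new HashMap[String, Object]()
      partitionProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      partitionProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      partitionProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val partitionProducer = new KafkaProducer[String, String](partitionProps)
      try {
        rows.foreach { row =>
          val json = new JsonObject()
          json.addProperty("memberId", row.getAs[String]("member_id"))
          json.addProperty("times", row.getAs[Int]("times"))
          partitionProducer.send(
            new ProducerRecord[String, String]("sign_status_count", null, json.toString))
        }
        partitionProducer.flush() // make sure buffered records reach the brokers
      } finally {
        partitionProducer.close()
      }
    }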