Spark-core (read the data shown below from a given Kafka topic, clean it by dropping the useless fields and keeping the useful information, then write the cleaned result back to a specified Kafka topic)

Posted by Mr.zhou_Zxy


1 Real-time data ETL

1.1 Data format:

<<<!>>>3111<<<!>>>
<<<!>>>238<<<!>>>
<<<!>>>20181111132902<<<!>>>
<<<!>>>58.223.1.112<<<!>>>
<<<!>>>202.102.92.18<<<!>>>
<<<!>>>59947<<<!>>>
<<<!>>>80<<<!>>>
<<<!>>>www.sumecjob.com<<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>>http://www.sumecjob.com/Social.aspx<<<!>>>
<<<!>>>2556928066<<<!>>>
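
For reference, splitting such a record on the `<<<!>>>` delimiter leaves the actual field values at the odd array indices (the separators between delimiters land at the even ones), which is what the cleaning code further down relies on. A minimal sketch (not part of the original) illustrating this on the first three fields:

    object SplitDemo extends App {
        // Separators ("" or "\n") land at the even indices, field values at the odd ones
        val record = "<<<!>>>3111<<<!>>>\n<<<!>>>238<<<!>>>\n<<<!>>>20181111132902<<<!>>>"
        val parts = record.split("<<<!>>>")
        // parts: Array("", "3111", "\n", "238", "\n", "20181111132902")
        println(parts(5)) // 20181111132902 -- the timestamp field
    }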

1.2 Requirements

Read records like the one above from a given Kafka topic and clean them: drop the useless fields and keep the useful information, namely the userid, the time of the user action (timestamp), the user ip address:port, the server address:server port, and the url. Finally, write the cleaned result back to a specified Kafka topic, completing the online ETL.

1. Replace every <<<!>>> delimiter with the empty string
2. Rewrite every date as yyyy-MM-dd
3. Save the consumer offsets manually in MySQL (a possible table schema is sketched below)
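
The job below reads and commits its offsets in a MySQL table called `mytopic`. The original post never shows that table's schema; the following is a minimal sketch whose column names are taken from the queries in the code, while the types, primary key, and seed row are assumptions:

    -- Assumed schema for the offset-tracking table; one row per partition of the source topic
    CREATE TABLE mytopic (
        topic       VARCHAR(64) NOT NULL,
        partitionId INT         NOT NULL,
        offset      BIGINT      NOT NULL DEFAULT 0,
        PRIMARY KEY (topic, partitionId)
    );
    -- Seed a starting offset for each partition, e.g. partition 0 of OldTopic:
    INSERT INTO mytopic (topic, partitionId, offset) VALUES ('OldTopic', 0, 0);

The complete streaming job: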
    package com.zxy.spark.Streaming.day004
    
    
    import java.util.Properties
    
    import com.zxy.spark.core.Day02.LoggerTrait
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, TaskContext}
    import scalikejdbc.{ConnectionPool, DB, _}
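
    // Assumed sbt dependencies for the imports above (versions are guesses;
    // the original post does not show a build file):
    //   "org.apache.spark" %% "spark-streaming"           % "2.2.0"
    //   "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
    //   "org.scalikejdbc"  %% "scalikejdbc"               % "3.3.5"
    //   "mysql"            %  "mysql-connector-java"      % "5.1.47"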
    
    object demo5 extends LoggerTrait{
    
        def main(args: Array[String]): Unit = {
            val sparkConf = new SparkConf()
                .setAppName("demo5")
                .setMaster("local[*]")
            val duration = Seconds(2)
            val ssc = new StreamingContext(sparkConf, duration)
            val kafkaparams = Map[String,String](
                "bootstrap.servers" -> "192.168.130.110:9092",
                "group.id" -> "zxy",
                "auto.offset.reset" -> "largest"
            )
            // Not used by the fromOffsets variant of createDirectStream; the partitions
            // and offsets actually consumed come from MySQL (see getfromoffset)
            val topics: Set[String] = "OldTopic".split(",").toSet
            
            val messageHandler = (msgHandler: MessageAndMetadata[String, String]) => (msgHandler.topic, msgHandler.message())
            val fromOffsets: Map[TopicAndPartition, Long] = getfromoffset()
            println(s"getfromoffset->$fromOffsets")
            val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                ssc,
                kafkaparams,
                fromOffsets,
                messageHandler
            )
            messages.foreachRDD(rdd => {
                if (!rdd.isEmpty()) {
                    // Offset ranges must be read on the driver, before any work on the partitions
                    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                    rdd.foreachPartition(partitionIterator => {
                        val offsetRange: OffsetRange = offsetRanges(TaskContext.getPartitionId())
                        // NOTE: the scalikejdbc pool is initialized on the driver (in getfromoffset);
                        // this works here only because local[*] runs driver and executors in one JVM
                        DB.localTx {
                            implicit session => {
                                partitionIterator.foreach(msg => {
                                    // Clean the raw record
                                    println(s"message->${msg._2}")
                                    val resDate = dateETL(msg._2)
                                    println(s"resDate -> $resDate")
                                    // Produce the cleaned record to the new topic
                                    saveDate2Topics(resDate)
                                })
                                // Commit this partition's ending offset in the same transaction
                                sql"update mytopic set offset = ${offsetRange.untilOffset} where topic = ${offsetRange.topic} and partitionId = ${offsetRange.partition}".update().apply()
                            }
                        }
                    })
                }
            })
            ssc.start()
            ssc.awaitTermination()
        }
        
        /**
         * Read the stored offsets back from MySQL
         * @return map of (topic, partition) -> offset
         */
        def getfromoffset(): Map[TopicAndPartition, Long] = {
            val url = "jdbc:mysql://localhost:3306/zxy?useSSL=false"
            val user = "root"
            val pass = "root"
            val driver = "com.mysql.jdbc.Driver"
            Class.forName(driver)
            ConnectionPool.singleton(url,user,pass)
            val fromOffsets: Map[TopicAndPartition, Long] = DB.readOnly {
                implicit session =>
                    sql"select topic,partitionId,offset from mytopic"
                        .map {
                            result => TopicAndPartition(result.string(1), result.int(2)) -> result.long(3)
                        }.list().apply().toMap
            }
            fromOffsets
        }
        
        /**
         * Clean one raw record. Splitting on "<<<!>>>" leaves the field values
         * at the odd indices (separators land at the even ones), so the useful
         * fields are picked out by index.
         * @return (userid, date, user ip:port, server address:port, url)
         */
        def dateETL(date: String): (String, String, String, String, String) = {
            val lines: Array[String] = date.split("<<<!>>>")
            val timestamp = lines(5) // e.g. 20181111132902
            val time: String = getFormatTime(timestamp)
            // lines(1) is taken to be the userid; the sample record does not label its fields
            (lines(1), time, lines(7) + ":" + lines(11), lines(9) + ":" + lines(13), lines(27))
        }
        
        /**
         * Format a yyyyMMddHHmmss timestamp string as yyyy-MM-dd
         */
        def getFormatTime(timestamp : String): String = {
            val year = timestamp.substring(0, 4)
            val month = timestamp.substring(4, 6)
            val day = timestamp.substring(6, 8)
            val ymd = s"${year}-${month}-${day}"
            ymd
        }
        
        /**
         * Produce one cleaned record to the new topic ("NewTopic").
         * NOTE: creating and closing a KafkaProducer per record is expensive;
         * it is kept this way here only for simplicity.
         */
        def saveDate2Topics(newDate: (String, String, String, String, String)): Unit = {
            val properties = new Properties()
            properties.load(demo5.getClass.getClassLoader.getResourceAsStream("producer.properties"))
            val producer = new KafkaProducer[String, String](properties)
            val userID: String = newDate._1
            val value = s"${newDate._2},${newDate._3},${newDate._4},${newDate._5}"
            val data = new ProducerRecord[String, String]("NewTopic", userID, value)
            producer.send(data)
            producer.close()
        }
    }
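
`saveDate2Topics` loads a `producer.properties` file from the classpath that the original post does not include. A minimal sketch, assuming the same broker as the consumer side and plain string keys and values:

    # Assumed contents of producer.properties (not shown in the original post)
    bootstrap.servers=192.168.130.110:9092
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    acks=all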
