Spark-core (read the data below from a specified Kafka topic, clean it by dropping the useless fields and keeping the useful information, then write the cleaned result back to a specified Kafka topic)
Posted by Mr.zhou_Zxy
1 Real-time data ETL
1.1 Data format:
<<<!>>>3111<<<!>>>
<<<!>>>238<<<!>>>
<<<!>>>20181111132902<<<!>>>
<<<!>>>58.223.1.112<<<!>>>
<<<!>>>202.102.92.18<<<!>>>
<<<!>>>59947<<<!>>>
<<<!>>>80<<<!>>>
<<<!>>>www.sumecjob.com<<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>><<<!>>>
<<<!>>>http://www.sumecjob.com/Social.aspx<<<!>>>
<<<!>>>2556928066<<<!>>>
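For reference, here is a minimal sketch of how such a record splits on the <<<!>>> delimiter, assuming the whole record arrives as a single Kafka message value with the fields concatenated in the order shown above (the object name SplitSketch is made up for illustration). Because every field is wrapped in its own pair of delimiters, the real values land on odd indices, which is why the ETL code below reads positions 5, 7, 11 and 27; joining the fields with a separator such as a newline leaves those positions unchanged.

object SplitSketch {
    def main(args: Array[String]): Unit = {
        // Sample fields from the record above; the five empty strings are the blank fields
        val fields = Seq("3111", "238", "20181111132902", "58.223.1.112", "202.102.92.18",
            "59947", "80", "www.sumecjob.com", "", "", "", "", "",
            "http://www.sumecjob.com/Social.aspx", "2556928066")
        val record = fields.map(f => s"<<<!>>>$f<<<!>>>").mkString
        val parts = record.split("<<<!>>>")
        println(parts(5))   // 20181111132902 -> operation timestamp
        println(parts(7))   // 58.223.1.112   -> user ip
        println(parts(11))  // 59947          -> user port
        println(parts(27))  // http://www.sumecjob.com/Social.aspx -> url
    }
}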
1.2 Requirements
Read the data above from the specified Kafka topic and clean it: drop the useless fields and keep only the useful information, namely userid, operation time (timestamp), user ip:port, server ip:port, and url. Finally, write the cleaned result back to the specified Kafka topic to complete the online ETL.
1. Replace every <<<!>>> with an empty string
2. Convert every date to the yyyy-MM-dd format
3. Save the Kafka offsets manually in MySQL (a sketch of the offset table follows this list)
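The job below keeps its Kafka offsets in a MySQL table named mytopic, which is queried in getfromoffset() and updated after every batch, but the table itself is not shown in the post. A minimal sketch of how it could be created and seeded with scalikejdbc, reusing the same connection settings the job uses, might look like this (the starting offset of 0 and the single partition are assumptions):

import scalikejdbc._

object InitOffsetTable {
    def main(args: Array[String]): Unit = {
        // Same JDBC settings as getfromoffset() uses below
        Class.forName("com.mysql.jdbc.Driver")
        ConnectionPool.singleton("jdbc:mysql://localhost:3306/zxy?useSSL=false", "root", "root")
        DB.autoCommit { implicit session =>
            // One row per (topic, partition) pair
            sql"""create table if not exists mytopic(
                  topic varchar(255),
                  partitionId int,
                  offset bigint
                  )""".execute().apply()
            // Assumed: the source topic OldTopic has a single partition starting at offset 0
            sql"insert into mytopic(topic, partitionId, offset) values ('OldTopic', 0, 0)".update().apply()
        }
    }
}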
package com.zxy.spark.Streaming.day004
import java.util.Properties
import com.zxy.spark.core.Day02.LoggerTrait
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, TaskContext}
import scalikejdbc.{ConnectionPool, DB, _}
object demo5 extends LoggerTrait {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
            .setAppName("demo5")
            .setMaster("local[*]")
        val duration = Seconds(2)
        val ssc = new StreamingContext(sparkConf, duration)

        // Kafka consumer parameters for the 0.8 direct stream API
        val kafkaparams = Map[String, String](
            "bootstrap.servers" -> "192.168.130.110:9092",
            "group.id" -> "zxy",
            "auto.offset.reset" -> "largest"
        )
        // Source topic(s); not passed to createDirectStream because the
        // starting offsets are taken from MySQL instead
        val topics: Set[String] = "OldTopic".split(",").toSet
        // Keep the topic name together with each message payload
        val messageHandler = (msgHandler: MessageAndMetadata[String, String]) => (msgHandler.topic, msgHandler.message())
        println(s"getfromoffset->${getfromoffset()}")

        // Direct stream that starts from the offsets stored in MySQL;
        // getfromoffset() also initializes the scalikejdbc connection pool
        val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
            ssc,
            kafkaparams,
            getfromoffset(),
            messageHandler
        )

        messages.foreachRDD(rdd => {
            if (!rdd.isEmpty()) {
                // Grab the offset ranges on the driver, before any other operation on the RDD
                val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                rdd.foreachPartition(partitionIterator => {
                    val offsetRange: OffsetRange = offsetRanges(TaskContext.getPartitionId())
                    // Process the partition and commit its offset in one local transaction
                    DB.localTx {
                        implicit session => {
                            partitionIterator.foreach(msg => {
                                // Clean the raw record
                                println(s"message->${msg._2}")
                                val resDate: (String, String, String, String) = dateETL(msg._2)
                                println(s"resDate -> $resDate")
                                // Send the cleaned record to the new topic
                                saveDate2Topics(resDate)
                            })
                            // Persist the new offset for this topic/partition
                            sql"update mytopic set offset = ${offsetRange.untilOffset} where topic = ${offsetRange.topic} and partitionId = ${offsetRange.partition}".update().apply()
                        }
                    }
                })
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }
    /**
     * Read the stored offsets (one row per topic/partition) from MySQL
     * @return the starting offset for every TopicAndPartition
     */
    def getfromoffset(): Map[TopicAndPartition, Long] = {
        val url = "jdbc:mysql://localhost:3306/zxy?useSSL=false"
        val user = "root"
        val pass = "root"
        val driver = "com.mysql.jdbc.Driver"
        Class.forName(driver)
        ConnectionPool.singleton(url, user, pass)
        val fromOffsets: Map[TopicAndPartition, Long] = DB.readOnly {
            implicit session =>
                sql"select topic,partitionId,offset from mytopic"
                    .map {
                        result => TopicAndPartition(result.string(1), result.int(2)) -> result.long(3)
                    }.list().apply().toMap
        }
        fromOffsets
    }
    /**
     * Clean one raw record: split on the <<<!>>> delimiter and keep only
     * the useful fields (the indices follow the sample record format above)
     */
    def dateETL(date: String): (String, String, String, String) = {
        val lines: Array[String] = date.split("<<<!>>>")
        val timestamp = lines(5)
        val time: String = getFormatTime(timestamp)
        (lines(2), time, lines(7) + ":" + lines(11), lines(27))
    }
    /**
     * Convert a yyyyMMddHHmmss timestamp to the yyyy-MM-dd format
     */
    def getFormatTime(timestamp: String): String = {
        val year = timestamp.substring(0, 4)
        val month = timestamp.substring(4, 6)
        val day = timestamp.substring(6, 8)
        val ymd = s"${year}-${month}-${day}"
        ymd
    }
    /**
     * Produce the cleaned record to the new topic (NewTopic).
     * Note: a new KafkaProducer is created and closed for every record,
     * which is simple but costly; reusing one producer per partition
     * would be more efficient.
     */
    def saveDate2Topics(newDate: (String, String, String, String)): Unit = {
        val properties = new Properties()
        properties.load(demo5.getClass.getClassLoader.getResourceAsStream("producer.properties"))
        val producer = new KafkaProducer[String, String](properties)
        val userID: String = newDate._1
        val value = s"${newDate._2},${newDate._3},${newDate._4}"
        val data = new ProducerRecord[String, String]("NewTopic", userID, value)
        producer.send(data)
        producer.close()
    }
}
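saveDate2Topics loads its producer settings from a producer.properties resource that the post does not include. At a minimum the producer needs the broker address and the key/value serializers; a rough Scala equivalent of that missing file, assuming the same broker as the consumer side, could be:

import java.util.Properties

object ProducerConfigSketch {
    // Stand-in for the missing producer.properties resource (assumed values)
    def producerProps(): Properties = {
        val props = new Properties()
        props.put("bootstrap.servers", "192.168.130.110:9092") // assumed: same broker as the consumer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props
    }
}

With these settings, each cleaned record lands in NewTopic with the userid as the message key and "yyyy-MM-dd,ip:port,url" as the value.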