Spark: Reading Data from Kafka and Writing It to MySQL (Streaming Data)
Posted by 一只楠喃
Requirements
Simulate product sales across all regions of the country. The generated purchase data is sent to Kafka, and Structured Streaming consumes it in real time to compute:
1) total nationwide sales;
2) sales by province;
3) sales in key cities.
Order entity (case class)
/**
 * Order entity (case class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date and time
 * @param ip          IP address the order was placed from
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord(
  orderId: String,
  userId: String,
  orderTime: String,
  ip: String,
  orderMoney: Double,
  orderStatus: Int
)
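For reference, each Kafka message is simply the json4s serialization of one OrderRecord. A minimal standalone sketch of the message shape (the field values below are made up purely for illustration, not real data):
import org.json4s.DefaultFormats
import org.json4s.jackson.Json

object OrderRecordJsonDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical sample record, only to show the message shape
    val sample = OrderRecord(
      orderId = "20250101120000000000001",
      userId = "300000042",
      orderTime = "2025-01-01 12:00:00.000",
      ip = "121.76.1.1",
      orderMoney = 123.45,
      orderStatus = 0
    )
    // json4s writes the case class as a flat JSON object, e.g.
    // {"orderId":"...","userId":"...","orderTime":"...","ip":"...","orderMoney":123.45,"orderStatus":0}
    println(new Json(DefaultFormats).write(sample))
  }
}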
Producing data in real time and writing it to Kafka
package mock

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Simulate order data and send it to a Kafka topic.
 * Each message in the topic is a String in JSON format.
 * Data conversion:
 *   convert each OrderRecord instance into a JSON string (json4s can be used).
 */
object MockOrderProducer {

  def main(args: Array[String]): Unit = {
    var producer: KafkaProducer[String, String] = null
    try {
      // 1. Kafka producer client configuration
      val props = new Properties()
      props.put("bootstrap.servers", "192.168.52.100:9092")
      props.put("acks", "1")
      props.put("retries", "3")
      props.put("key.serializer", classOf[StringSerializer].getName)
      props.put("value.serializer", classOf[StringSerializer].getName)

      // 2. Create the KafkaProducer with this configuration
      producer = new KafkaProducer[String, String](props)

      // Random number generator
      val random: Random = new Random()
      // Order status: 0 = open, 1 = cancelled, 2 = closed, 3 = completed
      val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

      while (true) {
        // Number of orders simulated per loop (nextInt(1) is always 0, so effectively 5)
        val batchNumber: Int = random.nextInt(1) + 5
        (1 to batchNumber).foreach { number =>
          val currentTime: Long = System.currentTimeMillis()
          val orderId: String = s"${getDate(currentTime)}%06d".format(number)
          val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
          val orderTime: String = getDate(currentTime, format = "yyyy-MM-dd HH:mm:ss.SSS")
          val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
          val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
          // 3. Build the order record
          val orderRecord: OrderRecord = OrderRecord(
            orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
          )
          // Convert the record to a JSON string
          val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
          println(orderJson)
          // 4. Build the ProducerRecord
          val record = new ProducerRecord[String, String]("orderTopic", orderJson)
          // 5. Send the record to the topic
          producer.send(record)
        }
        Thread.sleep(random.nextInt(100) + 500)
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != producer) producer.close()
    }
  }

  /** ================= Format the given timestamp ================= */
  def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
    val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
    val formatDate: String = fastFormat.format(time) // format the timestamp
    formatDate
  }

  /** ================= Generate a random IP address ================= */
  def getRandomIp: String = {
    // IP ranges
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079),     // 36.56.0.0-36.63.255.255
      (1038614528, 1039007743),   // 61.232.0.0-61.237.255.255
      (1783627776, 1784676351),   // 106.80.0.0-106.95.255.255
      (2035023872, 2035154943),   // 121.76.0.0-121.77.255.255
      (2078801920, 2079064063),   // 123.232.0.0-123.235.255.255
      (-1950089216, -1948778497), // 139.196.0.0-139.215.255.255
      (-1425539072, -1425014785), // 171.8.0.0-171.15.255.255
      (-1236271104, -1235419137), // 182.80.0.0-182.92.255.255
      (-770113536, -768606209),   // 210.25.0.0-210.47.255.255
      (-569376768, -564133889)    // 222.16.0.0-222.95.255.255
    )
    // Pick a random index into the IP ranges
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
    // Convert the Int IP to dotted IPv4 notation
    number2IpString(ipNumber)
  }

  /** ================= Convert an Int IPv4 address to a String ================= */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // Return the IPv4 address
    buffer.mkString(".")
  }
}
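The producer and the streaming job depend on a few libraries that the post does not list explicitly. A possible build.sbt dependency block, assuming Spark 2.4.x; all version numbers here are assumptions and should be adjusted to your environment:
// build.sbt (sketch; versions are assumptions, not from the original post)
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"             % "2.4.5",
  "org.apache.spark"   %% "spark-sql-kafka-0-10"  % "2.4.5",
  "org.apache.kafka"    % "kafka-clients"         % "2.0.0",
  "org.apache.commons"  % "commons-lang3"         % "3.9",
  "org.json4s"         %% "json4s-jackson"        % "3.5.3",
  "org.lionsoul"        % "ip2region"             % "1.7.2",
  "mysql"               % "mysql-connector-java"  % "8.0.19"
)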
Environment configuration and ETL
// Imports assumed by the snippets below (not shown in the original post)
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

def main(args: Array[String]): Unit = {
  // Get the SparkSession
  val spark = SparkSession.builder()
    .appName(this.getClass.getCanonicalName.stripPrefix("$"))
    .master("local[3]")
    .config("spark.sql.shuffle.partitions", 3)
    .config("spark.testing.memory", "477000000")
    .getOrCreate()

  // Read from Kafka
  val kafkaStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.52.100:9092")
    .option("subscribe", "orderTopic")
    .option("maxOffsetsPerTrigger", "100000") // cap records pulled per micro-batch (value chosen arbitrarily)
    .load()

  // UDF: resolve an IP address to (province, city) with ip2region
  val pc_cast = udf((ip: String) => {
    val searcher = new DbSearcher(new DbConfig(), "dataset/ip2region.db")
    val block = searcher.btreeSearch(ip)
    val region = block.getRegion
    val Array(_, _, province, city, _) = region.split("\\|")
    // Return a (province, city) tuple
    (province, city)
  })

  import spark.implicits._
  println(kafkaStream.toString())

  val etlStream = kafkaStream
    // Cast the value column to String
    .selectExpr("CAST (value AS STRING)")
    // Convert the DataFrame to a Dataset[String]
    .as[String]
    // Extract the fields we need
    .select(
      get_json_object($"value", "$.ip").as("ip"),
      get_json_object($"value", "$.orderMoney").cast(DataTypes.createDecimalType(10, 2)).as("money"),
      get_json_object($"value", "$.orderStatus").as("status")
    )
    // Keep only open orders (status 0)
    .filter($"status" === 0)
    // Add the (province, city) tuple resolved from the IP
    .withColumn("region", pc_cast($"ip"))
    .select(
      $"region._1".as("province"),
      $"region._2".as("city"),
      $"money"
    )

  //totalNums(etlStream)
  //provinceNums(etlStream)
  cityNums(etlStream)

  spark.streams.awaitAnyTermination()
}
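As an aside (not part of the original post), the three get_json_object calls above can be replaced by a single from_json call with an explicit schema, which parses the JSON only once per row. A minimal sketch, assuming the same kafkaStream and spark are in scope:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// Schema matching the JSON produced by MockOrderProducer
val orderSchema = StructType(Seq(
  StructField("orderId",     StringType),
  StructField("userId",      StringType),
  StructField("orderTime",   StringType),
  StructField("ip",          StringType),
  StructField("orderMoney",  DoubleType),
  StructField("orderStatus", IntegerType)
))

val etlStream2 = kafkaStream
  .selectExpr("CAST(value AS STRING) AS value")
  // Parse the whole JSON message in one pass
  .select(from_json($"value", orderSchema).as("order"))
  .select(
    $"order.ip".as("ip"),
    $"order.orderMoney".cast("decimal(10,2)").as("money"),
    $"order.orderStatus".as("status")
  )
  .filter($"status" === 0)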
Requirement 1: total nationwide sales
def totalNums(etlStream: DataFrame): Unit = {
  import etlStream.sparkSession.implicits._
  // Aggregate: total sales for the whole country
  val frame = etlStream.agg(sum($"money").as("total_amt"))
    .withColumn("type", lit("全国"))
  writeToMysql(frame, "total")
}
Requirement 2: sales by province
def provinceNums(etlStream: DataFrame): Unit = {
  import etlStream.sparkSession.implicits._
  // Aggregate: total sales per province
  val frame = etlStream.groupBy("province").agg(sum($"money").as("total_amt"))
  writeToMysql(frame, "province")
}
Requirement 3: sales in key cities
def cityNums(etlStream: DataFrame): Unit = {
  val cities: Array[String] = Array(
    "北京市", "上海市", "深圳市", "广州市", "杭州市", "成都市", "南京市", "武汉市", "西安市"
  )
  import etlStream.sparkSession.implicits._
  // UDF: check whether a city is one of the key cities
  val city_udf = udf((city: String) => cities.contains(city))
  // Aggregate: total sales per key city
  val frame = etlStream
    .filter(city_udf($"city"))
    .groupBy("city").agg(sum($"money").as("total_amt"))
  // Write to the console for debugging; swap in writeToMysql to persist
  frame.writeStream.format("console").outputMode("complete").start()
  //writeToMysql(frame, "city")
}
Writing to MySQL
def writeToMysql(frame: DataFrame, str: String): Unit = {
  // Write to a MySQL table
  val stream = frame.writeStream
    // Output mode
    .outputMode(OutputMode.Complete())
    // Query name
    .queryName(s"${str}_ckpt")
    // Trigger interval
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .foreachBatch((batchDF: DataFrame, batchId: Long) => {
      batchDF
        .coalesce(1)
        .write
        .mode("overwrite")
        .format("jdbc")
        // JDBC URL
        .option("url", "jdbc:mysql://localhost:3306/exam?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        // User
        .option("user", "root")
        // Password
        .option("password", "123456")
        // Target table
        .option("dbtable", s"exam.order_report_$str")
        .save()
    })
    // Checkpoint directory
    .option("checkpointLocation", s"datas/$str-ckpt")
    .start()
  stream.awaitTermination()
  stream.stop()
}
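One side effect of mode("overwrite") is that Spark's JDBC writer drops and recreates the target table on every micro-batch, so any manually defined indexes or column types are lost. If the table definition should be preserved, the standard JDBC truncate option empties the table instead of dropping it. A minimal sketch of the changed write call (the rest of writeToMysql stays the same):
batchDF
  .coalesce(1)
  .write
  .mode("overwrite")
  .format("jdbc")
  // Truncate the existing table instead of dropping and recreating it
  .option("truncate", "true")
  .option("url", "jdbc:mysql://localhost:3306/exam?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("user", "root")
  .option("password", "123456")
  .option("dbtable", s"exam.order_report_$str")
  .save()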