客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)相关的知识,希望对你有一定的参考价值。
目录
封装公共接口(根据存储介质抽取特质)
Structured Streaming 流处理程序消费kafka数据以后,会将数据分别存储到Kudu、ES、ClickHouse中,因此可以根据存储介质不同,封装其公共接口,每个流处理程序继承自该接口
实现步骤:
- 在etl模块的 realtime 包下创建 StreamApp 特质
- 实现方法:创建读取kafka集群指定主题的数据
- 实现方法:创建execute方法
- 实现方法:创建save方法
package cn.it.logistics.etl.realtime
import cn.it.logistics.common.Configuration
import org.apache.kafka.common.internals.Topic
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame, SparkSession
/**
* 这是所有ETL流式处理的基类
* kudu、es、ck都要实现这个特质
* 定义三个方法:
* 1)读取数据
* 2)处理数据
* 3)保存数据
*/
trait StreamApp
/**
* 读取数据的方法
* @param sparkSession SparkSession
* @param topic 指定消费的主题
* @param selectExpr 默认值:CAST(value AS STRING)
*/
def getKafkaSource(sparkSession: SparkSession, topic: String, selectExpr:String = "CAST(value AS STRING)") =
sparkSession.readStream.format(Configuration.SPARK_KAFKA_FORMAT)
.options(Map(
"kafka.bootstrap.servers" -> Configuration.kafkaAddress,
"subscribe" -> topic,
"group.id" -> "logistics", //该参数可以省略,不需要指定(官网提到改参数不能设置: kafka的source会在每次query的时候自定创建唯一的group id)
//表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
"failOnDataLoss" -> "false"
)).load().selectExpr(selectExpr)
/**
* 数据的处理
* @param sparkConf
*/
def execute(sparkConf: SparkConf)
/**
* 数据的保存
* @param dataFrame
* @param tableName
* @param isAutoCreateTable
*/
def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true)
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
以上是关于客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)的主要内容,如果未能解决你的问题,请参考以下文章
客快物流大数据项目(五十六): 编写SparkSession对象工具类