客快物流大数据项目(五十七):创建Kudu-ETL流式计算程序
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(五十七):创建Kudu-ETL流式计算程序相关的知识,希望对你有一定的参考价值。
创建Kudu-ETL流式计算程序
实现步骤:
- 在realtime目录创建 KuduStreamApp 单例对象,继承自 StreamApp 特质
- 重写特质内的方法
- 编写代码接入kafka集群消费其数据
package cn.it.logistics.etl.realtime
import cn.itcast.logistics.common.Configuration, SparkUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.DataFrame, Dataset, Encoders, SparkSession
/**
* Kudu数据管道应用
* 实现KUDU数据库的实时ETL操作
*/
object KuduStreamApp extends StreamApp
/**
* 入口方法
* @param args
*/
def main(args: Array[String]): Unit =
//创建sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(this.getClass.getSimpleName)
)
//数据处理
execute(sparkConf)
/**
* 数据的处理
*
* @param sparkConf
*/
override def execute(sparkConf: SparkConf): Unit =
/**
* 实现步骤:
* 1)创建sparksession对象
* 2)获取数据源(获取物流相关数据以及crm相关数据)
* 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
* 4)抽取每条数据的字段信息
* 5)将过滤出来的每张表写入到kudu数据库
*/
//1)创建sparksession对象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//2)获取数据源(获取物流相关数据以及crm相关数据)
//2.1:获取物流系统相关的数据
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
//2.2:获取客户关系系统相关的数据
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
// 设置Streaming应用输出及启动
logisticsDF.writeStream.outputMode(OutputMode.Update())
.format("console").queryName("logistics").start()
crmDF.writeStream.outputMode(OutputMode.Update())
.format("console").queryName("crm").start()
//8)启动运行等待停止
val stream = sparkSession.streams
//stream.active:获取当前活动流式查询的列表
stream.active.foreach(query => println(s"准备启动的查询:$query.name"))
//线程阻塞,等待终止
stream.awaitAnyTermination()
/**
* 数据的保存
* @param dataFrame
* @param tableName
* @param isAutoCreateTable
*/
override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit =
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
以上是关于客快物流大数据项目(五十七):创建Kudu-ETL流式计算程序的主要内容,如果未能解决你的问题,请参考以下文章
客快物流大数据项目(五十二):根据数据库表及字段创建公共模块
客快物流大数据项目(五十六): 编写SparkSession对象工具类
客快物流大数据项目(九十七):ClickHouse的SQL语法