客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)相关的知识,希望对你有一定的参考价值。

目录

封装公共接口(根据存储介质抽取特质)


封装公共接口(根据存储介质抽取特质)

Structured Streaming 流处理程序消费kafka数据以后,会将数据分别存储到Kudu、ES、ClickHouse中,因此可以根据存储介质不同,封装其公共接口,每个流处理程序继承自该接口

实现步骤:

  • etl模块realtime 包下创建 StreamApp  特质
  • 实现方法:创建读取kafka集群指定主题的数据
  • 实现方法:创建execute方法
  • 实现方法:创建save方法
package cn.it.logistics.etl.realtime

import cn.it.logistics.common.Configuration
import org.apache.kafka.common.internals.Topic
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame, SparkSession

/**
 * 这是所有ETL流式处理的基类
 * kudu、es、ck都要实现这个特质
 * 定义三个方法:
 * 1)读取数据
 * 2)处理数据
 * 3)保存数据
 */
trait StreamApp 

  /**
   * 读取数据的方法
   * @param sparkSession    SparkSession
   * @param topic           指定消费的主题
   * @param selectExpr      默认值:CAST(value AS STRING)
   */
  def getKafkaSource(sparkSession: SparkSession, topic: String, selectExpr:String = "CAST(value AS STRING)") = 
    sparkSession.readStream.format(Configuration.SPARK_KAFKA_FORMAT)
      .options(Map(
        "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
        "subscribe" -> topic,
        "group.id" -> "logistics", //该参数可以省略,不需要指定(官网提到改参数不能设置: kafka的source会在每次query的时候自定创建唯一的group id)
        //表示数据丢失以后(topic被删除,或者offset不存在可用的范围的时候)
        "failOnDataLoss" -> "false"
      )).load().selectExpr(selectExpr)
  

  /**
   * 数据的处理
   * @param sparkConf
   */
  def execute(sparkConf: SparkConf)

  /**
   * 数据的保存
   * @param dataFrame
   * @param tableName
   * @param isAutoCreateTable
   */
  def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true)

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

以上是关于客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)的主要内容,如果未能解决你的问题,请参考以下文章

客快物流大数据项目(五十六): 编写SparkSession对象工具类

客快物流大数据项目(五十):项目框架初始化

客快物流大数据项目(五十九):定义解析kafka数据的Bean对象类

客快物流大数据项目(六十二):主题及指标开发

客快物流大数据项目(五十一):数据库表分析

客快物流大数据项目(六十五):仓库主题