客快物流大数据项目(六十六):车辆主题

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(六十六):车辆主题相关的知识,希望对你有一定的参考价值。

文章目录

车辆主题

一、背景介绍

二、指标明细

三、表关联关系

1、​​​​​​​​​​​​​​事实表

2、​​​​​​​维度表

3、​​​​​​​​​​​​​​关联关系

四、​​​​​​​车辆数据拉宽开发

1、拉宽后的字段

2、​​​​​​​​​​​​​​SQL语句

3、Spark实现

五、车辆数据指标开发

1、​​​​​​​​​​​​​​计算的字段

2、​​​​​​​Spark实现


车辆主题

一、背景介绍

车辆主题主要是统计各个网点、区域、公司的发车情况,反映了区域或者公司的吞吐量及运营状况。

​​​​​​​二、指标明细

指标列表

维度

发车次数

各网点发车次数

各区域发车次数

各公司发车次数

最大发车次数

各网点最大发车次数

各区域最大发车次数

各分公司最大发车次数

最小发车次数

各网点最小发车次数

各区域最小发车次数

各分公司最小发车次数

平均发车次数

各网点平均发车次数

各区域平均发车次数

各分公司平均发车次数

三、​​​​​​​表关联关系

1、​​​​​​​​​​​​​​事实表

表名

描述

tbl_transport_tool

车辆事实表

tbl_warehouse_transport_tool

仓库车辆关联表

2、​​​​​​​维度表

表名

描述

tbl_dot

网点表

tbl_company

公司表

tbl_warehouse

仓库表

tbl_company_warehouse_map

公司仓库关联表

tbl_transport_tool

车辆表

3、​​​​​​​​​​​​​​关联关系

车辆表与维度表的关联关系如下:

四、​​​​​​​车辆数据拉宽开发

1、拉宽后的字段

1.1、拉宽网点车辆表

字段名

别名

字段描述

tbl_transport_tool

id

id

运输工具ID

tbl_transport_tool

brand

brand

运输工具品牌

tbl_transport_tool

model

model

运输工具型号

tbl_transport_tool

type

type

运输工具类型

tbl_codes

codeDesc/ttTypeName

type_name

车辆类型描述

tbl_transport_tool

givenLoad

given_load

额定载重

tbl_transport_tool

loadCnUnit

load_cn_unit

中文载重单位

tbl_transport_tool

loadEnUnit

load_en_unit

英文载重单位

tbl_transport_tool

buyDt

buy_dt

购买时间

tbl_transport_tool

licensePlate

license_plate

牌照

tbl_transport_tool

state

state

运输工具状态

tbl_codes

codeDesc/ttStateName

state_name

运输工具状态描述

tbl_transport_tool

cdt

cdt

创建时间

tbl_transport_tool

udt

udt

修改时间

tbl_transport_tool

remark

remark

备注

tbl_dot

id

dot_id

网点id

tbl_dot

dotNumber

dot_number

网点编号

tbl_dot

dotName

dot_name

网点名称

tbl_dot

dotAddr

dot_addr

网点地址

tbl_dot

dotGisAddr

dot_gis_addr

网点GIS地址

tbl_dot

dotTel

dot_tel

网点电话

tbl_dot

manageAreaId

manage_area_id

网点管理辖区ID

tbl_dot

manageAreaGis

manage_area_gis

网点管理辖区地理围栏

tbl_company

id

company_id

公司ID

tbl_company

companyName

company_name

公司名称

tbl_company

cityId

city_id

城市ID

tbl_company

companyNumber

company_number

公司编号

tbl_company

companyAddr

company_addr

公司地址

tbl_company

companyAddrGis

company_addr_gis

公司gis地址

tbl_company

companyTel

company_tel

公司电话

tbl_company

isSubCompany

is_sub_company

母公司ID

tbl_transport_tool

yyyyMMdd(cdt)

day

创建时间

年月日格式

​​​​​​​1.2、拉宽仓库车辆表

字段名

别名

字段描述

tbl_transport_tool

id

id

运输工具ID

tbl_transport_tool

brand

brand

运输工具品牌

tbl_transport_tool

model

model

运输工具型号

tbl_transport_tool

type

type

运输工具类型

tbl_codes

codeDesc/ttTypeName

type_name

车辆类型描述

tbl_transport_tool

givenLoad

given_load

额定载重

tbl_transport_tool

loadCnUnit

load_cn_unit

中文载重单位

tbl_transport_tool

loadEnUnit

load_en_unit

英文载重单位

tbl_transport_tool

buyDt

buy_dt

购买时间

tbl_transport_tool

licensePlate

license_plate

牌照

tbl_transport_tool

state

state

运输工具状态

tbl_transport_tool

cdt

cdt

创建时间

tbl_transport_tool

udt

udt

修改时间

tbl_transport_tool

remark

remark

备注

tbl_warehouse

id

ws_id

仓库ID

tbl_warehouse

name

name

仓库名称

tbl_warehouse

addr

addr

仓库地址

tbl_warehouse

addrGis

addr_gis

仓库gis地址

tbl_warehouse

employeeId

employee_id

仓库负责人

tbl_warehouse

type

ws_type

仓库类型

tbl_warehouse

area

area

占地面积

tbl_warehouse

isLease

is_lease

是否租赁

tbl_company

id

company_id

公司ID

tbl_company

companyName

company_name

公司名称

tbl_company

cityId

city_id

城市ID

tbl_company

companyNumber

company_number

公司编号

tbl_company

companyAddr

company_addr

公司地址

tbl_company

companyAddrGis

company_addr_gis

公司gis地址

tbl_company

companyTel

company_tel

公司电话

tbl_company

isSubCompany

is_sub_company

母公司ID

tbl_transport_tool

yyyyMMdd(cdt)

day

创建时间

年月日格式

2、​​​​​​​​​​​​​​SQL语句

​​​​​​​​​​​​​​2.1、拉宽网点车辆表

SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool"  ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id" 
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id" 

2.2、​​​​​​​​​​​​​​拉宽仓库车辆表

SELECT 
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool"  twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id" 
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id" 
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"

3、Spark实现

实现步骤:

  • dwd目录下创建 TransportToolDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取运输工具表(tbl_transport_tool)数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取网点运输工具关联表(tbl_dot_transport_tool数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取仓库运输工具关联表(tbl_warehouse_transport_tool数据,并缓存数据
  • 获取公司仓库关联表(tbl_company_warehouse_map数据,并缓存数据
  • 获取仓库表(tbl_warehouse数据,并缓存数据
  • 获取公司表(tbl_company数据,并缓存数据
  • 根据以下方式拉宽仓库车辆明细数据
    • 根据交通工具id,在交通工具表中获取交通工具数据
    • 根据网点id,在网点表中获取网点数据
    • 根据公司id,在公司表中获取公司数据
    • 根据仓库id,在仓库表中获取仓库数据
  • 创建网点车辆明细宽表(若存在则不创建)
  • 创建仓库车辆明细宽表(若存在则不创建)
  • 将仓库车辆明细宽表数据写入到kudu数据表中
  • 删除缓存数据

3.1、初始化环境变量

初始化运单明细拉宽作业的环境变量

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.Configuration, SparkUtils
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 
    sparkSession.stop()
  

 ​​​​3.2、加载运输工具表及车辆相关的表并缓存

  • 加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

3.3、​​​​​​​定义网点车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
  .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
  .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
  .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    dotDF("id").as("dot_id"), //网点表dot_id
    dotDF("dotNumber").as("dot_number"), //网点表dot_number
    dotDF("dotName").as("dot_name"), //网点表dot_name
    dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
    dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
    dotDF("dotTel").as("dot_tel"), //网点表dot_tel
    dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
    dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )

3.4、​​​​​​​创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中

网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)

3.5、​​​​​​​​​​​​​​定义仓库车辆宽表的关联关系

  • 为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
  .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
  .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
  .join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
  .join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
  .join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
  .withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(ttDF.col("cdt").asc)
  .select(
    ttDF("id"), //车辆表id
    ttDF("brand"), //车辆表brand
    ttDF("model"), //车辆表model
    ttDF("type").cast(IntegerType), //车辆表type
    transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
    ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
    ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
    ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
    ttDF("licensePlate").as("license_plate"), //车辆表license_plate
    ttDF("state").cast(IntegerType), //车辆表state
    transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
    ttDF("cdt"), //车辆表cdt
    ttDF("udt"), //车辆表udt
    ttDF("remark"), //车辆表remark
    wsDF("id").as("ws_id"), //仓库表id
    wsDF("name"), //仓库表name
    wsDF("addr"), //仓库表addr
    wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
    wsDF("employeeId").as("employee_id"), //仓库表employee_id
    wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
    wsDF("area"), //仓库表area
    wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
    companyDF("id").alias("company_id"), //公司表id
    companyDF("companyName").as("company_name"), //公司表company_name
    companyDF("cityId").as("city_id"), //公司表city_id
    companyDF("companyNumber").as("company_number"), //公司表company_number
    companyDF("companyAddr").as("company_addr"), //公司表company_addr
    companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
    companyDF("companyTel").as("company_tel"), //公司表company_tel
    companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
    $"day"
  )

3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中

仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • TransportToolDWD 单例对象中调用save方法

实现过程:

  • TransportToolDWD 单例对象Main方法中调用save方法
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)

3.7、​​​​​​​​​​​​​​删除缓存数据

为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。

//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()

3.8、​​​​​​​​​​​​​​完整代码

package cn.it.logistics.offline.dwd

import cn.it.logistics.common.CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.storage.StorageLevel

/**
 * 车辆主题开发
 * 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
 * 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
 * 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
 */
object TransportToolDWD extends OfflineApp
  //定义应用的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = 
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建车辆明细宽表的schema表结构
     * 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6)将缓存的数据删除掉
     * 7)停止任务
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  
  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = 
    //TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
    //加载车辆表数据(事实表)
    val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库车辆表数据
    val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)

    //加载网点表的数据
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

    //加载公司网点关联表的数据
    val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

    //加载公司表的数据
    val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库车辆关联表数据(事实表)
    val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库公司关联表
    val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)

    //加载仓库表数据
    val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)

    //加载物流码表数据
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

    import sparkSession.implicits._
    //获取运输工具类型
    val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))

    //获取运输工具状态
    val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))

    //TODO 4)定义维度表与事实表的关联
    val left_outer: String = "left_outer"
    // 4.1:拉宽网点车辆表
    val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
      .join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
      .join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
      .join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
      .join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
      .join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
      .withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(ttDF.col("cdt").asc)
      .select(
        ttDF("id"), //车辆表id
        ttDF("brand"), //车辆表brand
        ttDF("model"), //车辆表model
        ttDF("type").cast(IntegerType), //车辆表type
        transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
        ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
        ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
        ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
        ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
        ttDF("licensePlate").as("license_plate"), //车辆表license_plate
        ttDF("state").cast(IntegerType), //车辆表state
        transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
        ttDF("cdt"), //车辆表cdt
        ttDF("udt"), //车辆表udt
        ttDF("remark"), //车辆表remark
        dotDF("id").as("dot_id"), //网点表dot_id
        dotDF("dotNumber").as("dot_number"), //网点表dot_number
        dotDF("dotName").as("dot_name"), //网点表dot_name
        dot

以上是关于客快物流大数据项目(六十六):车辆主题的主要内容,如果未能解决你的问题,请参考以下文章

客快物流大数据项目(六十四):运单主题

客快物流大数据项目(六十二):主题及指标开发

客快物流大数据项目(六十七):客户主题

客快物流大数据项目(六十三):快递单主题

客快物流大数据项目(六十五):仓库主题

客快物流大数据项目(六十四):运单主题