客快物流大数据项目(六十六):车辆主题
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(六十六):车辆主题相关的知识,希望对你有一定的参考价值。
文章目录
车辆主题
一、背景介绍
车辆主题主要是统计各个网点、区域、公司的发车情况,反映了区域或者公司的吞吐量及运营状况。
二、指标明细
指标列表 | 维度 |
发车次数 | 各网点发车次数 |
各区域发车次数 | |
各公司发车次数 | |
最大发车次数 | 各网点最大发车次数 |
各区域最大发车次数 | |
各分公司最大发车次数 | |
最小发车次数 | 各网点最小发车次数 |
各区域最小发车次数 | |
各分公司最小发车次数 | |
平均发车次数 | 各网点平均发车次数 |
各区域平均发车次数 | |
各分公司平均发车次数 |
三、表关联关系
1、事实表
表名 | 描述 |
tbl_transport_tool | 车辆事实表 |
tbl_warehouse_transport_tool | 仓库车辆关联表 |
2、维度表
表名 | 描述 |
tbl_dot | 网点表 |
tbl_company | 公司表 |
tbl_warehouse | 仓库表 |
tbl_company_warehouse_map | 公司仓库关联表 |
tbl_transport_tool | 车辆表 |
3、关联关系
车辆表与维度表的关联关系如下:
四、车辆数据拉宽开发
1、拉宽后的字段
1.1、拉宽网点车辆表
表 | 字段名 | 别名 | 字段描述 |
tbl_transport_tool | id | id | 运输工具ID |
tbl_transport_tool | brand | brand | 运输工具品牌 |
tbl_transport_tool | model | model | 运输工具型号 |
tbl_transport_tool | type | type | 运输工具类型 |
tbl_codes | codeDesc/ttTypeName | type_name | 车辆类型描述 |
tbl_transport_tool | givenLoad | given_load | 额定载重 |
tbl_transport_tool | loadCnUnit | load_cn_unit | 中文载重单位 |
tbl_transport_tool | loadEnUnit | load_en_unit | 英文载重单位 |
tbl_transport_tool | buyDt | buy_dt | 购买时间 |
tbl_transport_tool | licensePlate | license_plate | 牌照 |
tbl_transport_tool | state | state | 运输工具状态 |
tbl_codes | codeDesc/ttStateName | state_name | 运输工具状态描述 |
tbl_transport_tool | cdt | cdt | 创建时间 |
tbl_transport_tool | udt | udt | 修改时间 |
tbl_transport_tool | remark | remark | 备注 |
tbl_dot | id | dot_id | 网点id |
tbl_dot | dotNumber | dot_number | 网点编号 |
tbl_dot | dotName | dot_name | 网点名称 |
tbl_dot | dotAddr | dot_addr | 网点地址 |
tbl_dot | dotGisAddr | dot_gis_addr | 网点GIS地址 |
tbl_dot | dotTel | dot_tel | 网点电话 |
tbl_dot | manageAreaId | manage_area_id | 网点管理辖区ID |
tbl_dot | manageAreaGis | manage_area_gis | 网点管理辖区地理围栏 |
tbl_company | id | company_id | 公司ID |
tbl_company | companyName | company_name | 公司名称 |
tbl_company | cityId | city_id | 城市ID |
tbl_company | companyNumber | company_number | 公司编号 |
tbl_company | companyAddr | company_addr | 公司地址 |
tbl_company | companyAddrGis | company_addr_gis | 公司gis地址 |
tbl_company | companyTel | company_tel | 公司电话 |
tbl_company | isSubCompany | is_sub_company | 母公司ID |
tbl_transport_tool | yyyyMMdd(cdt) | day | 创建时间 年月日格式 |
1.2、拉宽仓库车辆表
表 | 字段名 | 别名 | 字段描述 |
tbl_transport_tool | id | id | 运输工具ID |
tbl_transport_tool | brand | brand | 运输工具品牌 |
tbl_transport_tool | model | model | 运输工具型号 |
tbl_transport_tool | type | type | 运输工具类型 |
tbl_codes | codeDesc/ttTypeName | type_name | 车辆类型描述 |
tbl_transport_tool | givenLoad | given_load | 额定载重 |
tbl_transport_tool | loadCnUnit | load_cn_unit | 中文载重单位 |
tbl_transport_tool | loadEnUnit | load_en_unit | 英文载重单位 |
tbl_transport_tool | buyDt | buy_dt | 购买时间 |
tbl_transport_tool | licensePlate | license_plate | 牌照 |
tbl_transport_tool | state | state | 运输工具状态 |
tbl_transport_tool | cdt | cdt | 创建时间 |
tbl_transport_tool | udt | udt | 修改时间 |
tbl_transport_tool | remark | remark | 备注 |
tbl_warehouse | id | ws_id | 仓库ID |
tbl_warehouse | name | name | 仓库名称 |
tbl_warehouse | addr | addr | 仓库地址 |
tbl_warehouse | addrGis | addr_gis | 仓库gis地址 |
tbl_warehouse | employeeId | employee_id | 仓库负责人 |
tbl_warehouse | type | ws_type | 仓库类型 |
tbl_warehouse | area | area | 占地面积 |
tbl_warehouse | isLease | is_lease | 是否租赁 |
tbl_company | id | company_id | 公司ID |
tbl_company | companyName | company_name | 公司名称 |
tbl_company | cityId | city_id | 城市ID |
tbl_company | companyNumber | company_number | 公司编号 |
tbl_company | companyAddr | company_addr | 公司地址 |
tbl_company | companyAddrGis | company_addr_gis | 公司gis地址 |
tbl_company | companyTel | company_tel | 公司电话 |
tbl_company | isSubCompany | is_sub_company | 母公司ID |
tbl_transport_tool | yyyyMMdd(cdt) | day | 创建时间 年月日格式 |
2、SQL语句
2.1、拉宽网点车辆表
SELECT
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool" ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id"
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"
2.2、拉宽仓库车辆表
SELECT
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool" twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id"
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id"
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"
3、Spark实现
实现步骤:
- 在dwd目录下创建 TransportToolDWD 单例对象,继承自OfflineApp特质
- 初始化环境的参数,创建SparkSession对象
- 获取运输工具表(tbl_transport_tool)数据,并缓存数据
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
- 获取网点运输工具关联表(tbl_dot_transport_tool)数据,并缓存数据
- 获取网点表(tbl_dot)数据,并缓存数据
- 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
- 获取仓库运输工具关联表(tbl_warehouse_transport_tool)数据,并缓存数据
- 获取公司仓库关联表(tbl_company_warehouse_map)数据,并缓存数据
- 获取仓库表(tbl_warehouse)数据,并缓存数据
- 获取公司表(tbl_company)数据,并缓存数据
- 根据以下方式拉宽仓库车辆明细数据
- 根据交通工具id,在交通工具表中获取交通工具数据
- 根据网点id,在网点表中获取网点数据
- 根据公司id,在公司表中获取公司数据
- 根据仓库id,在仓库表中获取仓库数据
- 创建网点车辆明细宽表(若存在则不创建)
- 创建仓库车辆明细宽表(若存在则不创建)
- 将仓库车辆明细宽表数据写入到kudu数据表中
- 删除缓存数据
3.1、初始化环境变量
初始化运单明细拉宽作业的环境变量
package cn.it.logistics.offline.dwd
import cn.it.logistics.common.Configuration, SparkUtils
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp
//定义应用的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit =
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit =
sparkSession.stop()
3.2、加载运输工具表及车辆相关的表并缓存
- 加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
- 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
3.3、定义网点车辆宽表的关联关系
- 为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd
代码如下:
//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
dotDF("dotTel").as("dot_tel"), //网点表dot_tel
dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中
网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建
实现步骤:
- 在TransportToolDWD 单例对象中调用save方法
实现过程:
- 在TransportToolDWD 单例对象Main方法中调用save方法
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
3.5、定义仓库车辆宽表的关联关系
- 为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd
代码如下:
// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
.join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
.join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
.withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
wsDF("id").as("ws_id"), //仓库表id
wsDF("name"), //仓库表name
wsDF("addr"), //仓库表addr
wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
wsDF("employeeId").as("employee_id"), //仓库表employee_id
wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
wsDF("area"), //仓库表area
wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中
仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建
实现步骤:
- 在TransportToolDWD 单例对象中调用save方法
实现过程:
- 在TransportToolDWD 单例对象Main方法中调用save方法
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)
3.7、删除缓存数据
为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。
//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()
3.8、完整代码
package cn.it.logistics.offline.dwd
import cn.it.logistics.common.CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.storage.StorageLevel
/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp
//定义应用的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit =
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit =
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dot以上是关于客快物流大数据项目(六十六):车辆主题的主要内容,如果未能解决你的问题,请参考以下文章