Kekuai Logistics Big Data Project (64): Waybill Topic
Posted by Lansonli
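Waybill Topic
I. Background
"A waybill is evidence of the contract of carriage and a receipt showing that the carrier has taken over the goods. Each waybill records one shipper, one consignee, one port of departure and one port of destination. If goods from the same shipper are addressed to two or more consignees at the port of destination, a separate waybill must be made out for each consignee."
Waybill statistics are computed along dimensions such as area, company, outlet, route and transport tool, and the waybill counts within each dimension can be ranked. For example, statistics per outlet reflect how that outlet is operating, and statistics per route show the transport capacity of each route.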
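To make the idea of ranking by dimension concrete, here is a minimal sketch in plain Scala collections (no Spark): it counts waybills per outlet and per route and sorts each dimension by count, highest first. The Waybill case class and the rankBy helper are hypothetical simplifications of rows in the waybill wide table, not the project's own types.

```scala
// Hypothetical, simplified waybill row: only the dimension keys needed here.
final case class Waybill(id: Long, dotName: String, routeName: String)

// Count waybills per dimension value and rank them, highest count first.
// Ties are broken by key so that the order is deterministic.
def rankBy(waybills: Seq[Waybill])(key: Waybill => String): Seq[(String, Int)] =
  waybills
    .groupBy(key)
    .map { case (k, rows) => (k, rows.size) }
    .toSeq
    .sortBy { case (k, n) => (-n, k) }

val sample = Seq(
  Waybill(1L, "Dot-A", "R1"),
  Waybill(2L, "Dot-B", "R1"),
  Waybill(3L, "Dot-A", "R2"),
  Waybill(4L, "Dot-C", "R1"),
  Waybill(5L, "Dot-A", "R1")
)

val byDot = rankBy(sample)(_.dotName)     // Dot-A (3), then Dot-B (1) and Dot-C (1)
val byRoute = rankBy(sample)(_.routeName) // R1 (4), then R2 (1)
println(byDot)
println(byRoute)
```

In the real job the same grouping would run on the DWD wide table in Spark; the collection version only shows the counting and ordering logic.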
II. Metric Details
Metric | Dimensions |
Waybill count | Total waybill count |
Maximum waybill count | Per area, per branch company, per outlet, per route, per transport tool, per customer type |
Minimum waybill count | Per area, per branch company, per outlet, per route, per transport tool, per customer type |
Average waybill count | Per area, per branch company, per outlet, per route, per transport tool, per customer type |
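Each maximum, minimum and average metric is read here as a statistic over per-dimension counts: first count the waybills for every area (or company, outlet, route, transport tool, customer type), then take the max, min and mean of those counts. The sketch below follows that reading in plain Scala; the CountStats type and countStats helper are hypothetical, and it assumes the average is the arithmetic mean over the dimension values that have at least one waybill.

```scala
// Hypothetical summary of per-dimension waybill counts.
final case class CountStats(max: Long, min: Long, avg: Double)

// keys: one dimension value (e.g. an area id) per waybill.
// Returns None when there are no waybills at all.
def countStats(keys: Seq[String]): Option[CountStats] = {
  val counts: Seq[Long] = keys.groupBy(identity).values.map(_.size.toLong).toSeq
  if (counts.isEmpty) None
  else Some(CountStats(counts.max, counts.min, counts.sum.toDouble / counts.size))
}

// Area ids of six waybills: area 1 has 3, area 2 has 2, area 3 has 1.
val areaIds = Seq("1", "1", "2", "1", "3", "2")
println(countStats(areaIds)) // Some(CountStats(3,1,2.0))
```

Dimension values with zero waybills do not appear in the grouping at all, so under this reading they neither lower the minimum nor the average.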
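III. Table Relationships
1. Fact table
Table | Description |
tbl_waybill | Waybill table |
2. Dimension tables
Table | Description |
tbl_courier | Courier table |
tbl_transport_record | Transport record table |
tbl_areas | Area table |
tbl_route | Route table |
tbl_dot | Outlet table |
tbl_company | Company table |
tbl_warehouse | Warehouse table |
tbl_customer | Customer table |
tbl_company_warehouse_map | Company-warehouse mapping table |
tbl_transport_tool | Vehicle table |
tbl_codes | Logistics system code table |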
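3. Relationships
The waybill table is related to the dimension tables as follows (all relationships are left joins, using the same conditions as the SQL statement below):
- tbl_waybill.cid = tbl_customer.id; tbl_customer.type = tbl_codes.code where tbl_codes.type = 16 (customer type)
- tbl_waybill.order_channel_id = tbl_codes.code where tbl_codes.type = 18 (order channel)
- tbl_waybill.eid = tbl_courier.id; tbl_courier.dot_id = tbl_dot.id; tbl_dot.manage_area_id = tbl_areas.id
- tbl_waybill.waybill_number = tbl_transport_record.pw_waybill_number
- tbl_transport_record.sw_id and ew_id = tbl_warehouse.id (dispatch and next-station warehouse); each warehouse is linked to its company through tbl_company_warehouse_map (warehouse_id, company_id = tbl_company.id)
- tbl_transport_record.transport_tool_id = tbl_transport_tool.id
- tbl_transport_record.route_id = tbl_route.id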
IV. Developing the Waybill Wide Table (Denormalization)
1. Fields of the widened table
Table | Field name | Alias | Description |
tbl_waybill | id | id | Waybill ID |
tbl_waybill | expressBillNumber | express_bill_number | Express bill number |
tbl_waybill | waybillNumber | waybill_number | Waybill number |
tbl_waybill | cid | cid | Customer ID |
tbl_customer | name | cname | Customer name |
tbl_codes | consumerType | ctype | Customer type code |
tbl_codes | consumerTypeName | ctype_name | Customer type name |
tbl_waybill | eid | eid | ID of the collecting courier |
tbl_courier | name | ename | Name of the collecting courier |
tbl_dot | id | dot_id | Outlet ID |
tbl_dot | dotName | dot_name | Outlet name |
tbl_areas | id | area_id | Area ID |
tbl_areas | name | area_name | Area name |
tbl_waybill | orderChannelId | order_channel_id | Order channel ID |
tbl_codes | orderChannelName | order_channel_name | Order channel name |
tbl_waybill | orderDt | order_dt | Order time |
tbl_waybill | orderTerminalType | order_terminal_type | Ordering device type ID |
tbl_waybill | orderTerminalOsType | order_terminal_os_type | Ordering device OS ID |
tbl_waybill | reserveDt | reserve_dt | Scheduled pickup time |
tbl_waybill | isCollectPackageTimeout | is_collect_package_timeout | Whether the pickup timed out |
tbl_waybill | pkgId | pkg_id | Package ID |
tbl_waybill | pkgNumber | pkg_number | Package number |
tbl_waybill | timeoutDt | timeout_dt | Timeout time |
tbl_waybill | transformType | transform_type | Transport mode |
tbl_waybill | deliveryCustomerName | delivery_customer_name | Sender |
tbl_waybill | deliveryAddr | delivery_addr | Sender address |
tbl_waybill | deliveryMobile | delivery_mobile | Sender mobile |
tbl_waybill | deliveryTel | delivery_tel | Sender phone |
tbl_waybill | receiveCustomerName | receive_customer_name | Recipient |
tbl_waybill | receiveAddr | receive_addr | Recipient address |
tbl_waybill | receiveMobile | receive_mobile | Recipient mobile |
tbl_waybill | receiveTel | receive_tel | Recipient phone |
tbl_waybill | cdt | cdt | Creation time |
tbl_waybill | udt | udt | Modification time |
tbl_waybill | remark | remark | Waybill remark |
tbl_transport_record | swId | sw_id | Dispatch warehouse ID |
tbl_warehouse | name | sw_name | Dispatch warehouse name |
tbl_company | companyName | sw_company_name | Dispatch company name |
tbl_transport_record | ewId | ew_id | Next-station warehouse ID |
tbl_warehouse | name | ew_name | Next-station warehouse name |
tbl_company | companyName | ew_company_name | Next-station company name |
tbl_transport_tool | id | tt_id | Transport tool ID |
tbl_transport_tool | licensePlate | tt_name | Transport tool license plate |
tbl_transport_record | routeId | route_id | Transport route ID |
tbl_route | startStation+endStation | route_name | Route start and end stations concatenated |
tbl_waybill | yyyyMMdd(cdt) | day | Creation time in yyyyMMdd format |
2. SQL Statement
SELECT
  waybill."id",
  waybill."express_bill_number",
  waybill."waybill_number",
  waybill."cid",
  customer."name" AS cname,
  customercodes."code" AS ctype,
  customercodes."code_desc" AS ctype_name,
  waybill."eid",
  courier."name" AS ename,
  dot."id" AS dot_id,
  dot."dot_name",
  area."id" AS area_id,
  area."name" AS area_name,
  waybill."order_channel_id",
  orderchannelcodes."code_desc" AS "order_channel_name",
  waybill."order_dt",
  waybill."order_terminal_type",
  waybill."order_terminal_os_type",
  waybill."reserve_dt",
  waybill."is_collect_package_timeout",
  waybill."pkg_id",
  waybill."pkg_number",
  waybill."timeout_dt",
  waybill."transform_type",
  waybill."delivery_customer_name",
  waybill."delivery_addr",
  waybill."delivery_mobile",
  waybill."delivery_tel",
  waybill."cdt",
  waybill."udt",
  waybill."remark",
  record."sw_id",
  swarehouse."name" AS sw_name,
  scompany."company_name" AS sw_company_name,
  record."ew_id",
  ewarehouse."name" AS ew_name,
  ecompany."company_name" AS ew_company_name,
  tool."id" AS tt_id,
  tool."license_plate" AS tt_name,
  record."route_id",
  concat(route."start_station", route."end_station") AS route_name
FROM "tbl_waybill" waybill
LEFT JOIN "tbl_codes" orderchannelcodes ON orderchannelcodes."type" = 18 AND waybill."order_channel_id" = orderchannelcodes."code"
LEFT JOIN "tbl_customer" customer ON waybill."cid" = customer."id"
LEFT JOIN "tbl_codes" customercodes ON customercodes."type" = 16 AND customer."type" = customercodes."code"
LEFT JOIN "tbl_courier" courier ON waybill."eid" = courier."id"
LEFT JOIN "tbl_dot" dot ON courier."dot_id" = dot."id"
LEFT JOIN "tbl_areas" area ON area."id" = dot."manage_area_id"
LEFT JOIN "tbl_transport_record" record ON record."pw_waybill_number" = waybill."waybill_number"
LEFT JOIN "tbl_warehouse" swarehouse ON swarehouse."id" = record."sw_id"
LEFT JOIN "tbl_warehouse" ewarehouse ON ewarehouse."id" = record."ew_id"
LEFT JOIN "tbl_transport_tool" tool ON tool."id" = record."transport_tool_id"
LEFT JOIN "tbl_route" route ON record."route_id" = route."id"
LEFT JOIN "tbl_company_warehouse_map" swarehouse_map ON record."sw_id" = swarehouse_map."warehouse_id"
LEFT JOIN "tbl_company" scompany ON scompany."id" = swarehouse_map."company_id"
LEFT JOIN "tbl_company_warehouse_map" ewarehouse_map ON record."ew_id" = ewarehouse_map."warehouse_id"
LEFT JOIN "tbl_company" ecompany ON ecompany."id" = ewarehouse_map."company_id"
ORDER BY waybill."id" ASC
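Because every join in the statement is a LEFT JOIN, a waybill is kept even when it has no matching transport record, courier, route and so on; the unmatched columns come back as NULL. Conversely, a waybill with several transport records appears once per record, which matters when counting waybills from the widened table. The sketch below reproduces both effects on plain Scala collections, with Option standing in for NULL; WaybillRow, RecordRow and leftJoin are hypothetical, trimmed-down stand-ins for tbl_waybill, tbl_transport_record and the SQL join.

```scala
// Hypothetical, trimmed rows of tbl_waybill and tbl_transport_record.
final case class WaybillRow(id: Long, waybillNumber: String)
final case class RecordRow(pwWaybillNumber: String, routeId: Long)

// LEFT OUTER JOIN: every waybill is kept; unmatched ones get None (SQL NULL),
// and a waybill with several matching records yields one row per record.
def leftJoin(ws: Seq[WaybillRow], rs: Seq[RecordRow]): Seq[(WaybillRow, Option[RecordRow])] = {
  val byNumber: Map[String, Seq[RecordRow]] = rs.groupBy(_.pwWaybillNumber)
  ws.flatMap { w =>
    byNumber.get(w.waybillNumber) match {
      case Some(matches) => matches.map(r => (w, Option(r)))
      case None          => Seq((w, Option.empty[RecordRow]))
    }
  }
}

val waybills = Seq(WaybillRow(1L, "WB001"), WaybillRow(2L, "WB002"))
val records  = Seq(RecordRow("WB001", 10L), RecordRow("WB001", 11L))

leftJoin(waybills, records).foreach(println)
// (WaybillRow(1,WB001),Some(RecordRow(WB001,10)))
// (WaybillRow(1,WB001),Some(RecordRow(WB001,11)))
// (WaybillRow(2,WB002),None)
```

Here waybill 1 produces two rows, so a plain row count over the joined result would overstate the number of waybills; counting distinct waybill ids avoids that.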
3. Spark Implementation
Initializing the environment
Set up the runtime environment for the waybill detail widening job:
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object WayBillDWD extends OfflineApp {
  // Application name
  val appName = this.getClass.getSimpleName

  /**
   * Entry point
   * @param args command-line arguments
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) Create the SparkConf object
     * 2) Create the SparkSession object
     * 3) Load the fact and dimension tables from Kudu and cache them
     * 4) Join the dimension tables to the waybill fact table
     * 5) Write the widened data back to Kudu (DWD layer)
     * 6) Remove the cached data
     * 7) Stop the job and close the SparkSession
     */
    //TODO 1) Create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2) Create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    // Process the data
    execute(sparkSession)
  }

  /**
   * Data processing
   *
   * @param sparkSession the active SparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    // Steps 3) to 6) are added here in the following sections
    sparkSession.stop()
  }
}
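Loading the waybill-related tables and caching them
- When loading the waybill table a date condition must be specified: the waybill topic is ultimately scheduled by Azkaban to run once a day over that day's incremental data.
- Check whether this is the first run; if it is, load the full data set (including historical data).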
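The load decision can be sketched as below. It is an illustration only: the source does not show how getKuduSource builds its date condition, so the FactRow type, the isFirstRun flag and the choice of "the previous day" as the daily window are assumptions. In the code that follows, the waybill fact table passes Configuration.isFirstRunnable while the dimension tables pass true, which suggests the dimension tables are always loaded in full.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical fact row with its creation time (cdt).
final case class FactRow(id: Long, cdt: LocalDateTime)

// First run: load everything, including history.
// Later daily runs (assumed): keep only rows created on the previous day.
def selectRows(rows: Seq[FactRow], isFirstRun: Boolean, runDate: LocalDate): Seq[FactRow] =
  if (isFirstRun) rows
  else {
    val target = runDate.minusDays(1)
    rows.filter(_.cdt.toLocalDate == target)
  }

val rows = Seq(
  FactRow(1L, LocalDateTime.of(2021, 3, 1, 9, 30)),
  FactRow(2L, LocalDateTime.of(2021, 3, 2, 18, 5)),
  FactRow(3L, LocalDateTime.of(2021, 3, 2, 23, 59))
)
val today = LocalDate.of(2021, 3, 3)

println(selectRows(rows, isFirstRun = true, today).map(_.id))  // ids 1, 2, 3
println(selectRows(rows, isFirstRun = false, today).map(_.id)) // ids 2, 3
```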
//TODO 3) Load the fact and dimension tables from Kudu (and cache the loaded data)
//3.1: Load the waybill fact table (full load on the first run, incremental afterwards)
val wayBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.waybill, Configuration.isFirstRunnable)
  .persist(StorageLevel.DISK_ONLY_2) // cache on the disks of two nodes so that a single-node failure does not lose the cached data
//3.2: Load the courier table
val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)
//3.3: Load the outlet table
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//3.4: Load the area table
val areasDF: DataFrame = getKuduSource(sparkSession, TableMapping.areas, true).persist(StorageLevel.DISK_ONLY_2)
//3.5: Load the transport record table
val recordDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportRecord, true).persist(StorageLevel.DISK_ONLY_2)
//3.6: Load the start warehouse table
val startWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//3.7: Load the arrival warehouse table
val endWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//3.8: Load the vehicle table
val toolDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, true).persist(StorageLevel.DISK_ONLY_2)
//3.9: Load the route table
val routeDF: DataFrame = getKuduSource(sparkSession, TableMapping.route, true).persist(StorageLevel.DISK_ONLY_2)
//3.10: Load the company-warehouse mapping table for start warehouses
val startCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//3.11: Load the company-warehouse mapping table for arrival warehouses
val endCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//3.12: Load the company table for start warehouses
val startCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//3.13: Load the company table for arrival warehouses
val endCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//3.14: Load the logistics code table
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
//3.15: Load the customer table
val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
// Import implicit conversions (enables the $"column" syntax)
import sparkSession.implicits._
// Order channel types
val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType).select(
  $"code".as("orderChannelTypeCode"), $"codeDesc".as("orderChannelTypeName"))
// Customer types
val customerTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.CustomType).select(
  $"code".as("customerTypeCode"), $"codeDesc".as("customerTypeName"))
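The two filtered views split the shared code table tbl_codes by its type column. In the SQL statement earlier, order-channel codes use type 18 and customer-type codes use type 16, presumably the values behind CodeTypeMapping.OrderChannelType and CodeTypeMapping.CustomType. The plain-Scala sketch below builds one code-to-description lookup per type; the Code class and the sample descriptions are hypothetical.

```scala
// Hypothetical row of tbl_codes: (type, code, codeDesc).
final case class Code(codeType: Int, code: String, codeDesc: String)

// Type values as used in the SQL statement: 18 = order channel, 16 = customer type.
val OrderChannelType = 18
val CustomerType = 16

// Build a code -> description lookup for a single code type.
def lookupFor(codes: Seq[Code], codeType: Int): Map[String, String] =
  codes.filter(_.codeType == codeType).map(c => c.code -> c.codeDesc).toMap

val codes = Seq(
  Code(18, "1", "App"),
  Code(18, "2", "Website"),
  Code(16, "1", "Individual"),
  Code(16, "2", "Enterprise")
)

val orderChannelNames = lookupFor(codes, OrderChannelType)
val customerTypeNames = lookupFor(codes, CustomerType)

println(orderChannelNames.get("1")) // Some(App)
println(customerTypeNames.get("1")) // Some(Individual)
```

Filtering by type before joining matters because the same code value (here "1") means different things under different types; joining the unfiltered code table would attach unrelated descriptions and multiply rows.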
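Defining the table relationships
- To let the DWS-layer job fetch each day's incremental waybill data by date, a day column (format yyyyMMdd) is added to the DataFrame.
The code is as follows: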
//TODO 4) Join the dimension tables to the fact table
val left_outer = "left_outer"
val wayBillDetailDF = wayBillDF
  .join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer) // waybill table with courier table
  .join(dotDF, courierDF("dotId") === dotDF("id"), left_outer) // courier table with outlet table
  .join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer) // outlet table with area table
  .join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer) // waybill table with transport record table
  .join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer) // transport record with start warehouse
  .join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer) // transport record with arrival warehouse
  .join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer) // transport record with transport tool table
  .join(routeDF, routeDF("id") === recordDF("routeId"), left_outer) // transport record with route table
  .join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer) // start warehouse with company-warehouse mapping
  .join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer) // mapping with the start warehouse's company
  .join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer) // arrival warehouse with company-warehouse mapping
  .join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer) // mapping with the arrival warehouse's company
  .join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer) // waybill table with customer table
  .join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer) // waybill table with order channel types
  .join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer) // customer table with customer types
  .withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd")) // add the day column (yyyyMMdd)
  .sort(wayBillDF.col("cdt").asc) // sort by waybill creation time, ascending
  .select(
    wayBillDF("id"), // waybill id
    wayBillDF("expressBillNumber").as("express_bill_number"), // express bill number
    wayBillDF("waybillNumber").as("waybill_number"), // waybill number
    wayBillDF("cid"), // customer id
    customerDF("name").as("cname"), // customer name
    customerDF("type").as("ctype"), // customer type
    customerTypeDF("customerTypeName").as("ctype_name"), // customer type name
    wayBillDF("eid"), // courier id
    courierDF("name").as("ename"), // courier name
    dotDF("id").as("dot_id"), // outlet id
    dotDF("dotName").as("dot_name"), // outlet name
    areasDF("id").as("area_id"), // area id
    areasDF("name").as("area_name"), // area name
    wayBillDF("orderChannelId").as("order_channel_id"), // order channel id
    orderChannelTypeDF("orderChannelTypeName").as("order_channel_name"), // order channel name
    wayBillDF("orderDt").as("order_dt"), // order time
    wayBillDF("orderTerminalType").as("order_terminal_type"), // ordering device type
    wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), // ordering device OS type
    wayBillDF("reserveDt").as("reserve_dt"), // scheduled pickup time
    wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), // whether the pickup timed out
    wayBillDF("pkgId").as("pkg_id"), // package id
    wayBillDF("pkgNumber").as("pkg_number"), // package number
    wayBillDF("timeoutDt").as("timeout_dt"), // timeout time
    wayBillDF("transformType").as("transform_type"), // transport mode
    wayBillDF("deliveryAddr").as("delivery_addr"), // sender address
    wayBillDF("deliveryCustomerName").as("delivery_customer_name"), // sender
    wayBillDF("deliveryMobile").as("delivery_mobile"), // sender mobile
    wayBillDF("deliveryTel").as("delivery_tel"), // sender phone
    wayBillDF("receiveAddr").as("receive_addr"), // recipient address
    wayBillDF("receiveCustomerName").as("receive_customer_name"), // recipient
    wayBillDF("receiveMobile").as("receive_mobile"), // recipient mobile
    wayBillDF("receiveTel").as("receive_tel"), // recipient phone
    wayBillDF("cdt"), // creation time
    wayBillDF("udt"), // modification time
    wayBillDF("remark"), // waybill remark
    recordDF("swId").as("sw_id"), // start warehouse id
    startWarehouseDF("name").as("sw_name"), // start warehouse name
    startCompanyDF("id").as("sw_company_id"), // start warehouse's company id
    startCompanyDF("companyName").as("sw_company_name"), // start warehouse's company name
    recordDF("ewId").as("ew_id"), // arrival warehouse id
    endWarehouseDF("name").as("ew_name"), // arrival warehouse name
    endCompanyDF("id").as("ew_company_id"), // arrival warehouse's company id
    endCompanyDF("companyName").as("ew_company_name"), // arrival warehouse's company name
    toolDF("id").as("tt_id"), // transport tool id
    toolDF("licensePlate").as("tt_name"), // transport tool license plate
    recordDF("routeId").as("route_id"), // route id
    functions.concat(routeDF("startStation"), routeDF("endStation")).as("route_name"), // route start + end station
    $"day" // creation date (yyyyMMdd)
  )
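The day column comes from date_format(cdt, "yyyyMMdd"). The sketch below applies the same pattern with java.time to a few hypothetical (id, cdt) pairs and then keeps one day's increment, roughly what a DWS job would do when filtering on day; the data and the filter value are made up for illustration.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Same pattern string that is passed to date_format in the job.
val dayFormat = DateTimeFormatter.ofPattern("yyyyMMdd")

def toDay(cdt: LocalDateTime): String = cdt.format(dayFormat)

// Hypothetical (waybill id, cdt) pairs.
val waybills = Seq(
  (1L, LocalDateTime.of(2021, 3, 1, 23, 59, 59)),
  (2L, LocalDateTime.of(2021, 3, 2, 0, 0, 1)),
  (3L, LocalDateTime.of(2021, 3, 2, 14, 20))
)

// Add the derived day value, then keep one day's increment as a DWS job might.
val withDay = waybills.map { case (id, cdt) => (id, toDay(cdt)) }
val march2 = withDay.filter(_._2 == "20210302").map(_._1)

println(withDay) // days: 20210301, 20210302, 20210302
println(march2)  // ids 2 and 3
```

Because yyyyMMdd strings sort in the same order as the dates they encode, the column also works for range filters such as day >= "20210301".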
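Creating the waybill detail wide table and writing the detail data to Kudu
The waybill wide table data is stored in Kudu. When the widening job runs for the first time the wide table does not exist yet, so the job must check whether the table exists and create it if it does not.
Implementation:
- In the WayBillDWD singleton object, call save inside execute, passing the widened DataFrame and the wide-table definition OfflineTableDefine.wayBillDetail: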
//TODO 5) Write the widened data back to Kudu (DWD detail layer)
save(wayBillDetailDF, OfflineTableDefine.wayBillDetail)
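The source does not show the body of save, only that the wide table must be created on the first run and written to afterwards. The sketch below illustrates that idempotent "create if absent, then write" flow against a hypothetical in-memory TableCatalog; the class, its methods and the table name tbl_waybill_detail are all assumptions, and a real implementation would go through the Kudu client API instead.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a table store: table name -> rows keyed by primary key.
final class TableCatalog {
  private val tables = mutable.Map.empty[String, mutable.Map[Long, String]]

  def tableExists(name: String): Boolean = tables.contains(name)

  def createTable(name: String): Unit = {
    require(!tableExists(name), s"table $name already exists")
    tables(name) = mutable.Map.empty[Long, String]
  }

  // Upsert semantics: rows with an existing key are overwritten.
  // Throws NoSuchElementException if the table has not been created.
  def upsert(name: String, rows: Seq[(Long, String)]): Unit =
    tables(name) ++= rows

  def rowCount(name: String): Int = tables.get(name).map(_.size).getOrElse(0)
}

// Create the table on the first run only, then write.
def save(catalog: TableCatalog, table: String, rows: Seq[(Long, String)]): Unit = {
  if (!catalog.tableExists(table)) catalog.createTable(table)
  catalog.upsert(table, rows)
}

val catalog = new TableCatalog
save(catalog, "tbl_waybill_detail", Seq(1L -> "WB001", 2L -> "WB002"))
save(catalog, "tbl_waybill_detail", Seq(2L -> "WB002", 3L -> "WB003")) // second run
println(catalog.rowCount("tbl_waybill_detail")) // 3
```

Writing with upsert semantics rather than plain inserts is one way to let a rerun of the same day succeed without duplicate-key errors; whether the project's save does this is not shown.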
Removing cached data
To free resources, the cached source tables are released once the waybill detail wide table has been computed.
//TODO 6) Remove the cached data
wayBillDF.unpersist()
courierDF.unpersist()
dotDF.unpersist()
areasDF.unpersist()
recordDF.unpersist()
startWarehouseDF.unpersist()
endWarehouseDF.unpersist()
toolDF.unpersist()
routeDF.unpersist()
startCompanyWarehouseDF.unpersist()
startCompanyDF.unpersist()
endCompanyWarehouseDF.unpersist()
endCompanyDF.unpersist()
customerDF.unpersist()
codesDF.unpersist() // orderChannelTypeDF and customerTypeDF are derived from codesDF and were never persisted themselves
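Releasing each cache by hand is easy to get wrong, and the caches are not released at all if the job fails before reaching this step. One alternative, sketched below with a hypothetical Cacheable trait in place of Spark's DataFrame, is to register every cached object as it is created and release all of them in a finally block. Cacheable, FakeTable and withCaches are illustrative names, not part of the project.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for something that can be cached and released.
trait Cacheable {
  def name: String
  def release(): Unit
}

// Test double that records its release in a shared log.
final class FakeTable(val name: String, log: ListBuffer[String]) extends Cacheable {
  def release(): Unit = log += s"released $name"
}

// Run body with a registry of cached resources; release them all afterwards,
// in reverse order of registration, even if body throws.
def withCaches[T](body: ListBuffer[Cacheable] => T): T = {
  val cached = ListBuffer.empty[Cacheable]
  try body(cached)
  finally cached.reverseIterator.foreach(_.release())
}

val log = ListBuffer.empty[String]
withCaches { cached =>
  cached += new FakeTable("waybill", log)
  cached += new FakeTable("codes", log)
  // ... joins and writes would happen here ...
}
println(log.mkString(", ")) // released codes, released waybill
```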
Complete code
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
import org.apache.spark.storage.StorageLevel

object WayBillDWD extends OfflineApp {
  // Application name
  val appName = this.getClass.getSimpleName

  /**
   * Entry point
   * @param args command-line arguments
   */
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1) Create the SparkConf object
     * 2) Create the SparkSession object
     * 3) Load the fact and dimension tables from Kudu and cache them
     * 4) Join the dimension tables to the waybill fact table
     * 5) Write the widened data back to Kudu (DWD layer)
     * 6) Remove the cached data
     * 7) Stop the job and close the SparkSession
     */
    //TODO 1) Create the SparkConf object
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )
    //TODO 2) Create the SparkSession object
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
    // Process the data
    execute(sparkSession)
  }

  /**
   * Data processing
   *
   * @param sparkSession the active SparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3) Load the fact and dimension tables from Kudu (and cache the loaded data)
    //3.1: Load the waybill fact table (full load on the first run, incremental afterwards)
    val wayBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.waybill, Configuration.isFirstRunnable)
      .persist(StorageLevel.DISK_ONLY_2) // cache on the disks of two nodes so that a single-node failure does not lose the cached data
    //3.2: Load the courier table
    val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)
    //3.3: Load the outlet table
    val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
    //3.4: Load the area table
    val areasDF: DataFrame = getKuduSource(sparkSession, TableMapping.areas, true).persist(StorageLevel.DISK_ONLY_2)
    //3.5: Load the transport record table
    val recordDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportRecord, true).persist(StorageLevel.DISK_ONLY_2)
    //3.6: Load the start warehouse table
    val startWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
    //3.7: Load the arrival warehouse table
    val endWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
    //3.8: Load the vehicle table
    val toolDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, true).persist(StorageLevel.DISK_ONLY_2)
    //3.9: Load the route table
    val routeDF: DataFrame = getKuduSource(sparkSession, TableMapping.route, true).persist(StorageLevel.DISK_ONLY_2)
    //3.10: Load the company-warehouse mapping table for start warehouses
    val startCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
    //3.11: Load the company-warehouse mapping table for arrival warehouses
    val endCompanyWarehouseDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
    //3.12: Load the company table for start warehouses
    val startCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
    //3.13: Load the company table for arrival warehouses
    val endCompanyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
    //3.14: Load the logistics code table
    val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
    //3.15: Load the customer table
    val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)

    // Import implicit conversions (enables the $"column" syntax)
    import sparkSession.implicits._
    // Order channel types
    val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType).select(
      $"code".as("orderChannelTypeCode"), $"codeDesc".as("orderChannelTypeName"))
    // Customer types
    val customerTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.CustomType).select(
      $"code".as("customerTypeCode"), $"codeDesc".as("customerTypeName"))

    //TODO 4) Join the dimension tables to the fact table
    val left_outer = "left_outer"
    val wayBillDetailDF = wayBillDF
      .join(courierDF, wayBillDF("eid") === courierDF("id"), left_outer) // waybill table with courier table
      .join(dotDF, courierDF("dotId") === dotDF("id"), left_outer) // courier table with outlet table
      .join(areasDF, areasDF("id") === dotDF("manageAreaId"), left_outer) // outlet table with area table
      .join(recordDF, recordDF("pwWaybillNumber") === wayBillDF("waybillNumber"), left_outer) // waybill table with transport record table
      .join(startWarehouseDF, startWarehouseDF("id") === recordDF("swId"), left_outer) // transport record with start warehouse
      .join(endWarehouseDF, endWarehouseDF("id") === recordDF("ewId"), left_outer) // transport record with arrival warehouse
      .join(toolDF, toolDF("id") === recordDF("transportToolId"), left_outer) // transport record with transport tool table
      .join(routeDF, routeDF("id") === recordDF("routeId"), left_outer) // transport record with route table
      .join(startCompanyWarehouseDF, startCompanyWarehouseDF("warehouseId") === startWarehouseDF("id"), left_outer) // start warehouse with company-warehouse mapping
      .join(startCompanyDF, startCompanyDF("id") === startCompanyWarehouseDF("companyId"), left_outer) // mapping with the start warehouse's company
      .join(endCompanyWarehouseDF, endCompanyWarehouseDF("warehouseId") === endWarehouseDF("id"), left_outer) // arrival warehouse with company-warehouse mapping
      .join(endCompanyDF, endCompanyDF("id") === endCompanyWarehouseDF("companyId"), left_outer) // mapping with the arrival warehouse's company
      .join(customerDF, customerDF("id") === wayBillDF("cid"), left_outer) // waybill table with customer table
      .join(orderChannelTypeDF, orderChannelTypeDF("orderChannelTypeCode") === wayBillDF("orderChannelId"), left_outer) // waybill table with order channel types
      .join(customerTypeDF, customerTypeDF("customerTypeCode") === customerDF("type"), left_outer) // customer table with customer types
      .withColumn("day", date_format(wayBillDF("cdt"), "yyyyMMdd")) // add the day column (yyyyMMdd)
      .sort(wayBillDF.col("cdt").asc) // sort by waybill creation time, ascending
      .select(
        wayBillDF("id"), // waybill id
        wayBillDF("expressBillNumber").as("express_bill_number"), // express bill number
        wayBillDF("waybillNumber").as("waybill_number"), // waybill number
        wayBillDF("cid"), // customer id
        customerDF("name").as("cname"), // customer name
        customerDF("type").as("ctype"), // customer type
        customerTypeDF("customerTypeName").as("ctype_name"), // customer type name
        wayBillDF("eid"), // courier id
        courierDF("name").as("ename"), // courier name
        dotDF("id").as("dot_id"), // outlet id
        dotDF("dotName").as("dot_name"), // outlet name
        areasDF("id").as("area_id"), // area id
        areasDF("name").as("area_name"), // area name
        wayBillDF("orderChannelId").as("order_channel_id"), // order channel id
        orderChannelTypeDF("orderChannelTypeName").as("order_channel_name"), // order channel name
        wayBillDF("orderDt").as("order_dt"), // order time
        wayBillDF("orderTerminalType").as("order_terminal_type"), // ordering device type
        wayBillDF("orderTerminalOsType").as("order_terminal_os_type"), // ordering device OS type
        wayBillDF("reserveDt").as("reserve_dt"), // scheduled pickup time
        wayBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"), // whether the pickup timed out
        wayBillDF("pkgId").as("pkg_id"), // package id
        wayBillDF("pkgNumber").as("pkg_number"), // package number
        wayBillDF("timeoutDt").as("timeout_dt"), // timeout time
        wayBillDF("transformType").as("transform_type"), // transport mode
        wayBillDF("deliveryAddr").as("delivery_addr"), // sender address
        wayBillDF("deliveryCustomerName").as("delivery_customer_name"), // sender
        wayBillDF("deliveryMobile").as("delivery_mobile"), // sender mobile
        wayBillDF("deliveryTel").as("delivery_tel"), // sender phone
        wayBillDF("receiveAddr").as("receive_addr"), // recipient address
        wayBillDF("receiveCustomerName").as("receive_customer_name"), // recipient
        wayBillDF("receiveMobile").as("receive_mobile"), // recipient mobile
        wayBillDF("receiveTel").as("receive_tel"), // recipient phone
        wayBillDF("cdt"), // creation time
        wayBillDF("udt"), // modification time
        wayBillDF("remark"), // waybill remark
        recordDF("swId").as("sw_id"), // start warehouse id
        startWarehouseDF("name").as("sw_name"), // start warehouse name
        startCompanyDF("id").as("sw_company_id"), // start warehouse's company id
        startCompanyDF("companyName").as("sw_company_name"), // start warehouse's company name
        recordDF("ewId").as("ew_id"), // arrival warehouse id
        endWarehouseDF("name").as("ew_name"), // arrival warehouse name
        endCompanyDF("id").as("ew_company_id"), // arrival warehouse's company id
        endCompanyDF("companyName").as("ew_company_name"), // arrival warehouse's company name
        toolDF("id").as("tt_id"), // transport tool id
        toolDF("licensePlate").as("tt_name"), // transport tool license plate
        recordDF("routeId").as("route_id"), // route id
        functions.concat(routeDF("startStation"), routeDF("endStation")).as("route_name"), // route start + end station
        $"day" // creation date (yyyyMMdd)
      )

    //TODO 5) Write the widened data back to Kudu (DWD detail layer)
    save(wayBillDetailDF, OfflineTableDefine.wayBillDetail)

    //TODO 6) Remove the cached data
    wayBillDF.unpersist()
    courierDF.unpersist()
    dotDF.unpersist()
    areasDF.unpersist()
    recordDF.unpersist()
    startWarehouseDF.unpersist()
    endWarehouseDF.unpersist()
    toolDF.unpersist()
    routeDF.unpersist()
    startCompanyWarehouseDF.unpersist()
    startCompanyDF.unpersist()
    endCompanyWarehouseDF.unpersist()
    endCompanyDF.unpersist()
    customerDF.unpersist()
    codesDF.unpersist() // orderChannelTypeDF and customerTypeDF are derived from codesDF and were never persisted themselves

    //TODO 7) Stop the job and close the SparkSession
    sparkSession.stop()
  }
}
4. Testing and Verification
Run WayBillDWD.
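V. Developing the Waybill Metric Computation
1. Computed fields
Field name | Description |
id | Time the data was produced |
totalWaybillCount | Total waybill count |
maxAreaTotalCount | Maximum waybill count per area |
minAreaTotalCount | Minimum waybill count per area |
avgAreaTotalCount | Average waybill count per area |
maxCompanyTotalCount | Maximum waybill count per branch company |
minCompanyTotalCount | Minimum waybill count per branch company |
avgCompanyTotalCount | Average waybill count per branch company |
maxDotTotalCount | Maximum waybill count per outlet |
minDotTotalCount | Minimum waybill count per outlet |
avgDotTotalCount | Average waybill count per outlet |
maxRouteTotalCount | Maximum waybill count per route |
minRouteTotalCount | Minimum waybill count per route |
avgRouteTotalCount | Average waybill count per route |
maxTransportToolTotalCount | Maximum waybill count per transport tool |
minTransportToolTotalCount | Minimum waybill count per transport tool |
avgTransportToolTotalCount | Average waybill count per transport tool |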