客快物流 Big Data Project (102): Business and Metric Development

Posted by Lansonli

Table of Contents

Business and Metric Development

I. Business Development

II. Metric Development

1. Total number of outlets

2. Outlets per province

3. Total pickup orders per province

4. Total pickup orders per region

5. Total delivery orders per province

6. Total delivery orders per region

7. Total couriers per province

8. Total couriers per region

9. Tricycles per province

10. Tricycles per region

11. Total insured parcels

12. Total uninsured parcels

13. Total express orders today

14. Total signed-for orders today

15. Unsigned orders today

16. Rejected orders today

17. Total waybills

18. Waybills per province

19. Waybills per region


Business and Metric Development

I. Business Development

Implementation steps:

  • In the logistics-etl module, under the cn.it.logistics.etl.realtime package, create the CKStreamApp singleton object extending StreamApp
  • Write the main entry function and initialize the Spark runtime environment
  • Implement the two methods of the StreamApp base class
    • execute (consume the Kafka data, convert the consumed records into beans, and filter each table's data for writing to ClickHouse)
    • save (write the data into ClickHouse)

Implementation:

  • In the logistics-etl module, under the cn.it.logistics.etl.realtime package, create the CKStreamApp singleton object extending StreamApp
package cn.it.logistics.etl.realtime

/**
 * ClickHouse data pipeline application
 */
object CKStreamApp extends StreamApp 
  • Write the main entry function and initialize the Spark runtime environment
def main(args: Array[String]): Unit = {
  // Build the SparkConf object
  val conf = SparkUtils.autoSettingEnv(
    SparkUtils.sparkConf(appName).registerKryoClasses(Array(classOf[ru.yandex.clickhouse.ClickHouseConnectionImpl])),
    SparkUtils.parameterParser(args)
  )
  // Run the pipeline application
  execute(conf)
}
  • Implement the two methods of the StreamApp base class
    • execute
/**
 * Run the pipeline
 * @param conf the Spark configuration
 */
override def execute(conf: SparkConf): Unit = {
  // Create the SparkSession instance
  val spark = SparkSession.builder().config(conf).getOrCreate()
  spark.sparkContext.setLogLevel(Configure.LOG_OFF)
  // Subscribe to the Kafka "logistics" topic (all tables under the Oracle tbl_logistics schema, captured into Kafka by OGG)
  val logisticsRawDF = getKafkaSource(spark, Configure.kafkaAddress, "logistics")
  // Convert every logistics-system record into a DataFrame of OggMessageBean
  val logisticsDF = logisticsRawDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
    iters.map(row => {
      val jsonStr: String = row.getAs(0)
      val bean = JSON.parseObject(jsonStr, classOf[OggMessageBean])
      // Strip the uppercase schema prefix from the table name
      if (null != bean) { bean.setTable(bean.getTable.replaceAll("[A-Z]+\\.", "")) }
      bean
    }).toList.iterator
  })(Encoders.bean(classOf[OggMessageBean]))

  // Subscribe to the Kafka "crm" topic (all tables in the MySQL crm database, captured into Kafka by Canal)
  val crmRawDF = getKafkaSource(spark, Configure.kafkaAddress, "crm")
  // Convert every CRM-system record into a DataFrame of CanalMessageBean
  val crmDF = crmRawDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
    iters.filter(row => {
      // Drop TRUNCATE statements, which carry no row data
      val line = row.getAs[String](0)
      !(line.contains("TRUNCATE") || line.contains("truncate"))
    }).map(row => {
      val jsonStr: String = row.getAs(0)
      JSON.parseObject(jsonStr, classOf[CanalMessageBean])
    }).toList.iterator
  })(Encoders.bean(classOf[CanalMessageBean]))

  // Import the implicit encoders for the custom POJOs
  import cn.it.logistics.etl.utils.BeanImplicit._
  // Convert the OGG and Canal messages into concrete POJO DataFrames, one per table
  val areasDF = logisticsDF.filter(bean => bean.getTable == TableMapping.areas).map(bean => DataParser.toAreas(bean))(AreasBeanEncoder).toDF()
  val codesDF = logisticsDF.filter(bean => bean.getTable == TableMapping.codes).map(bean => DataParser.toCodes(bean))(CodesBeanEncoder).toDF()
  val collectPackageDF = logisticsDF.filter(bean => bean.getTable == TableMapping.collectPackage).map(bean => DataParser.toCollectPackage(bean))(CollectPackageBeanEncoder).toDF()
  val consumerSenderInfoDF = logisticsDF.filter(bean => bean.getTable == TableMapping.consumerSenderInfo).map(bean => DataParser.toConsumerSenderInfo(bean))(ConsumerSenderInfoBeanEncoder).toDF()
  val courierDF = logisticsDF.filter(bean => bean.getTable == TableMapping.courier).map(bean => DataParser.toCourier(bean))(CourierBeanEncoder).toDF()
  val deliverPackageDF = logisticsDF.filter(bean => bean.getTable == TableMapping.deliverPackage).map(bean => DataParser.toDeliverPackage(bean))(DeliverPackageBeanEncoder).toDF()
  val dotDF = logisticsDF.filter(bean => bean.getTable == TableMapping.dot).map(bean => DataParser.toDot(bean))(DotBeanEncoder).toDF()
  val dotTransportToolDF = logisticsDF.filter(bean => bean.getTable == TableMapping.dotTransportTool).map(bean => DataParser.toDotTransportTool(bean))(DotTransportToolBeanEncoder).toDF()
  val expressBillDF = logisticsDF.filter(bean => bean.getTable == TableMapping.expressBill).map(bean => DataParser.toExpressBill(bean))(ExpressBillBeanEncoder).toDF()
  val expressPackageDF = logisticsDF.filter(bean => bean.getTable == TableMapping.expressPackage).map(bean => DataParser.toExpressPackage(bean))(ExpressPackageBeanEncoder).toDF()
  val outWarehouseDF = logisticsDF.filter(bean => bean.getTable == TableMapping.outWarehouse).map(bean => DataParser.toOutWarehouse(bean))(OutWarehouseBeanEncoder).toDF()
  val pkgDF = logisticsDF.filter(bean => bean.getTable == TableMapping.pkg).map(bean => DataParser.toPkg(bean))(PkgBeanEncoder).toDF()
  val pushWarehouseDF = logisticsDF.filter(bean => bean.getTable == TableMapping.pushWarehouse).map(bean => DataParser.toPushWarehouse(bean))(PushWarehouseBeanEncoder).toDF()
  val routeDF = logisticsDF.filter(bean => bean.getTable == TableMapping.route).map(bean => DataParser.toRoute(bean))(RouteBeanEncoder).toDF()
  val transportToolDF = logisticsDF.filter(bean => bean.getTable == TableMapping.transportTool).map(bean => DataParser.toTransportTool(bean))(TransportToolBeanEncoder).toDF()
  val warehouseDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouse).map(bean => DataParser.toWarehouse(bean))(WarehouseBeanEncoder).toDF()
  val warehouseEmpDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseEmp).map(bean => DataParser.toWarehouseEmp(bean))(WarehouseEmpBeanEncoder).toDF()
  val warehouseReceiptDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseReceipt).map(bean => DataParser.toWarehouseReceipt(bean))(WarehouseReceiptBeanEncoder).toDF()
  val warehouseTransportToolDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseTransportTool).map(bean => DataParser.toWarehouseTransportTool(bean))(WarehouseTransportToolBeanEncoder).toDF()
  val warehouseVehicleMapDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseVehicleMap).map(bean => DataParser.toWarehouseVehicleMap(bean))(WarehouseVehicleMapBeanEncoder).toDF()
  val waybillDF = logisticsDF.filter(bean => bean.getTable == TableMapping.waybill).map(bean => DataParser.toWaybill(bean))(WaybillBeanEncoder).toDF()
  val transportRecordDF = logisticsDF.filter(bean => bean.getTable == TableMapping.transportRecord).map(bean => DataParser.toTransportRecordBean(bean))(TransportRecordBeanEncoder).toDF()
  val addressDF = crmDF.filter(bean => bean.getTable == TableMapping.address).map(bean => DataParser.toAddress(bean))(AddressBeanEncoder).toDF()
  val customerDF = crmDF.filter(bean => bean.getTable == TableMapping.customer).map(bean => DataParser.toCustomer(bean))(CustomerBeanEncoder).toDF()
  val consumerAddressMapDF = crmDF.filter(bean => bean.getTable == TableMapping.consumerAddressMap).map(bean => DataParser.toCustomerAddress(bean))(CustomerAddressBeanEncoder).toDF()
  // Save each DataFrame to its ClickHouse table
  save(areasDF, TableMapping.areas)
  save(codesDF, TableMapping.codes)
  save(collectPackageDF, TableMapping.collectPackage)
  save(consumerSenderInfoDF, TableMapping.consumerSenderInfo)
  save(courierDF, TableMapping.courier)
  save(deliverPackageDF, TableMapping.deliverPackage)
  save(dotDF, TableMapping.dot)
  save(dotTransportToolDF, TableMapping.dotTransportTool)
  save(expressBillDF, TableMapping.expressBill)
  save(expressPackageDF, TableMapping.expressPackage)
  save(outWarehouseDF, TableMapping.outWarehouse)
  save(pkgDF, TableMapping.pkg)
  save(pushWarehouseDF, TableMapping.pushWarehouse)
  save(routeDF, TableMapping.route)
  save(transportRecordDF, TableMapping.transportRecord)
  save(transportToolDF, TableMapping.transportTool)
  save(warehouseDF, TableMapping.warehouse)
  save(warehouseEmpDF, TableMapping.warehouseEmp)
  save(warehouseReceiptDF, TableMapping.warehouseReceipt)
  save(warehouseTransportToolDF, TableMapping.warehouseTransportTool)
  save(warehouseVehicleMapDF, TableMapping.warehouseVehicleMap)
  save(waybillDF, TableMapping.waybill)
  save(customerDF, TableMapping.customer)
  save(addressDF, TableMapping.address)
  save(consumerAddressMapDF, TableMapping.consumerAddressMap)
  // Start the streaming queries and wait for any to terminate
  val streams = spark.streams
  streams.active.foreach(q => println(s"==== Query ready to start: ${q.name}"))
  streams.awaitAnyTermination()
}
    • save
/**
 * Persist data to a ClickHouse table
 * @param df the data to write
 * @param table the target ClickHouse table
 * @param isAutoCreateTable whether to create the ClickHouse table automatically if it does not exist; defaults to true
 */
override def save(df: DataFrame, table: String, isAutoCreateTable: Boolean = true): Unit = {
  val options = Map(
    "driver" -> Configure.clickhouseDriver,
    "url" -> Configure.clickhouseUrl,
    "user" -> Configure.clickhouseUser,
    "password" -> Configure.clickhousePassword,
    "table" -> table,
    "autoCreateTable" -> isAutoCreateTable.toString,
    "primaryKey" -> "id",
    "opTypeField" -> "opType"
  )
  df.writeStream
    .format(Configure.SPARK_CLICKHOUSE_FORMAT)
    .options(options)
    .outputMode(OutputMode.Append)
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .queryName(table + "-" + Configure.SPARK_CLICKHOUSE_FORMAT)
    .start()
}

Complete code:

package cn.it.logistics.etl.realtime

import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.it.logistics.etl.parser.DataParser
import cn.it.logistics.etl.utils.{Configure, SparkUtils, TableMapping, Tools}
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}

/**
 * ClickHouse data pipeline application
 */
object CKStreamApp extends StreamApp {

  val appName: String = this.getClass.getSimpleName

  def main(args: Array[String]): Unit = {
    // Build the SparkConf object
    val conf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName).registerKryoClasses(Array(classOf[ru.yandex.clickhouse.ClickHouseConnectionImpl])),
      SparkUtils.parameterParser(args)
    )
    // Run the pipeline application
    execute(conf)
  }

  /**
   * Run the pipeline
   * @param conf the Spark configuration
   */
  override def execute(conf: SparkConf): Unit = {
    // Create the SparkSession instance
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.setLogLevel(Configure.LOG_OFF)
    // Subscribe to the Kafka "logistics" topic (all tables under the Oracle tbl_logistics schema, captured into Kafka by OGG)
    val logisticsRawDF = getKafkaSource(spark, Configure.kafkaAddress, "logistics")
    // Convert every logistics-system record into a DataFrame of OggMessageBean
    val logisticsDF = logisticsRawDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      iters.map(row => {
        val jsonStr: String = row.getAs(0)
        val bean = JSON.parseObject(jsonStr, classOf[OggMessageBean])
        // Strip the uppercase schema prefix from the table name
        if (null != bean) { bean.setTable(bean.getTable.replaceAll("[A-Z]+\\.", "")) }
        bean
      }).toList.iterator
    })(Encoders.bean(classOf[OggMessageBean]))

    // Subscribe to the Kafka "crm" topic (all tables in the MySQL crm database, captured into Kafka by Canal)
    val crmRawDF = getKafkaSource(spark, Configure.kafkaAddress, "crm")
    // Convert every CRM-system record into a DataFrame of CanalMessageBean
    val crmDF = crmRawDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      iters.filter(row => {
        // Drop TRUNCATE statements, which carry no row data
        val line = row.getAs[String](0)
        !(line.contains("TRUNCATE") || line.contains("truncate"))
      }).map(row => {
        val jsonStr: String = row.getAs(0)
        JSON.parseObject(jsonStr, classOf[CanalMessageBean])
      }).toList.iterator
    })(Encoders.bean(classOf[CanalMessageBean]))

    // Import the implicit encoders for the custom POJOs
    import cn.it.logistics.etl.utils.BeanImplicit._
    // Convert the OGG and Canal messages into concrete POJO DataFrames, one per table
    val areasDF = logisticsDF.filter(bean => bean.getTable == TableMapping.areas).map(bean => DataParser.toAreas(bean))(AreasBeanEncoder).toDF()
    val codesDF = logisticsDF.filter(bean => bean.getTable == TableMapping.codes).map(bean => DataParser.toCodes(bean))(CodesBeanEncoder).toDF()
    val collectPackageDF = logisticsDF.filter(bean => bean.getTable == TableMapping.collectPackage).map(bean => DataParser.toCollectPackage(bean))(CollectPackageBeanEncoder).toDF()
    val consumerSenderInfoDF = logisticsDF.filter(bean => bean.getTable == TableMapping.consumerSenderInfo).map(bean => DataParser.toConsumerSenderInfo(bean))(ConsumerSenderInfoBeanEncoder).toDF()
    val courierDF = logisticsDF.filter(bean => bean.getTable == TableMapping.courier).map(bean => DataParser.toCourier(bean))(CourierBeanEncoder).toDF()
    val deliverPackageDF = logisticsDF.filter(bean => bean.getTable == TableMapping.deliverPackage).map(bean => DataParser.toDeliverPackage(bean))(DeliverPackageBeanEncoder).toDF()
    val dotDF = logisticsDF.filter(bean => bean.getTable == TableMapping.dot).map(bean => DataParser.toDot(bean))(DotBeanEncoder).toDF()
    val dotTransportToolDF = logisticsDF.filter(bean => bean.getTable == TableMapping.dotTransportTool).map(bean => DataParser.toDotTransportTool(bean))(DotTransportToolBeanEncoder).toDF()
    val expressBillDF = logisticsDF.filter(bean => bean.getTable == TableMapping.expressBill).map(bean => DataParser.toExpressBill(bean))(ExpressBillBeanEncoder).toDF()
    val expressPackageDF = logisticsDF.filter(bean => bean.getTable == TableMapping.expressPackage).map(bean => DataParser.toExpressPackage(bean))(ExpressPackageBeanEncoder).toDF()
    val outWarehouseDF = logisticsDF.filter(bean => bean.getTable == TableMapping.outWarehouse).map(bean => DataParser.toOutWarehouse(bean))(OutWarehouseBeanEncoder).toDF()
    val pkgDF = logisticsDF.filter(bean => bean.getTable == TableMapping.pkg).map(bean => DataParser.toPkg(bean))(PkgBeanEncoder).toDF()
    val pushWarehouseDF = logisticsDF.filter(bean => bean.getTable == TableMapping.pushWarehouse).map(bean => DataParser.toPushWarehouse(bean))(PushWarehouseBeanEncoder).toDF()
    val routeDF = logisticsDF.filter(bean => bean.getTable == TableMapping.route).map(bean => DataParser.toRoute(bean))(RouteBeanEncoder).toDF()
    val transportToolDF = logisticsDF.filter(bean => bean.getTable == TableMapping.transportTool).map(bean => DataParser.toTransportTool(bean))(TransportToolBeanEncoder).toDF()
    val warehouseDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouse).map(bean => DataParser.toWarehouse(bean))(WarehouseBeanEncoder).toDF()
    val warehouseEmpDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseEmp).map(bean => DataParser.toWarehouseEmp(bean))(WarehouseEmpBeanEncoder).toDF()
    val warehouseReceiptDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseReceipt).map(bean => DataParser.toWarehouseReceipt(bean))(WarehouseReceiptBeanEncoder).toDF()
    val warehouseTransportToolDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseTransportTool).map(bean => DataParser.toWarehouseTransportTool(bean))(WarehouseTransportToolBeanEncoder).toDF()
    val warehouseVehicleMapDF = logisticsDF.filter(bean => bean.getTable == TableMapping.warehouseVehicleMap).map(bean => DataParser.toWarehouseVehicleMap(bean))(WarehouseVehicleMapBeanEncoder).toDF()
    val waybillDF = logisticsDF.filter(bean => bean.getTable == TableMapping.waybill).map(bean => DataParser.toWaybill(bean))(WaybillBeanEncoder).toDF()
    val transportRecordDF = logisticsDF.filter(bean => bean.getTable == TableMapping.transportRecord).map(bean => DataParser.toTransportRecordBean(bean))(TransportRecordBeanEncoder).toDF()
    val addressDF = crmDF.filter(bean => bean.getTable == TableMapping.address).map(bean => DataParser.toAddress(bean))(AddressBeanEncoder).toDF()
    val customerDF = crmDF.filter(bean => bean.getTable == TableMapping.customer).map(bean => DataParser.toCustomer(bean))(CustomerBeanEncoder).toDF()
    val consumerAddressMapDF = crmDF.filter(bean => bean.getTable == TableMapping.consumerAddressMap).map(bean => DataParser.toCustomerAddress(bean))(CustomerAddressBeanEncoder).toDF()
    // Save each DataFrame to its ClickHouse table
    save(areasDF, TableMapping.areas)
    save(codesDF, TableMapping.codes)
    save(collectPackageDF, TableMapping.collectPackage)
    save(consumerSenderInfoDF, TableMapping.consumerSenderInfo)
    save(courierDF, TableMapping.courier)
    save(deliverPackageDF, TableMapping.deliverPackage)
    save(dotDF, TableMapping.dot)
    save(dotTransportToolDF, TableMapping.dotTransportTool)
    save(expressBillDF, TableMapping.expressBill)
    save(expressPackageDF, TableMapping.expressPackage)
    save(outWarehouseDF, TableMapping.outWarehouse)
    save(pkgDF, TableMapping.pkg)
    save(pushWarehouseDF, TableMapping.pushWarehouse)
    save(routeDF, TableMapping.route)
    save(transportRecordDF, TableMapping.transportRecord)
    save(transportToolDF, TableMapping.transportTool)
    save(warehouseDF, TableMapping.warehouse)
    save(warehouseEmpDF, TableMapping.warehouseEmp)
    save(warehouseReceiptDF, TableMapping.warehouseReceipt)
    save(warehouseTransportToolDF, TableMapping.warehouseTransportTool)
    save(warehouseVehicleMapDF, TableMapping.warehouseVehicleMap)
    save(waybillDF, TableMapping.waybill)
    save(customerDF, TableMapping.customer)
    save(addressDF, TableMapping.address)
    save(consumerAddressMapDF, TableMapping.consumerAddressMap)
    // Start the streaming queries and wait for any to terminate
    val streams = spark.streams
    streams.active.foreach(q => println(s"==== Query ready to start: ${q.name}"))
    streams.awaitAnyTermination()
  }

  /**
   * Persist data to a ClickHouse table
   * @param df the data to write
   * @param table the target ClickHouse table
   * @param isAutoCreateTable whether to create the ClickHouse table automatically if it does not exist; defaults to true
   */
  override def save(df: DataFrame, table: String, isAutoCreateTable: Boolean = true): Unit = {
    val options = Map(
      "driver" -> Configure.clickhouseDriver,
      "url" -> Configure.clickhouseUrl,
      "user" -> Configure.clickhouseUser,
      "password" -> Configure.clickhousePassword,
      "table" -> table,
      "autoCreateTable" -> isAutoCreateTable.toString,
      "primaryKey" -> "id",
      "opTypeField" -> "opType"
    )
    df.writeStream
      .format(Configure.SPARK_CLICKHOUSE_FORMAT)
      .options(options)
      .outputMode(OutputMode.Append)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .queryName(table + "-" + Configure.SPARK_CLICKHOUSE_FORMAT)
      .start()
  }
}
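As a side note on the table-name cleanup in execute above: the regex [A-Z]+\. removes the uppercase schema prefix that OGG prepends to each table name. A minimal sketch of the same substitution in Python (the schema name here is a made-up placeholder, not one from the project):

```python
import re

# Mirror of bean.getTable.replaceAll("[A-Z]+\\.", "") in the Scala code above:
# drop an uppercase schema prefix (e.g. "ORACLEUSER.") from the table name.
def strip_schema_prefix(table: str) -> str:
    return re.sub(r"[A-Z]+\.", "", table)

print(strip_schema_prefix("ORACLEUSER.tbl_dot"))  # -> tbl_dot
print(strip_schema_prefix("tbl_waybill"))         # no prefix -> tbl_waybill
```

Because the pattern requires at least one uppercase letter followed by a dot, lowercase table names such as tbl_waybill pass through unchanged.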

II. Metric Development

1. Total number of outlets

SELECT
  COUNT(DISTINCT id) "cnt"
FROM
  "tbl_dot";

2. Outlets per province

SELECT
  ta."name",
  COUNT(td."manageAreaId") "cnt"
FROM
  "tbl_dot" td
  LEFT JOIN "tbl_areas" ta
    ON (
      ta."id" = CAST(td."manageAreaId" AS Int64)
    )
GROUP BY td."manageAreaId",
  ta."name"
ORDER BY "cnt" DESC;

3. Total pickup orders per province

SELECT
    tc0.pid,
    ta1.name,
    COUNT(tc0.pid),
    SUM(tc0.cnt)
FROM
(
    SELECT
        ta.pid AS pid,
        ta.id AS id,
        ta.name AS NAME,
        COUNT(tcp.pkgId) AS cnt
    FROM tbl_collect_package AS tcp
    LEFT JOIN tbl_courier AS tc ON tc.id = tcp.eid
    LEFT JOIN tbl_dot AS td ON tc.dotId = td.id
    LEFT JOIN tbl_areas AS ta ON ta.id = CAST(td.manageAreaId, 'Int64')
    GROUP BY
        ta.id,
        ta.pid,
        ta.name
) AS tc0
LEFT JOIN tbl_areas AS ta1 ON ta1.id = tc0.pid
GROUP BY tc0.pid,ta1.name;

4. Total pickup orders per region

SELECT
    ta.id,
    ta.name,
    COUNT(tcp.pkgId)
FROM tbl_collect_package AS tcp
LEFT JOIN tbl_courier AS tc ON tc.id = tcp.eid
LEFT JOIN tbl_dot AS td ON tc.dotId = td.id
LEFT JOIN tbl_areas AS ta ON ta.id = CAST(td.manageAreaId, 'Int64')
GROUP BY ta.id, ta.name;

5. Total delivery orders per province

SELECT
  t2."pid",
  t2."name",
  SUM(t1."cnt")
FROM
  (SELECT
    ta."id" AS id,
    ta."pid" AS pid,
    COUNT(tdp."expressBillId") "cnt"
  FROM
    "tbl_deliver_package" tdp
    LEFT JOIN "tbl_courier" tc
      ON tdp."empId" = tc."id"
    LEFT JOIN "tbl_dot" td
      ON td."id" = tc."dotId"
    LEFT JOIN "tbl_areas" ta
      ON ta."id" = CAST(td."manageAreaId",'Int64')
  GROUP BY ta."id",
    ta."pid") t1
  LEFT JOIN "tbl_areas" t2
    ON t2."id" = t1."pid"
GROUP BY t2."pid", t2."name";

6. Total delivery orders per region

SELECT
  ta."id",
  ta."pid",
  COUNT(tdp."expressBillId")
FROM
  "tbl_deliver_package" tdp
  LEFT JOIN "tbl_courier" tc
    ON tdp."empId" = tc."id"
  LEFT JOIN "tbl_dot" td
    ON td."id" = tc."dotId"
  LEFT JOIN "tbl_areas" ta
    ON ta."id" = CAST(td."manageAreaId",'Int64')
GROUP BY ta."id",ta."pid";

7. Total couriers per province

SELECT
  t2."pid",
  t2."name",
  SUM(t1."cnt")
FROM
  (SELECT
    ta."id" AS id,
    ta."pid" AS pid,
    COUNT(td."id") "cnt"
  FROM
    "tbl_courier" tc
    LEFT JOIN "tbl_dot" td
      ON td."id" = tc."dotId"
    LEFT JOIN "tbl_areas" ta
      ON CAST(td."manageAreaId",'Int64') = ta."id"
  GROUP BY td."id",
    ta."id",
    ta."pid") t1
  LEFT JOIN "tbl_areas" t2
    ON t2."id" = t1."pid"
GROUP BY t2."pid",t2."name";

8. Total couriers per region

SELECT
  ta."id",
  ta."pid",
  COUNT(td."id") "cnt"
FROM
  "tbl_courier" tc
  LEFT JOIN "tbl_dot" td
    ON td."id" = tc."dotId"
  LEFT JOIN "tbl_areas" ta
    ON CAST(td."manageAreaId",'Int64') = ta."id"
GROUP BY td."id", ta."id", ta."pid";

9. Tricycles per province

SELECT
  t2."pid",
  t2."name",
  SUM(t1."cnt")
FROM
  (SELECT
    ta."id" AS id,
    ta."pid" AS pid,
    td."dotName" AS dot_name,
    COUNT(tdtt."transportToolId") "cnt"
  FROM
    "tbl_dot_transport_tool" tdtt
    LEFT JOIN "tbl_dot" td
      ON td."id" = tdtt."dotId"
    LEFT JOIN "tbl_transport_tool" ttt
      ON ttt."id" = tdtt."transportToolId"
    LEFT JOIN "tbl_areas" ta
      ON ta."id" = CAST(td."manageAreaId",'Int64')
  GROUP BY ta."id",
    ta."pid",
    td."dotName") t1
  LEFT JOIN "tbl_areas" t2
    ON t2."id" = t1."pid"
GROUP BY t2."pid",t2."name";

10. Tricycles per region

SELECT
  ta."id",
  ta."pid",
  td."dotName",
  COUNT(tdtt."transportToolId") "cnt"
FROM
  "tbl_dot_transport_tool" tdtt
  LEFT JOIN "tbl_dot" td
    ON td."id" = tdtt."dotId"
  LEFT JOIN "tbl_transport_tool" ttt
    ON ttt."id" = tdtt."transportToolId"
  LEFT JOIN "tbl_areas" ta
    ON ta."id" = CAST(td."manageAreaId",'Int64')
GROUP BY ta."id", ta."pid", td."dotName";

11. Total insured parcels

SELECT
  COUNT(tep."id") "cnt"
FROM
  "tbl_collect_package" tcp
  LEFT JOIN "tbl_express_package" tep
    ON tep."id" = tcp."pkgId"
WHERE tep."insuredPrice" > 0;

12. Total uninsured parcels

SELECT
  COUNT(tep."id") "cnt"
FROM
  "tbl_collect_package" tcp
  LEFT JOIN "tbl_express_package" tep
    ON tep."id" = tcp."pkgId"
WHERE tep."insuredPrice" = 0;

13. Total express orders today

SELECT COUNT(1) "cnt" FROM "tbl_collect_package" tcp;

14. Total signed-for orders today

SELECT COUNT(1) "cnt" FROM "tbl_deliver_package" tdp WHERE "receType"=3;

15. Unsigned orders today

SELECT COUNT(1) "cnt" FROM "tbl_deliver_package" tdp WHERE "receType"=0;

16. Rejected orders today

SELECT COUNT(1) "cnt" FROM "tbl_deliver_package" tdp WHERE "state"=4;

17. Total waybills

SELECT COUNT(1) "cnt" FROM "tbl_waybill" tw;

18. Waybills per province

SELECT
  t2."pid",
  t2."name",
  SUM(t1."cnt") "count"
FROM
  (SELECT
    ta."id" AS id,
    ta."pid" AS pid,
    ta."name" NAME,
    COUNT(tw."waybillNumber") "cnt"
  FROM
    "tbl_waybill" tw
    LEFT JOIN "tbl_courier" tc
      ON tc."id" = tw."eid"
    LEFT JOIN "tbl_dot" td
      ON td."id" = tc."dotId"
    LEFT JOIN "tbl_areas" ta
      ON ta."id" = CAST(td."manageAreaId",'Int64')
  GROUP BY ta."id",
    ta."pid",
    ta."name") t1
  LEFT JOIN "tbl_areas" t2
    ON t2."id" = t1."pid"
GROUP BY t2."pid", t2."name";

19. Waybills per region

SELECT
  ta."id",
  ta."pid",
  ta."name",
  COUNT(tw."waybillNumber") "cnt"
FROM
  "tbl_waybill" tw
  LEFT JOIN "tbl_courier" tc
    ON tc."id" = tw."eid"
  LEFT JOIN "tbl_dot" td
    ON td."id" = tc."dotId"
  LEFT JOIN "tbl_areas" ta
    ON ta."id" = CAST(td."manageAreaId",'Int64')
GROUP BY ta."id", ta."pid", ta."name";

Originally published by Lansonli on CSDN: https://lansonli.blog.csdn.net
