How to implement scala type safety inside spark aggregation function

Posted: 2021-09-13 22:34:45

[Question]

How do I get type safety on the values aggregated inside the agg function? My goal is to have types checked before the job runs: $"event.zeroToSixty", for example, is not checked at compile time, so I would like to implement something that is.

FYI: the SomeMetadataExtracted case class contains all of the types inside the event column.

val currentDay = jobParameters.date.format(DateUtils.PartitionDateFormat)

val plusCar =
  MPHEventWrapper
    .loadMPHTable[SomeMetadataExtracted](
      plusTable,
      Seq(currentDay))

plusCar
  .groupByKey(row =>
    ( row.date,
      row.locale,
      row.carCode))
  .agg(
    count(when($"event.zeroToSixty" === Within5Seconds, 1)).as[Long],  // <- need type checking here
    count(when($"event.carLaunched" =!= Unknown, 1)).as[Long],         // <- need type checking here
    count(when($"event.successfulLaunch" === true, 1)).as[Long])       // <- need type checking here
  .map {
    case (
          (date, locale, carType),
          total_quick_cars,
          total_known_launches,
          total_successful_launches
        ) =>
      carSpeedAggregate(
        date,
        carType,
        locale,
        total_quick_cars,
        total_known_launches,
        total_successful_launches
      )
  }
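The root of the problem: $"event.zeroToSixty" is an untyped Column that is only resolved against the schema at runtime, so a typo fails the job, not the build. Field access on a case class, by contrast, is checked by the compiler. A minimal plain-Scala sketch of the difference (the Event fields and sentinel values here are hypothetical stand-ins for the question's types):

```scala
// Hypothetical, simplified stand-in for the question's event type.
case class Event(zeroToSixty: String, carLaunched: String, successfulLaunch: Boolean)

val Within5Seconds = "WITHIN_5_SECONDS" // assumed sentinel values
val Unknown        = "UNKNOWN"

// Predicates on case-class fields are compile-time checked:
// misspelling `zeroToSixty` here is a compile error, unlike $"event.zeroToSixty".
def isQuick(e: Event): Boolean       = e.zeroToSixty == Within5Seconds
def isKnownLaunch(e: Event): Boolean = e.carLaunched != Unknown

val events = Seq(
  Event(Within5Seconds, Unknown, successfulLaunch = true),
  Event("OVER_5_SECONDS", "LAUNCHED", successfulLaunch = false)
)

val quick = events.count(isQuick)            // 1
val known = events.count(isKnownLaunch)      // 1
val ok    = events.count(_.successfulLaunch) // 1
```

This is exactly the kind of predicate that the typed groupByKey/mapGroups approach in the answer below can reuse.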

Updated code (thanks, Artem!!!). New problem: the code is extremely memory-intensive.

plusCar
  .groupByKey(row =>
    ( row.date,
      row.locale,
      row.carCode))
  .mapGroups {
    case ((date: String, locale: String, carCode: String), events: Iterator[EventWithCommonDimensions[EventCombiner[SomeMetadataExtracted, ANStep]]]) =>
      // Materializing the iterator lets us traverse it several times,
      // but buffers the whole group in memory.
      val newEvent = events.toSeq

      val zeroToSixty      = newEvent.count(e => e.event.exists(_.zeroToSixty.getOrElse("UNKNOWN") == Within5Seconds))
      val carLaunched      = newEvent.count(e => e.event.exists(_.carLaunched.getOrElse("UNKNOWN") != Unknown))
      val successfulLaunch = newEvent.count(e => e.event.exists(_.successfulLaunch))

      carSpeedAggregate(
        date,
        locale,
        carCode,
        zeroToSixty.toLong,
        carLaunched.toLong,
        successfulLaunch.toLong
      )
  }
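The memory pressure comes from materializing every group before counting. The three counts can instead be computed in a single pass over the iterator with a fold, so only three counters are kept per group. A plain-Scala sketch (the Event shape and sentinel values are hypothetical stand-ins for the question's types):

```scala
// Hypothetical event shape mirroring the question.
case class Event(zeroToSixty: String, carLaunched: String, successfulLaunch: Boolean)

val Within5Seconds = "WITHIN_5_SECONDS" // assumed sentinel values
val Unknown        = "UNKNOWN"

// One traversal, three counters: no need to buffer the whole group.
def countAll(events: Iterator[Event]): (Long, Long, Long) =
  events.foldLeft((0L, 0L, 0L)) { case ((quick, known, ok), e) =>
    ( quick + (if (e.zeroToSixty == Within5Seconds) 1 else 0),
      known + (if (e.carLaunched != Unknown) 1 else 0),
      ok    + (if (e.successfulLaunch) 1 else 0) )
  }

val (q, k, o) = countAll(Iterator(
  Event(Within5Seconds, "LAUNCHED", true),
  Event("OVER_5_SECONDS", Unknown, true)
))
// q == 1, k == 1, o == 2
```

The same fold body can sit directly inside mapGroups, replacing the toSeq and the three separate count calls.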

[Comments]

Have you looked at Datasets? Datasets provide type safety at compile time.

[Answer 1]

agg is a function for untyped operations. Instead, you can use a combination of groupByKey and mapGroups.

// Suppose SomeMetadataExtracted has the following fields
case class SomeMetadataExtracted(
    date: Timestamp, 
    locale: String, 
    carCode: String, 
    zeroToSixty: String,
    carLaunched: String,
    successfulLaunch: Boolean
    )

plusCar
  .as[SomeMetadataExtracted] // requires: import spark_session.implicits._
  .groupByKey((event: SomeMetadataExtracted) =>
    ( event.date,
      event.locale,
      event.carCode))
  .mapGroups {
    case ((date, locale, carCode), events: Iterator[SomeMetadataExtracted]) =>
      // An Iterator can only be traversed once, so materialize it
      // before taking several counts.
      val evs = events.toSeq
      carSpeedAggregate(
        date,
        locale,
        carCode,
        evs.count(e => e.zeroToSixty == Within5Seconds),
        evs.count(e => e.carLaunched != Unknown),
        evs.count(e => e.successfulLaunch)
      )
  }

[Comments]

One thing: date, locale, and carCode come from different case classes in the same row. In my first example I am grouping by row.event.

That is not clear from your example. Can you share the type of plusCar? If it is a DataFrame, please share its schema.

Actually, I got it working! I will post the changes, but before that: it seems only the first count function gets called. Any reason why?

Ah, it looks like a TraversableOnce issue. How can I make this Traversable?

Now I don't understand your question. Can you share the updated code? In my example, events is an Iterator[SomeMetadataExtracted], so you can use it like a regular scala iterator.
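The "only the first count works" behaviour described in these comments is the classic Iterator pitfall: mapGroups hands the group over as an Iterator, which can be traversed only once, so every count after the first sees an exhausted iterator and returns 0. A plain-Scala demonstration:

```scala
val it = Iterator(1, 2, 3)

val evens = it.count(_ % 2 == 0) // 1 -- consumes the iterator
val odds  = it.count(_ % 2 != 0) // 0 -- iterator is already exhausted!

// One fix: materialize the elements once (at the cost of memory),
// then a Seq can be traversed as many times as needed.
val xs = Iterator(1, 2, 3).toSeq
val evens2 = xs.count(_ % 2 == 0) // 1
val odds2  = xs.count(_ % 2 != 0) // 2
```

Alternatively, a single fold over the iterator computes all counts in one traversal without buffering anything.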
