Scala - 如何在 Spark 的 map 函数中实现 Try

Posted

技术标签:

【中文标题】Scala - 如何在 Spark 的 map 函数中实现 Try【英文标题】:Scala - how to implement Try inside a map function in Spark 【发布时间】:2019-03-21 06:45:03 【问题描述】:

由于map 转换中的函数抛出java.lang.NullPointerException,我的 Spark 作业的一个阶段失败。

我的想法是借助 Try 类型从 map 内部获取损坏的 Sale 对象。 所以我故意将函数结果分配给saleOption 变量,以便进行模式匹配。

不幸的是,我当前的实现不起作用,我需要有关如何修复它的建议。将不胜感激任何建议。

这里是初始方法:

  def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] = 
    rawSales
      .map(sale => sale.id -> sale) // throws NullPointerException
      .reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
  

这是我实现我的想法的方式:

def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] = 
      rawSales
      .map(sale => 
        val saleOption: Option[(String, Sale)] = Try(sale.id -> sale).toOption
        saleOption match 
          case Success(successSale) => successSale
          case Failure(e) => throw new IllegalArgumentException(s"Corrupted sale: $rawSale;", e)
        

      )
       .reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
      

UPD:我的目的是为了调试目的实现这个想法并提高我的 Scala 知识。我不会使用TryExceptions 进行流量控制。

【问题讨论】:

1.您确定 rawSales RDD 的创建没有错误吗?也许你可以构建它,这样你就会有 rawSales: RDD[Option[Sale]]... 2. 你为什么抛出异常?你应该把它过滤掉。 @user3725190 实际上我应该提到我的目的是为了调试目的而编写代码。 【参考方案1】:

如果您只想忽略 null Sales 则将其删除并且不要抛出异常。例如用

rawSales
 .flatMap(Option(_))
 .keyBy(_.id)
 .reduceByKey(
   (sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2
 )

【讨论】:

【参考方案2】:

Try 不应用于流量控制。 Exceptions 应仅在特殊情况下使用。最好的解决方案是修复您的NullPointerException。如果不应该有任何空值,那么在生成 RDD 的代码中就有错误。如果您期望潜在的空值,例如来自格式错误的输入数据,那么您真的应该使用RDD[(String,Option[Sale])]

【讨论】:

以上是关于Scala - 如何在 Spark 的 map 函数中实现 Try的主要内容,如果未能解决你的问题,请参考以下文章

spark dataframe 和 scala Map互相转换

Scala spark 如何与 List[Option[Map[String, DataFrame]]] 交互

如何将 spark scala map 字段合并到 BQ?

如何使用反射从scala调用spark UDF?

如何在 spark-sql 查询中引用地图列?

在 Spark Scala 中使用 map() 重新排序键值对