Scala - 如何在 Spark 的 map 函数中实现 Try
Posted
技术标签:
【中文标题】Scala - 如何在 Spark 的 map 函数中实现 Try【英文标题】:Scala - how to implement Try inside a map function in Spark 【发布时间】:2019-03-21 06:45:03 【问题描述】:由于map
转换中的函数抛出java.lang.NullPointerException
,我的 Spark 作业的一个阶段失败。
我的想法是借助 Try
类型从 map
内部获取损坏的 Sale 对象。
所以我故意将函数结果分配给saleOption
变量,以便进行模式匹配。
不幸的是,我当前的实现不起作用,我需要有关如何修复它的建议。将不胜感激任何建议。
这里是初始方法:
def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] =
rawSales
.map(sale => sale.id -> sale) // throws NullPointerException
.reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
这是我实现我的想法的方式:
def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] =
rawSales
.map(sale =>
val saleOption: Option[(String, Sale)] = Try(sale.id -> sale).toOption
saleOption match
case Success(successSale) => successSale
case Failure(e) => throw new IllegalArgumentException(s"Corrupted sale: $rawSale;", e)
)
.reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
UPD:我的目的是为了调试目的实现这个想法并提高我的 Scala 知识。我不会使用Try
和Exceptions
进行流量控制。
【问题讨论】:
1.您确定 rawSales RDD 的创建没有错误吗?也许你可以构建它,这样你就会有 rawSales: RDD[Option[Sale]]... 2. 你为什么抛出异常?你应该把它过滤掉。 @user3725190 实际上我应该提到我的目的是为了调试目的而编写代码。 【参考方案1】:如果您只想忽略 null
Sales
则将其删除并且不要抛出异常。例如用
rawSales
.flatMap(Option(_))
.keyBy(_.id)
.reduceByKey(
(sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2
)
【讨论】:
【参考方案2】:Try
不应用于流量控制。 Exception
s 应仅在特殊情况下使用。最好的解决方案是修复您的NullPointerException
。如果不应该有任何空值,那么在生成 RDD 的代码中就有错误。如果您期望潜在的空值,例如来自格式错误的输入数据,那么您真的应该使用RDD[(String,Option[Sale])]
。
【讨论】:
以上是关于Scala - 如何在 Spark 的 map 函数中实现 Try的主要内容,如果未能解决你的问题,请参考以下文章
spark dataframe 和 scala Map互相转换