如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?
Posted
技术标签:
【中文标题】如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?【英文标题】:How to pass Set/HashSet as argument into UDF in spark? 【发布时间】:2019-07-31 22:32:44 【问题描述】:我正在尝试在过滤器中编写一个函数,该函数将检查值是否在 Set 中,我正在 UDF 中实现它,但似乎它不能将 Set/HashSet 作为参数。
set 取自:
testSet=existTableDF.select("Column1")
.rdd.map(r=>r(0).asInstanceOf[String]).collect().toSet
udf:
def checkExistPlan(col1:String,testSet:Set[String]):Boolean=
if (testSet.contains(col1))
false
else
true
val existFilter=udf((x:String,testSet:Set[String])=>checkExistPlan(x,testSet))
使用udf时的代码:
testDF.filter(existFilter('Column1,lit(existMemberHashSet)))
执行时出现如下错误: 线程“主”java.lang.RuntimeException 中的异常:不支持的文字类型类 java.util.HashSet [此处的某些值]
【问题讨论】:
【参考方案1】:首先,您可能希望广播集用于过滤。 如果您不使用广播集,那么您的地图将被复制到所有分区的所有执行者link。
您的代码中的问题是您正在从非原始类型创建文字。 您应该尝试以下方法:
var s : Set[String] = Set("1","3)
val broadcastedSet = spark.sparkContext.broadcast(s)
def checkExistPlan(col1:String):Boolean=
if (broadcastedSet.value.contains(col1))
true
else
false
val existFilter=udf((x:String)=>checkExistPlan(x))
someDF.filter(existFilter($"number")).show()
【讨论】:
谢谢!它可以工作,只是好奇,如果该 udf 函数在主要进程之外,或者是从其他文件导入的,那么在我广播set
之后,我应该将什么作为参数传递给 udf?应该是Broadcast[Set]
?还是Broadcast[Set[String]]
?
@user2654659 这取决于您要在哪里定义它的设计。 main里面应该没问题。将它作为 Broadcast[Set[String]] 传递给你 udf。以上是关于如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?的主要内容,如果未能解决你的问题,请参考以下文章