如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?

Posted

技术标签:

【中文标题】如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?【英文标题】:How to pass Set/HashSet as argument into UDF in spark? 【发布时间】:2019-07-31 22:32:44 【问题描述】:

我正在尝试在过滤器中编写一个函数,该函数将检查值是否在 Set 中,我正在 UDF 中实现它,但似乎它不能将 Set/HashSet 作为参数。

set 取自:

testSet=existTableDF.select("Column1")
        .rdd.map(r=>r(0).asInstanceOf[String]).collect().toSet

udf:

def checkExistPlan(col1:String,testSet:Set[String]):Boolean=
if (testSet.contains(col1))
      false
     else
      true

val existFilter=udf((x:String,testSet:Set[String])=>checkExistPlan(x,testSet))

使用udf时的代码:

testDF.filter(existFilter('Column1,lit(existMemberHashSet)))

执行时出现如下错误: 线程“主”java.lang.RuntimeException 中的异常:不支持的文字类型类 java.util.HashSet [此处的某些值]

【问题讨论】:

【参考方案1】:

首先,您可能希望广播集用于过滤。 如果您不使用广播集,那么您的地图将被复制到所有分区的所有执行者link。

您的代码中的问题是您正在从非原始类型创建文字。 您应该尝试以下方法:

var s : Set[String] = Set("1","3)
val broadcastedSet = spark.sparkContext.broadcast(s)

def checkExistPlan(col1:String):Boolean=
if (broadcastedSet.value.contains(col1))
      true
     else
      false


val existFilter=udf((x:String)=>checkExistPlan(x))
someDF.filter(existFilter($"number")).show()

【讨论】:

谢谢!它可以工作,只是好奇,如果该 udf 函数在主要进程之外,或者是从其他文件导入的,那么在我广播 set 之后,我应该将什么作为参数传递给 udf?应该是Broadcast[Set]?还是Broadcast[Set[String]] @user2654659 这取决于您要在哪里定义它的设计。 main里面应该没问题。将它作为 Broadcast[Set[String]] 传递给你 udf。

以上是关于如何将 Set/HashSet 作为参数传递给 Spark 中的 UDF?的主要内容,如果未能解决你的问题,请参考以下文章

将 xml 数据作为存储过程的参数传递给 s-s-rs

将一个表字段作为参数传递给 s-s-rS 报告中的另一个表

将查询作为参数传递给 udf 函数

Scipy 最小化:如何将参数传递给目标和约束

如何将数组作为参数传递给另一个脚本?

如何将回调作为参数传递给另一个函数