为 Spark UDF 执行提供上下文
Posted
技术标签:
【中文标题】为 Spark UDF 执行提供上下文【英文标题】:Supplying context for Spark UDF execution 【发布时间】:2020-03-06 22:01:09 【问题描述】:我正在使用 Scala 编程语言。我想用 sha2 和 salt 对整个数据框列进行哈希处理。我已经实现了以下 UDF,它应该采用 MessageDigest 和将被散列的输入字符串。
val md = MessageDigest.getInstance("SHA-256")
val random = new SecureRandom();
val salt: Array[Byte] = new Array[Byte](16)
random.nextBytes(salt)
md.update(salt)
dataFrame.withColumn("ColumnName", Sqlfunc(md, col("ColumnName")))
....some other code....
val HashValue: ((MessageDigest, String) => String) = (md: MessageDigest, input: String) =>
val hashedPassword: Array[Byte] = md.digest(input.getBytes(StandardCharsets.UTF_8))
val sb: StringBuilder = new StringBuilder
for (b <- hashedPassword) sb.append(String.format("%02x", Byte.box(b)))
sb.toString();
val Sqlfunc = udf(HashValue)
但是上面的代码无法编译,因为我不知道如何将 messageDigest 传递给这个函数,所以我遇到了以下错误
<<< ERROR!
java.lang.ClassCastException: com...................$$anonfun$9 cannot be cast to scala.Function1
谁能告诉我我做错了什么? 另外,我是密码学的新手,所以请随意提出任何建议。我们必须使用 Sha2 和盐。 你觉得这里的表现怎么样?
谢谢
【问题讨论】:
这能回答你的问题吗? How can I pass extra parameters to UDFs in Spark SQL? 不,它没有。该解决方案没有对数据框的引用。老实说,我什至不知道如何在 udf 中传递参数。 【参考方案1】:MessageDigest
不在您的数据中。这只是 UDF 评估的上下文。这种类型的上下文是通过closures 提供的。
有很多方法可以达到预期的效果。以下是使用函数currying 的有用模式:
object X extends Serializable
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
def foo(context: String)(arg1: Int, arg2: Int): String =
context.slice(arg1, arg2)
def udfFoo(context: String): UserDefinedFunction =
udf(foo(context) _)
试一试:
spark.range(1).toDF
.select(X.udfFoo("Hello, there!")('id, 'id + 5))
.show(false)
生成
+-----------------+
|UDF(id, (id + 5))|
+-----------------+
|Hello |
+-----------------+
【讨论】:
谢谢 Sim 我已将代码更改为 dataFrame.withColumn("ColumnName", udfHash2(md)(col("ColumnName"))) def udfHash2(md: MessageDigest): UserDefinedFunction = udf(hash2 (md) _) def hash2(md: MessageDigest)(input: String): String = val hashedPassword: Array[Byte] = md.digest(input.getBytes(StandardCharsets.UTF_8)) val sb: StringBuilder = new StringBuilder for (b 您的代码看起来不错。你怎么知道函数没有被调用?不要覆盖ColumnName
,而是添加一个不同的列。如果返回的数据缺少该列,则说明您根本没有使用转换。以上是关于为 Spark UDF 执行提供上下文的主要内容,如果未能解决你的问题,请参考以下文章