为 Spark UDF 执行提供上下文

Posted

技术标签:

【中文标题】为 Spark UDF 执行提供上下文【英文标题】:Supplying context for Spark UDF execution 【发布时间】:2020-03-06 22:01:09 【问题描述】:

我正在使用 Scala 编程语言。我想用 sha2 和 salt 对整个数据框列进行哈希处理。我已经实现了以下 UDF,它应该采用 MessageDigest 和将被散列的输入字符串。

  val md = MessageDigest.getInstance("SHA-256")

  val random = new SecureRandom();
  val salt: Array[Byte] = new Array[Byte](16)
  random.nextBytes(salt)
  md.update(salt)

  dataFrame.withColumn("ColumnName", Sqlfunc(md, col("ColumnName")))

  ....some other code....

  val HashValue: ((MessageDigest, String) => String) = (md: MessageDigest, input: String) =>
  
    val hashedPassword: Array[Byte] = md.digest(input.getBytes(StandardCharsets.UTF_8))
    val sb: StringBuilder = new StringBuilder
    for (b <- hashedPassword) sb.append(String.format("%02x", Byte.box(b)))
    sb.toString();
  

  val Sqlfunc = udf(HashValue)

但是上面的代码无法编译,因为我不知道如何将 messageDigest 传递给这个函数,所以我遇到了以下错误

 <<< ERROR!
java.lang.ClassCastException: com...................$$anonfun$9 cannot be cast to scala.Function1

谁能告诉我我做错了什么? 另外,我是密码学的新手,所以请随意提出任何建议。我们必须使用 Sha2 和盐。 你觉得这里的表现怎么样?

谢谢

【问题讨论】:

这能回答你的问题吗? How can I pass extra parameters to UDFs in Spark SQL? 不,它没有。该解决方案没有对数据框的引用。老实说,我什至不知道如何在 udf 中传递参数。 【参考方案1】:

MessageDigest 不在您的数据中。这只是 UDF 评估的上下文。这种类型的上下文是通过closures 提供的。

有很多方法可以达到预期的效果。以下是使用函数currying 的有用模式:

object X extends Serializable 
  import org.apache.spark.sql.expressions.UserDefinedFunction
  import org.apache.spark.sql.functions.udf

  def foo(context: String)(arg1: Int, arg2: Int): String =
    context.slice(arg1, arg2)

  def udfFoo(context: String): UserDefinedFunction =
    udf(foo(context) _)

试一试:

spark.range(1).toDF
  .select(X.udfFoo("Hello, there!")('id, 'id + 5))
  .show(false)

生成

+-----------------+
|UDF(id, (id + 5))|
+-----------------+
|Hello            |
+-----------------+

【讨论】:

谢谢 Sim 我已将代码更改为 dataFrame.withColumn("ColumnName", udfHash2(md)(col("ColumnName"))) def udfHash2(md: MessageDigest): UserDefinedFunction = udf(hash2 (md) _) def hash2(md: MessageDigest)(input: String): String = val hashedPassword: Array[Byte] = md.digest(input.getBytes(StandardCharsets.UTF_8)) val sb: StringBuilder = new StringBuilder for (b 您的代码看起来不错。你怎么知道函数没有被调用?不要覆盖ColumnName,而是添加一个不同的列。如果返回的数据缺少该列,则说明您根本没有使用转换。

以上是关于为 Spark UDF 执行提供上下文的主要内容,如果未能解决你的问题,请参考以下文章

Spark面试题——Spark的内存管理机制

Spark 内存管理

在结构化流中将数据帧传递给 UDF 时出错

Spark - 为 udf 提供额外的参数

Pig 中的 Udfs 共享上下文

Spark内部执行机制