使用 UDF 及其性能的 Spark Scala 数据集验证

Posted

技术标签:

【中文标题】使用 UDF 及其性能的 Spark Scala 数据集验证【英文标题】:Spark Scala Dataset Validations using UDF and its Performance 【发布时间】:2018-12-03 10:13:31 【问题描述】:

我是 Spark Scala 的新手。我已经实现了一个使用 UDF 对多个列进行数据集验证的解决方案,而不是在 for 循环中遍历各个列。但我不知道这是如何更快地工作,我必须解释它是更好的解决方案。

数据验证的列将在运行时接收,因此我们不能在代码中硬编码列名。当列值验证失败时,还需要使用列名更新 cmets 列。

旧代码,

def doValidate(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = 
var ValidDF: Dataset[Row] = data
var i:Int = 0
for (s <- columnArray) 
        var list = validValueArrays(i).split(",")
    ValidDF = ValidDF.withColumn("comments",when(ValidDF.col(s).isin(list: _*),concat(lit(col("comments")),lit(" Error: Invalid Records in: ") ,lit(s))).otherwise(col("comments")))
    i = i + 1  
  

return ValidDF;

新代码,

def validateColumnValues(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = 
 var ValidDF: Dataset[Row] = data
 var checkValues = udf((row: Row, comment: String) => 
  var newComment = comment
  for (s: Int  <- 0 to row.length-1) 
    var value = row.get(s)
    var list = validValueArrays(s).split(",")

     if(!list.contains(value))
      

       newComment = newComment + " Error:Invalid Records in: " + columnArray(s) +";"
      
    
     newComment
  );
ValidDF = ValidDF.withColumn("comments",checkValues(struct(columnArray.head, columnArray.tail: _*),col("comments")))

return ValidDF;
 

columnArray --> 将有列列表

validValueArrays --> 将具有对应于列数组位置的有效值。多个有效值将以 , 分隔。

我想知道哪一种更好或任何其他更好的方法来做到这一点。当我测试新代码时看起来更好。当我阅读 UDF 时,这两个逻辑之间的区别是 Spark 的黑盒。而在这种情况下UDF无论如何都会影响性能?

【问题讨论】:

【参考方案1】:

在运行它之前,我需要更正一些右括号。返回 validDF 时要删除一个“”。我仍然收到运行时分析错误。

最好避免使用 UDF,因为 UDF 意味着反序列化以在经典 Scala 中处理数据,然后对其进行重新序列化。但是,如果您的需求无法使用构建 SQL 函数归档,那么您必须使用 UDF,但您必须确保查看 SparkUI 的性能和执行计划。

【讨论】:

谢谢。我已经更新了正确的代码。我在使用 UDF 时遇到了问题,但有什么方法可以验证运行时提供的数据框列并更新评论部分?除了一件一件地做这件事之外,我还没有找到任何其他方法,这需要花费大量时间。有时 GC 内存错误。这个 UDF 解决了这个问题。但是,如果有其他方法,那将是很有帮助的。

以上是关于使用 UDF 及其性能的 Spark Scala 数据集验证的主要内容,如果未能解决你的问题,请参考以下文章

使用 Scala 从 Spark 的 withColumn 中调用 udf 时出错

如何使用反射从scala调用spark UDF?

Scala 和 Spark UDF 函数

关于在 Spark Scala 中创建用户定义函数 (UDF)

使用 scala 在 spark sql 中编写 UDF

使用 Option 作为输入参数定义 Spark scala UDF