在 Spark 中将可选参数建模为 UDF 的最佳方法是啥?

Posted

技术标签:

【中文标题】在 Spark 中将可选参数建模为 UDF 的最佳方法是啥?【英文标题】:What's the best way to model optional parameter to UDF in Spark?在 Spark 中将可选参数建模为 UDF 的最佳方法是什么? 【发布时间】:2019-12-20 06:41:50 【问题描述】:

看起来在 UDF 中不可能有可选/默认参数。这个jira 建议为这种用例使用两个不同的 UDF。

我的代码是这样的,

dataset.select(RecordProvider.getKeyUDF(sparkArguments.getDatasetArguments)(col(hashKeyName), col(rangeKeyName)).as("key"),
               RecordProvider.getValueUDF(avroSchema)(to_json(struct(dataset.columns.map(col): _*))).as("value"))

UDF 看起来像这样,

def getKeyUDF(datasetArguments: DatasetArguments) = udf((hashKey: String, rangeKey: String) => 
.....
)

在这种情况下,rangeKeyName 可以为 null,这意味着数据集不存在 rangeKey 列。我的 UDF 注册的函数为 rangeKey 处理 null。

我很难在没有 if 的情况下完成这项工作,否则围绕整个 dataset.select 并有两个 UDF。这是唯一的方法吗?另外,由于我使用的是柯里化,所以我不能为我的 UDF 使用函数 (val),所以我必须坚持使用方法 (def)。

【问题讨论】:

【参考方案1】:

当列不存在时,您可以添加只有空值的列。或者,您可以使用 if-else 检查该列是否存在,并将 rangeKey 替换为不存在的空列。这将允许您在两种情况下都使用相同的UDF

如果不存在则添加一个空列\s:

if (!dataset.columns.contains(rangeKeyName))
  dataset = dataset.withColumn("rangeKeyName", lit(None).cast(StringType()))

使用 if-else:

if (dataset.columns.contains(rangeKeyName)) 
  // Same as before
 else 
  dataset.select(RecordProvider.getKeyUDF(sparkArguments.getDatasetArguments)(col(hashKeyName), lit(None).cast(StringType())).as("key"),
                 RecordProvider.getValueUDF(avroSchema)(to_json(struct(dataset.columns.map(col): _*))).as("value"))

【讨论】:

谢谢。空 col 效果很好,因为我不想对数据集进行任何更改,因为它会影响我的 to_json,所以我以稍微不同的方式使用它,如下所示。 ` var keyCols = ListBuffer(dataset.col(sparkArguments.getDatasetArguments.getHashKeyName)) if(datasetArguments.getSortKeyName != null)keyCols += dataset.col(sparkArguments.getDatasetArguments.getSortKeyName) else keyCols += lit(null) ` 顺便说一句,作为 Spark 的新手,我认为 Spark 将应用优化并且我不会使用 keyCols += lit(null) 或一般使用 keyCols 产生内存开销? @Nisarg:没问题,很高兴你成功了。关于内存开销,我认为在这种情况下这不是问题。 你也可以使用when(col("hashKeyName").isNull,lit("")).otherwise(col("hashKeyName")来实现IF-Else条件。它可以是 select 语句本身的一部分,您无需使用广泛的 IF-Else。请在下面查看我的答案。【参考方案2】:

您也可以在列本身上使用 when(condition, result1).otherwise(result2)。 lit() 可用于避免 NULL。 Lit 可以是任何数据类型。

    import org.apache.spark.sql.functions._
    dataset.select(getKeyUDF(when(col("hashKeyName").isNull,lit("")).otherwise(col("hashKeyName")),col("rangeKeyName")).as("key"))

【讨论】:

我猜,这更接近我想要的,而没有更广泛的 IF-Else。 是的,我一直使用它,就像 sql 中的 case 语句一样。

以上是关于在 Spark 中将可选参数建模为 UDF 的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache Spark 中将 Scala UDF 转换为 Java 版本?

Spark UDF 作为函数参数,UDF 不在函数范围内

如何将一些额外的字符串参数传递给每行的 spark udf?

如何在 Spark Scala 的 UDF 中将列作为值传递以检查条件

Spark - 为 udf 提供额外的参数

pySpark 在 UDF 中使用 Spark SQL/DF 的最佳选择?