在 Spark 中将可选参数建模为 UDF 的最佳方法是啥?
Posted
技术标签:
【中文标题】在 Spark 中将可选参数建模为 UDF 的最佳方法是啥?【英文标题】:What's the best way to model optional parameter to UDF in Spark?在 Spark 中将可选参数建模为 UDF 的最佳方法是什么? 【发布时间】:2019-12-20 06:41:50 【问题描述】:看起来在 UDF 中不可能有可选/默认参数。这个jira 建议为这种用例使用两个不同的 UDF。
我的代码是这样的,
dataset.select(RecordProvider.getKeyUDF(sparkArguments.getDatasetArguments)(col(hashKeyName), col(rangeKeyName)).as("key"),
RecordProvider.getValueUDF(avroSchema)(to_json(struct(dataset.columns.map(col): _*))).as("value"))
UDF 看起来像这样,
def getKeyUDF(datasetArguments: DatasetArguments) = udf((hashKey: String, rangeKey: String) =>
.....
)
在这种情况下,rangeKeyName
可以为 null,这意味着数据集不存在 rangeKey
列。我的 UDF 注册的函数为 rangeKey
处理 null。
我很难在没有 if 的情况下完成这项工作,否则围绕整个 dataset.select
并有两个 UDF。这是唯一的方法吗?另外,由于我使用的是柯里化,所以我不能为我的 UDF 使用函数 (val
),所以我必须坚持使用方法 (def
)。
【问题讨论】:
【参考方案1】:当列不存在时,您可以添加只有空值的列。或者,您可以使用 if-else 检查该列是否存在,并将 rangeKey
替换为不存在的空列。这将允许您在两种情况下都使用相同的UDF
。
如果不存在则添加一个空列\s:
if (!dataset.columns.contains(rangeKeyName))
dataset = dataset.withColumn("rangeKeyName", lit(None).cast(StringType()))
使用 if-else:
if (dataset.columns.contains(rangeKeyName))
// Same as before
else
dataset.select(RecordProvider.getKeyUDF(sparkArguments.getDatasetArguments)(col(hashKeyName), lit(None).cast(StringType())).as("key"),
RecordProvider.getValueUDF(avroSchema)(to_json(struct(dataset.columns.map(col): _*))).as("value"))
【讨论】:
谢谢。空 col 效果很好,因为我不想对数据集进行任何更改,因为它会影响我的 to_json,所以我以稍微不同的方式使用它,如下所示。 ` var keyCols = ListBuffer(dataset.col(sparkArguments.getDatasetArguments.getHashKeyName)) if(datasetArguments.getSortKeyName != null)keyCols += dataset.col(sparkArguments.getDatasetArguments.getSortKeyName) else keyCols += lit(null) ` 顺便说一句,作为 Spark 的新手,我认为 Spark 将应用优化并且我不会使用keyCols += lit(null)
或一般使用 keyCols 产生内存开销?
@Nisarg:没问题,很高兴你成功了。关于内存开销,我认为在这种情况下这不是问题。
你也可以使用when(col("hashKeyName").isNull,lit("")).otherwise(col("hashKeyName")
来实现IF-Else条件。它可以是 select 语句本身的一部分,您无需使用广泛的 IF-Else。请在下面查看我的答案。【参考方案2】:
您也可以在列本身上使用 when(condition, result1).otherwise(result2)。 lit() 可用于避免 NULL。 Lit 可以是任何数据类型。
import org.apache.spark.sql.functions._
dataset.select(getKeyUDF(when(col("hashKeyName").isNull,lit("")).otherwise(col("hashKeyName")),col("rangeKeyName")).as("key"))
【讨论】:
我猜,这更接近我想要的,而没有更广泛的 IF-Else。 是的,我一直使用它,就像 sql 中的 case 语句一样。以上是关于在 Spark 中将可选参数建模为 UDF 的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Apache Spark 中将 Scala UDF 转换为 Java 版本?
如何将一些额外的字符串参数传递给每行的 spark udf?