使用查找表中的 withColumn 动态添加新列
Posted
技术标签:
【中文标题】使用查找表中的 withColumn 动态添加新列【英文标题】:adding a new column using withColumn from a lookup table dynamically 【发布时间】:2020-08-31 09:20:22 【问题描述】:我在 Java 8 中使用 spark-sql-2.4.1v。我有一个场景,我需要从查找表中动态添加一列。
我有带有列的数据框 A, B, C, ..., X,Y, Z
当少数(原始)列(例如:A、B、C)值为 null 时,我需要取/替换列(例如:X、Y、Z)值,否则取原始列值。 我将获取此映射信息作为业务逻辑的一部分。 如果是这种情况,我将遵循下面的硬编码代码
Dataset<Row> substitutedDs = ds
.withColumn("A",
when(col("A").isNull() , col("X").cast(DataTypes.StringType))
.otherwise(col("A").cast(DataTypes.StringType))
)
.withColumn("C",
when(col("C").isNull() , col("Z").cast(DataTypes.StringType))
.otherwise(col("C").cast(DataTypes.StringType))
哪个工作正常。但我需要动态/可配置地执行此操作以避免硬编码。
我将得到包含“code”和“code_substitutes”列信息的查找表,如下所示
-------------------------
| Code | Code_Substitute |
-------------------------
A X
B Y
C Z
-------------------------
我需要在上面动态构造“submittedDs”,怎么做?
【问题讨论】:
【参考方案1】:使用 Java8,您可以使用此 Stream.reduce() 重载:
final Dataset<Row> dataframe = ...;
final Map<String, String> substitutes = ...;
final Dataset<Row> afterSubstitutions = codeSubstitutes.entrySet().stream()
.reduce(dataframe, (df, entry) ->
df.withColumn(entry.getKey(), when(/* replace with col(entry.getValue()) when null */)),
(left, right) -> throw new IllegalStateException("Can't merge two dataframes. This stream should not be a parallel one!");
);
组合器(最后一个参数)应该合并两个并行处理的数据帧(如果流是 parallel()
流),但我们根本不允许这样做,因为我们只是在 @ 上调用此逻辑987654324@流。
一个更具可读性/可维护性的版本涉及将上述逻辑提取到专用方法中的额外步骤,例如:
// ...
Dataset<Row> nullSafeDf = codeSubstitutes.entrySet().stream()
.reduce(dataframe, this::replaceIfNull, this::throwingCombiner);
// ...
private Dataset<Row> replaceIfNull(Dataset<Row> df, Map.Entry<String, String> substitution)
final String original = substitution.getKey();
final String replacement = substitution.getValue();
return df.withColumn(original, when(col(original).isNull(), col(replacement))
.otherwise(col(original)));
private <X> X throwingCombiner(X left, X right)
throw new IllegalStateException("Combining not allowed");
【讨论】:
附带说明,您还可以使用一些虚拟/随机组合器,例如(left, right) -> left
,并假设永远不会调用组合器。但是,如果有人将 .stream()
更改为 .parallelStream()
,我会说你希望它失败(理想情况下,在测试中)。
如果您不止一次使用这种方法,您可能希望将 throw
提取到一些 throwingCombiner()
实用方法中...
谢谢,但是在这个 reduce(dataframe, (df, entry) ....what 是什么 df ?我们从哪里得到它?
查看 reduce
方法的 Javadoc:第二个参数是一个 BiFunction,您可以控制它获取迄今为止累积的任何数据帧(从原始数据帧开始,作为 @987654332 传入@ param) 和要“附加”到它的新元素(在这种情况下是一个映射条目)并返回“包含”该元素的新 Dataframe ...因此,reduce 方法本身将为它提供 df
每次调用,从dataframe
开始,然后对每个条目应用replaceIfNull
的结果。有意义吗?
.reduce function using javaapi is not working ...而不是 codeSubstitutes ,我正在尝试使用 List在 Scala 中,我会这样做
val substitueMapping: Map[String, String] = ??? //this is your substitute map, this is small as it contains columns and their null substitutes
val df = ??? //this is your main dataframe
val substitutedDf = substituteMapping.keys().foldLeft(df)((df, k) =>
df.withColumn(k, when(col(k).isNull, col(substituteMapping(k))).otherwise(col(k)))
//do approproate casting in above which you have done in post
)
我认为 foldLeft
在 Java 8 中不存在,您可以通过重复修改变量并在 substituteMapping
上进行迭代来模拟相同的情况。
【讨论】:
不幸的是,折叠功能在 Java 中不可用。当我尝试使用 stream() 时它不起作用 @Dee 感谢 Dee,但如果第一列为空,我需要将一列的值替换为另一列的值 ... “重复修改变量” lambda 内部的任何修改都不能在 lambda 函数外部访问。 @BdEngineer 你应该修改var df
,你可以从 lambda 访问它以上是关于使用查找表中的 withColumn 动态添加新列的主要内容,如果未能解决你的问题,请参考以下文章
spark中用于添加新列的withcolumn()未显示结果[重复]
UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()
在 Pyspark 中的 .withColumn 中编写自定义条件