使用查找表中的 withColumn 动态添加新列

Posted

技术标签:

【中文标题】使用查找表中的 withColumn 动态添加新列【英文标题】:adding a new column using withColumn from a lookup table dynamically 【发布时间】:2020-08-31 09:20:22 【问题描述】:

我在 Java 8 中使用 spark-sql-2.4.1v。我有一个场景,我需要从查找表中动态添加一列。

我有带有列的数据框 A, B, C, ..., X,Y, Z

当少数(原始)列(例如:A、B、C)值为 null 时,我需要取/替换列(例如:X、Y、Z)值,否则取原始列值。 我将获取此映射信息作为业务逻辑的一部分。 如果是这种情况,我将遵循下面的硬编码代码

 Dataset<Row>  substitutedDs = ds
                  .withColumn("A",
                             when(col("A").isNull() , col("X").cast(DataTypes.StringType))
                             .otherwise(col("A").cast(DataTypes.StringType))
                          )
                  .withColumn("C",
                             when(col("C").isNull() , col("Z").cast(DataTypes.StringType))
                             .otherwise(col("C").cast(DataTypes.StringType))
                         

哪个工作正常。但我需要动态/可配置地执行此操作以避免硬编码。

我将得到包含“code”和“code_substitutes”列信息的查找表,如下所示

-------------------------
| Code | Code_Substitute |
-------------------------
  A         X
  B         Y
  C         Z
-------------------------

我需要在上面动态构造“submittedDs”,怎么做?

【问题讨论】:

【参考方案1】:

使用 Java8,您可以使用此 Stream.reduce() 重载:

final Dataset<Row> dataframe = ...;
final Map<String, String> substitutes = ...;

final Dataset<Row> afterSubstitutions = codeSubstitutes.entrySet().stream()
    .reduce(dataframe, (df, entry) ->
            df.withColumn(entry.getKey(), when(/* replace with col(entry.getValue()) when null */)),
            (left, right) ->  throw new IllegalStateException("Can't merge two dataframes. This stream should not be a parallel one!"); 
    );

组合器(最后一个参数)应该合并两个并行处理的数据帧(如果流是 parallel() 流),但我们根本不允许这样做,因为我们只是在 @ 上调用此逻辑987654324@流。


一个更具可读性/可维护性的版本涉及将上述逻辑提取到专用方法中的额外步骤,例如:

    // ...
    Dataset<Row> nullSafeDf = codeSubstitutes.entrySet().stream()
        .reduce(dataframe, this::replaceIfNull, this::throwingCombiner);
    // ...



private Dataset<Row> replaceIfNull(Dataset<Row> df, Map.Entry<String, String> substitution) 
    final String original = substitution.getKey();
    final String replacement = substitution.getValue();
    return df.withColumn(original, when(col(original).isNull(), col(replacement))
            .otherwise(col(original)));


private <X> X throwingCombiner(X left, X right) 
    throw new IllegalStateException("Combining not allowed");

【讨论】:

附带说明,您还可以使用一些虚拟/随机组合器,例如(left, right) -&gt; left,并假设永远不会调用组合器。但是,如果有人将 .stream() 更改为 .parallelStream(),我会说你希望它失败(理想情况下,在测试中)。 如果您不止一次使用这种方法,您可能希望将 throw 提取到一些 throwingCombiner() 实用方法中... 谢谢,但是在这个 reduce(dataframe, (df, entry) ....what 是什么 df ?我们从哪里得到它? 查看 reduce 方法的 Javadoc:第二个参数是一个 BiFunction,您可以控制它获取迄今为止累积的任何数据帧(从原始数据帧开始,作为 @987654332 传入@ param) 和要“附加”到它的新元素(在这种情况下是一个映射条目)并返回“包含”该元素的新 Dataframe ...因此,reduce 方法本身将为它提供 df每次调用,从dataframe 开始,然后对每个条目应用replaceIfNull 的结果。有意义吗? .reduce function using javaapi is not working ...而不是 codeSubstitutes ,我正在尝试使用 List> ...这减少了抛出编译错误 ...该方法类型 Stream> 中的 reduce(Tuple2, BinaryOperator>) 不适用于参数 (Tuple2, ( df, entry) -> )【参考方案2】:

在 Scala 中,我会这样做

val substitueMapping: Map[String, String] = ??? //this is your substitute map, this is small as it contains columns and their null substitutes

val df = ??? //this is your main dataframe 

val substitutedDf = substituteMapping.keys().foldLeft(df)((df, k) => 
    df.withColumn(k, when(col(k).isNull, col(substituteMapping(k))).otherwise(col(k)))
    //do approproate casting in above which you have done in post
)

我认为 foldLeft 在 Java 8 中不存在,您可以通过重复修改变量并在 substituteMapping 上进行迭代来模拟相同的情况。

【讨论】:

不幸的是,折叠功能在 Java 中不可用。当我尝试使用 stream() 时它不起作用 @Dee 感谢 Dee,但如果第一列为空,我需要将一列的值替换为另一列的值 ... “重复修改变量” lambda 内部的任何修改都不能在 lambda 函数外部访问。 @BdEngineer 你应该修改var df,你可以从 lambda 访问它

以上是关于使用查找表中的 withColumn 动态添加新列的主要内容,如果未能解决你的问题,请参考以下文章

spark中用于添加新列的withcolumn()未显示结果[重复]

UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()

如何创建 Pyspark UDF 以向数据框添加新列

在 Pyspark 中的 .withColumn 中编写自定义条件

Envers:如何在 *_AUD 表中添加新列(不在 REVINFO 表中)

如果不存在,则使用 withColumn 创建一个新列