更新java spark中结构类型列中的值

Posted

技术标签:

【中文标题】更新java spark中结构类型列中的值【英文标题】:Update value in struct type column in java spark 【发布时间】:2020-05-05 10:48:02 【问题描述】:

我希望能够更新嵌套数据集中的值。为此,我在 Spark 中创建了一个嵌套数据集。它具有以下架构结构:-

root

 |-- field_a: string (nullable = false)

 |-- field_b: struct (nullable = true)

 |    |-- field_d: struct(nullable = false)
          |-- field_not_to_update: string(nullable = true)

 |        |-- field_to_update: string(nullable = false)
 |   field_c: string (nullable = false)

现在我想更新数据集中field_to_update 的值。我试过了

aFooData.withColumn("field_b.field_d.field_to_update", lit("updated_val")

也试过了,

aFooData.foreach(new ClassWithForEachFunction());

其中ClassWithForEachFunction implements ForEachFunction<Row> 并具有方法public void call(Row aRow) 来更新field_to_update 属性。对 lamda 也进行了同样的尝试,但它抛出了 Task not serializable 异常,因此必须进行很长时间。

到目前为止,它们都没有成果,在第二种情况下,我得到了带有 foreach 和名称为 field_b.field_d.field_to_update 的新列的相同数据集。还有其他相同的建议吗?

【问题讨论】:

【参考方案1】:

请检查以下代码。

从结构中提取字段 更新所需的文件。 重新构建结构。
scala> df.show(false)
+-------+--------------+
|field_a|field_b       |
+-------+--------------+
|parentA|[srinivas, 20]|
|parentB|[ravi, 30]    |
+-------+--------------+


scala> df.printSchema
root
 |-- field_a: string (nullable = true)
 |-- field_b: struct (nullable = true)
 |    |-- field_to_update: string (nullable = true)
 |    |-- field_not_to_update: integer (nullable = true)


scala> df.select("field_a","field_b.field_to_update","field_b.field_not_to_update").withColumn("field_to_update",lit("updated_val")).select(col("field_a"),struct(col("field_to_update"),col("field_not_to_update")).as("field_b")).show(false)
+-------+-----------------+
|field_a|field_b          |
+-------+-----------------+
|parentA|[updated_val, 20]|
|parentB|[updated_val, 30]|
+-------+-----------------+

【讨论】:

谢谢你,但我需要用 Java 做。 如果您可以将其更改为 scala,您可以使用 spark optics 使更改更容易github.com/hablapps/sparkOptics @Yashu,java 和 scala 的语法几乎相同。让我知道它是否不起作用.. :)【参考方案2】:

你必须重建整个架构,你可以用下面的句子在一个实例中完成。

import org.apache.spark.sql.functions.lit, struct

df.select(
  df("field_a"), // keep the fields that don't change
  struct( // the field at first level must be reconstructed
     lit("updated_value") as "field_to_update", // transform or set the new elements
     df("fb.field_not_to_update") as "field_not_to_update" // keep the unchanged sub elements and keep the last name
  ) as "field_b", // and we have to keep the name
  df("field_c")
)

java中的语法是一样的

【讨论】:

【参考方案3】:

一种更“类似于 Java”的方法是将数据帧转换为(类型化的)数据集,然后使用 map 调用来更改数据。从 Java 的角度来看,代码很容易处理。但缺点是给定架构需要三个 Java bean classes。

Dataset<Bean1> ds = df.as(Encoders.bean(Bean1.class));

Dataset<Bean1> updatedDs = ds.map((MapFunction<Bean1, Bean1>) row -> 
    row.getField_b().getField_d().setField_to_update("updated");
    return row;
, Encoders.bean(Bean1.class));

三个 Bean 类

public static class Bean1 implements Serializable 
    private String field_a;
    private Bean2 field_b;
    private String field_c;

    //getters and setters


public static class Bean2 implements Serializable 
    private Bean3 field_d;

    //getter and setter


public static class Bean3 implements Serializable 
    private String field_not_to_update;
    private String field_to_update;

    //getters and setters

【讨论】:

以上是关于更新java spark中结构类型列中的值的主要内容,如果未能解决你的问题,请参考以下文章

我想在列中的值中添加“%”单位

Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据

根据另一列中的值更新 BigQuery 中的嵌套数组

Spark DataFrame ArrayType 或 MapType 用于检查列中的值

如何删除 Spark 表列中的空格(Pyspark)

Pyspark 通过使用另一列中的值替换 Spark 数据框列中的字符串