更新java spark中结构类型列中的值
Posted
技术标签:
【中文标题】更新java spark中结构类型列中的值【英文标题】:Update value in struct type column in java spark 【发布时间】:2020-05-05 10:48:02 【问题描述】:我希望能够更新嵌套数据集中的值。为此,我在 Spark 中创建了一个嵌套数据集。它具有以下架构结构:-
root
|-- field_a: string (nullable = false)
|-- field_b: struct (nullable = true)
| |-- field_d: struct(nullable = false)
|-- field_not_to_update: string(nullable = true)
| |-- field_to_update: string(nullable = false)
| field_c: string (nullable = false)
现在我想更新数据集中field_to_update
的值。我试过了
aFooData.withColumn("field_b.field_d.field_to_update", lit("updated_val")
也试过了,
aFooData.foreach(new ClassWithForEachFunction());
其中ClassWithForEachFunction implements ForEachFunction<Row>
并具有方法public void call(Row aRow)
来更新field_to_update 属性。对 lamda 也进行了同样的尝试,但它抛出了 Task not serializable 异常,因此必须进行很长时间。
到目前为止,它们都没有成果,在第二种情况下,我得到了带有 foreach 和名称为 field_b.field_d.field_to_update
的新列的相同数据集。还有其他相同的建议吗?
【问题讨论】:
【参考方案1】:请检查以下代码。
从结构中提取字段 更新所需的文件。 重新构建结构。scala> df.show(false)
+-------+--------------+
|field_a|field_b |
+-------+--------------+
|parentA|[srinivas, 20]|
|parentB|[ravi, 30] |
+-------+--------------+
scala> df.printSchema
root
|-- field_a: string (nullable = true)
|-- field_b: struct (nullable = true)
| |-- field_to_update: string (nullable = true)
| |-- field_not_to_update: integer (nullable = true)
scala> df.select("field_a","field_b.field_to_update","field_b.field_not_to_update").withColumn("field_to_update",lit("updated_val")).select(col("field_a"),struct(col("field_to_update"),col("field_not_to_update")).as("field_b")).show(false)
+-------+-----------------+
|field_a|field_b |
+-------+-----------------+
|parentA|[updated_val, 20]|
|parentB|[updated_val, 30]|
+-------+-----------------+
【讨论】:
谢谢你,但我需要用 Java 做。 如果您可以将其更改为 scala,您可以使用 spark optics 使更改更容易github.com/hablapps/sparkOptics @Yashu,java 和 scala 的语法几乎相同。让我知道它是否不起作用.. :)【参考方案2】:你必须重建整个架构,你可以用下面的句子在一个实例中完成。
import org.apache.spark.sql.functions.lit, struct
df.select(
df("field_a"), // keep the fields that don't change
struct( // the field at first level must be reconstructed
lit("updated_value") as "field_to_update", // transform or set the new elements
df("fb.field_not_to_update") as "field_not_to_update" // keep the unchanged sub elements and keep the last name
) as "field_b", // and we have to keep the name
df("field_c")
)
java中的语法是一样的
【讨论】:
【参考方案3】:一种更“类似于 Java”的方法是将数据帧转换为(类型化的)数据集,然后使用 map 调用来更改数据。从 Java 的角度来看,代码很容易处理。但缺点是给定架构需要三个 Java bean classes。
Dataset<Bean1> ds = df.as(Encoders.bean(Bean1.class));
Dataset<Bean1> updatedDs = ds.map((MapFunction<Bean1, Bean1>) row ->
row.getField_b().getField_d().setField_to_update("updated");
return row;
, Encoders.bean(Bean1.class));
三个 Bean 类
public static class Bean1 implements Serializable
private String field_a;
private Bean2 field_b;
private String field_c;
//getters and setters
public static class Bean2 implements Serializable
private Bean3 field_d;
//getter and setter
public static class Bean3 implements Serializable
private String field_not_to_update;
private String field_to_update;
//getters and setters
【讨论】:
以上是关于更新java spark中结构类型列中的值的主要内容,如果未能解决你的问题,请参考以下文章
Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据