如何在spark scala数据框中更新嵌套列的xml值
Posted
技术标签:
【中文标题】如何在spark scala数据框中更新嵌套列的xml值【英文标题】:how to update nested column's value of xml in spark scala dataframe 【发布时间】:2019-10-27 17:24:06 【问题描述】:假设我有以下 xml 数据:
<students>
<studentId>110</studentId>
<info>
<rollNo>2</rollNo>
<address>
<permanent>abc</permanent>
<temporary>def</temporary>
</address>
</info>
<subjects>
<subject>
<name>maths</name>
<credit>3</credit>
</subject>
<subject>
<name>science</name>
<credit>2</credit>
</subject>
</subjects>
</students>
它的架构是:
root
|-- info: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- permanent: string (nullable = true)
| | |-- temporary: string (nullable = true)
| |-- rollNo: long (nullable = true)
|-- studentId: long (nullable = true)
|-- subjects: struct (nullable = true)
| |-- subject: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- credit: long (nullable = true)
| | | |-- name: string (nullable = true)
作为根标签是"students"
。
在这里,我想更新某些列的值。
我想使用UDF
更新"studentId"
列的值。我找到了一种方法:
df = df.withColumn("studentId", updateValue(col("studentId")))
然后,我想更新一个嵌套列,即"info.rollNo"
。
应用上述过程给了我另一个新列“<info.rollNo>updated_value</info.rollNo>
”。找了一阵子,找到了一个办法:
val colStruct = df.select(col("info" + ".*")).columns
.filter(_ != "rollNo")
.map(f => col("info" + "." + f))
df = df.withColumn("info",
struct(
(colStruct :+ updateValue(col("info.rollNo")
).as("rollNo")): _*)
)
对于第三个嵌套列,我尝试了上述方法。但我无法弄清楚这个过程。
在这里,问题是,有人可以解释一下更新嵌套列值的算法,其嵌套级别可能是 3、4、5 等等。
例如:我想更新以下字段。
"info.address.permanent"
这是结构
和
"subjects.subject.credit"
是数组"subject"
的元素
PS:如果您知道其他更新某些列的方法,请提及。
【问题讨论】:
【参考方案1】:我得到了答案。 关于使用 n1,n2,...,nn 嵌套和每个嵌套中的 c 列更新嵌套数据的 x 列:
即让我们更新专栏 => "n1.n2.n3...nn.x"
df = df.withColumn("n1",
struct(
1st nest's columns n1.c except the struct which holds column x,
//like col("n1.col1"), col("n2.col2"), ...,
struct(
2nd nest's columns n2.c except the struct which holds column x,
....
....
....
struct(
nth nest's nn.c columns except column x,
udfApplied(col("n1.n2...nn.x")).as("x")
).as("nn")
).as("n2")
))
val udfApplied = udf((value: String) =>
value + " updated" //update the value here
)
“info.address.permanent”示例:
df = df.withColumn("info",
struct(
col("info.rollNo"),
struct(
col("info.address.temporary"),
udfApplied(col("info.address.permanent")).as("permanent")
).as("address")
))
“subjects.subject.credit”示例: (对于数组类型,一切都是一样的,但我们需要为数组中元素的每个索引创建结构)
df = df.withColumn("subjects",
struct(
array(
struct(
col("subjects.subject.name")(0).as("name"),
udfApplied(col("subjects.subject.credit")(0)).as("credit")
).as("subject"),
struct(
col("subjects.subject.name")(1).as("name"),
udfApplied(col("subjects.subject.credit")(1)).as("credit")
).as("subject")
).as("subject")
))
希望对大家有帮助
【讨论】:
带数组的数组怎么办?以上是关于如何在spark scala数据框中更新嵌套列的xml值的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]
如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?