Spark (Java API) - 向 JSON 列添加新字段
Posted
技术标签:
【中文标题】Spark (Java API) - 向 JSON 列添加新字段【英文标题】:Spark (Java API) - add new fields to JSON column 【发布时间】:2021-03-02 09:23:17 【问题描述】:我正在使用 Spark 从 Cassandra 读取行,其中一列是 JSON 对象。这是数据集中的架构和示例行:
root
|-- attributes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- count: integer (nullable = true)
| | |-- score: double (nullable = true)
| | |-- id: integer (nullable = true)
|-- id: string (nullable = true)
|-- col1: string (nullable = true)
+--------------------------------------------------------------------------------+--------+---------+
|attributes |id |col1 |
+--------------------------------------------------------------------------------+--------+---------+
|[[usage,2,5.0,12], [price,1,10.0,48], [hair,1,10.0,23737], [curls,1,10.0,30807]]|19400335|val_str_1|
+--------------------------------------------------------------------------------+--------+---------+
使用以下模式从表中加载:
ArrayType schema =
DataTypes.createArrayType(
new StructType(
new StructField[]
new StructField("name", DataTypes.StringType, true, Metadata.empty()),
new StructField("count", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("score", DataTypes.DoubleType, true, Metadata.empty()),
new StructField("id", DataTypes.IntegerType, true, Metadata.empty())));
是否可以向 attributes
列注入新字段 - 将来自 id
列的值放在根下,并将来自 col1
列的值放在结构内?
【问题讨论】:
【参考方案1】:您可以尝试使用transform
函数来更新数组中的每个结构元素:
df.withColumn("attributes", expr("transform(attributes, x -> struct(x.name as name, x.count as count, x.score as score, x.id as id, id as root_id, col1 as root_col1))"))
对于spark ,可以先炸开数组列,然后更新groupby再收集列表:
df.withColumn("attr_structs", explode(col("attributes")))
.withColumn("attr_structs", struct(col("attr_structs.*"), col("id").alias("root_id"), col("col1").alias("root_col1")))
.groupBy("id", "col1")
.agg(collect_list(col("attr_structs")).alias("attributes"))
【讨论】:
它也应该与 Java 一起使用吗?出现错误:21/03/02 10:09:10 错误 ApplicationMaster:用户类抛出异常:org.apache.spark.sql.catalyst.parser.ParseException:无关输入“>”期望 '(','SELECT', 'FROM',... == SQL == transform(attributes, x -> struct(x.name as name, x.count as count, x.score as score, x.id as id, id as root_id)) @Seffyn 它应该。你的火花版本是什么? spark-sql_2.11-2.2.0 @Seffy Ahtransform
函数仅适用于 Spark 2.4+。尝试编辑后的答案?
现在它抱怨这个:21/03/02 11:00:44 错误 ApplicationMaster:用户类抛出异常:org.apache.spark.sql.AnalysisException:只能星扩展结构数据类型。属性:ArrayBuffer(attributes)
;以上是关于Spark (Java API) - 向 JSON 列添加新字段的主要内容,如果未能解决你的问题,请参考以下文章
在IntelliJ IDEA中配置Spark(Java API)运行环境
通过 RESTful API 查询 SPARK 作业产生的数据
org.elasticsearch.spark.rdd.api.java.javaesspark哪个包