Spark (Java API) - 向 JSON 列添加新字段

Posted

技术标签:

【中文标题】Spark (Java API) - 向 JSON 列添加新字段【英文标题】:Spark (Java API) - add new fields to JSON column 【发布时间】:2021-03-02 09:23:17 【问题描述】:

我正在使用 Spark 从 Cassandra 读取行,其中一列是 JSON 对象。这是数据集中的架构和示例行:

root
 |-- attributes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- score: double (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- col1: string (nullable = true)

+--------------------------------------------------------------------------------+--------+---------+
|attributes                                                                      |id      |col1     |
+--------------------------------------------------------------------------------+--------+---------+
|[[usage,2,5.0,12], [price,1,10.0,48], [hair,1,10.0,23737], [curls,1,10.0,30807]]|19400335|val_str_1|
+--------------------------------------------------------------------------------+--------+---------+

使用以下模式从表中加载:

ArrayType schema =
                DataTypes.createArrayType(
                        new StructType(
                                new StructField[] 
                                        new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                                        new StructField("count", DataTypes.IntegerType, true, Metadata.empty()),
                                        new StructField("score", DataTypes.DoubleType, true, Metadata.empty()),
                                        new StructField("id", DataTypes.IntegerType, true, Metadata.empty())));

是否可以向 attributes 列注入新字段 - 将来自 id 列的值放在根下,并将来自 col1 列的值放在结构内?

【问题讨论】:

【参考方案1】:

您可以尝试使用transform 函数来更新数组中的每个结构元素:

df.withColumn("attributes", expr("transform(attributes, x -> struct(x.name as name, x.count as count, x.score as score, x.id as id, id as root_id, col1 as root_col1))"))

对于spark ,可以先炸开数组列,然后更新groupby再收集列表:

df.withColumn("attr_structs", explode(col("attributes")))
    .withColumn("attr_structs", struct(col("attr_structs.*"), col("id").alias("root_id"), col("col1").alias("root_col1")))
    .groupBy("id", "col1")
    .agg(collect_list(col("attr_structs")).alias("attributes"))

【讨论】:

它也应该与 Java 一起使用吗?出现错误:21/03/02 10:09:10 错误 ApplicationMaster:用户类抛出异常:org.apache.spark.sql.catalyst.parser.ParseException:无关输入“>”期望 '(','SELECT', 'FROM',... == SQL == transform(attributes, x -> struct(x.name as name, x.count as count, x.score as score, x.id as id, id as root_id)) @Seffyn 它应该。你的火花版本是什么? spark-sql_2.11-2.2.0 @Seffy Ah transform 函数仅适用于 Spark 2.4+。尝试编辑后的答案? 现在它抱怨这个:21/03/02 11:00:44 错误 ApplicationMaster:用户类抛出异常:org.apache.spark.sql.AnalysisException:只能星扩展结构数据类型。属性:ArrayBuffer(attributes);

以上是关于Spark (Java API) - 向 JSON 列添加新字段的主要内容,如果未能解决你的问题,请参考以下文章

在IntelliJ IDEA中配置Spark(Java API)运行环境

通过 RESTful API 查询 SPARK 作业产生的数据

将字段附加到 JSON 数据集 Java-Spark

org.elasticsearch.spark.rdd.api.java.javaesspark哪个包

Spark 将 JSON 字符串转换为 JSON 对象(Java)

Spark:如何从 Spark 数据帧行解析和转换 json 字符串