Is there any way to flatten the nested JSON in Spark Streaming?
Posted: 2020-04-27 06:44:51

Question:
I have written a Dataset Spark job (batch) to flatten the data, and it works fine. But when I try to use the same code snippet in a Spark Streaming job, it throws the following error: Queries with streaming sources must be executed with writeStream.start();
So is there any way to flatten nested JSON in a streaming job? Sample nested JSON input:
{
  "name": " Akash",
  "age": 26,
  "watches": {
    "name": "Apple",
    "models": [
      "Apple Watch Series 5",
      "Apple Watch Nike"
    ]
  },
  "phones": [
    {
      "name": " Apple",
      "models": [
        "iphone X",
        "iphone XR",
        "iphone XS",
        "iphone 11",
        "iphone 11 Pro"
      ]
    },
    {
      "name": " Samsung",
      "models": [
        "Galaxy Note10",
        "Galaxy Note10+",
        "Galaxy S10e",
        "Galaxy S10",
        "Galaxy S10+"
      ]
    },
    {
      "name": " Google",
      "models": [
        "Pixel 3",
        "Pixel 3a"
      ]
    }
  ]
}
Expected output: (image: output after flattening)
Below is the code snippet:
// Imports needed by the enclosing class
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Flattens one complex column per call and recurses: arrays are exploded into rows,
// structs are expanded into parent_child columns. Returns once only primitive columns remain.
private static Dataset<Row> flattenJSONdf(Dataset<Row> ds) {
    StructField[] fields = ds.schema().fields();
    List<String> fieldsNames = new ArrayList<>();
    for (StructField s : fields) {
        fieldsNames.add(s.name());
    }
    for (StructField field : fields) {
        DataType fieldType = field.dataType();
        String fieldName = field.name();
        if (fieldType instanceof ArrayType) {
            // Keep all other columns and replace the array with one row per element.
            List<String> fieldNamesAndExplode = new ArrayList<>();
            for (String name : fieldsNames) {
                if (!fieldName.equals(name)) {
                    fieldNamesAndExplode.add(name);
                }
            }
            fieldNamesAndExplode.add(
                    String.format("explode_outer(%s) as %s", fieldName, fieldName));
            Dataset<Row> explodedDs =
                    ds.selectExpr(fieldNamesAndExplode.toArray(new String[0]));
            // explodedDs.show(); // show() is a batch action and fails on a streaming Dataset
            return flattenJSONdf(explodedDs);
        } else if (fieldType instanceof StructType) {
            // Replace the struct with its children, renamed parent.child -> parent_child.
            List<String> newFieldNames = new ArrayList<>();
            for (String name : fieldsNames) {
                if (!fieldName.equals(name)) {
                    newFieldNames.add(name);
                }
            }
            for (String childName : ((StructType) fieldType).fieldNames()) {
                newFieldNames.add(fieldName + "." + childName);
            }
            List<Column> renamedStructCols = new ArrayList<>();
            for (String name : newFieldNames) {
                renamedStructCols.add(new Column(name).as(name.replace(".", "_")));
            }
            Dataset<Row> structDs = ds.select(renamedStructCols.toArray(new Column[0]));
            return flattenJSONdf(structDs);
        }
    }
    // No array or struct columns left.
    return ds;
}
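The function above only calls schema(), selectExpr and select, which are transformations and should also work on a streaming Dataset; in Structured Streaming, the "Queries with streaming sources must be executed with writeStream.start()" error is raised when a batch action such as show(), count() or collect() runs on a streaming Dataset, so the flattened result still has to be written through writeStream ... start(). To see which columns the recursion ends up with, here is a plain-Java sketch with no Spark dependency. ToyType, ToyPrimitive, ToyArray, ToyStruct, ToyField and FlattenSketch are hypothetical stand-ins that mirror only the naming and column-ordering logic of flattenJSONdf, not Spark's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy stand-ins for Spark's DataType / StructField (illustration only, no Spark).
abstract class ToyType {}

final class ToyPrimitive extends ToyType {}

final class ToyArray extends ToyType {
    final ToyType element;
    ToyArray(ToyType element) { this.element = element; }
}

final class ToyStruct extends ToyType {
    final List<ToyField> fields;
    ToyStruct(ToyField... fields) { this.fields = Arrays.asList(fields); }
}

final class ToyField {
    final String name;
    final ToyType type;
    ToyField(String name, ToyType type) { this.name = name; this.type = type; }
}

final class FlattenSketch {
    // Same control flow as flattenJSONdf: rewrite the first complex column, then recurse.
    static List<ToyField> flatten(List<ToyField> fields) {
        for (ToyField field : fields) {
            if (field.type instanceof ToyArray) {
                // explode_outer(f) AS f: other columns first, exploded column appended last.
                List<ToyField> next = withoutName(fields, field.name);
                next.add(new ToyField(field.name, ((ToyArray) field.type).element));
                return flatten(next);
            } else if (field.type instanceof ToyStruct) {
                // f.child AS f_child for each child, appended after the other columns.
                List<ToyField> next = withoutName(fields, field.name);
                for (ToyField child : ((ToyStruct) field.type).fields) {
                    next.add(new ToyField(field.name + "_" + child.name, child.type));
                }
                return flatten(next);
            }
        }
        return fields; // only primitive columns are left
    }

    // Copy of the column list minus the column being rewritten (compared by name).
    private static List<ToyField> withoutName(List<ToyField> fields, String name) {
        List<ToyField> rest = new ArrayList<>();
        for (ToyField f : fields) {
            if (!f.name.equals(name)) {
                rest.add(f);
            }
        }
        return rest;
    }

    static List<String> names(List<ToyField> fields) {
        List<String> out = new ArrayList<>();
        for (ToyField f : fields) {
            out.add(f.name);
        }
        return out;
    }

    // Shape of the schema used in the answer: age, name,
    // phones: array<struct<models: array<string>, name>>, watches: struct<models: array<string>, name>.
    static List<ToyField> sampleSchema() {
        ToyType str = new ToyPrimitive();
        ToyStruct brand = new ToyStruct(
                new ToyField("models", new ToyArray(str)),
                new ToyField("name", str));
        return Arrays.asList(
                new ToyField("age", new ToyPrimitive()),
                new ToyField("name", str),
                new ToyField("phones", new ToyArray(brand)),
                new ToyField("watches", brand));
    }
}
```

For that schema the sketch yields age, name, watches_name, phones_name, watches_models, phones_models. Because phones is exploded before its struct is expanded, each phones_name should stay on the same row as its own phones_models.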
Comments:
Can you post a sample JSON and the expected output?
Hi srinivas, I have added the sample JSON and the output above.

Answer 1:
Note: the code is in Scala, and I used Spark Structured Streaming.
You can use the org.apache.spark.sql.functions.explode function to flatten array columns. Please check the code below.
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types._
import spark.implicits._ // for the $"col" syntax (already imported in spark-shell)

// Streaming file sources need the schema up front (no schema inference by default).
val schema = DataType.fromJson("""{"type":"struct","fields":[{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"phones","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"models","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"watches","type":{"type":"struct","fields":[{"name":"models","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
// schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,LongType,true), StructField(name,StringType,true), StructField(phones,ArrayType(StructType(StructField(models,ArrayType(StringType,true),true), StructField(name,StringType,true)),true),true), StructField(watches,StructType(StructField(models,ArrayType(StringType,true),true), StructField(name,StringType,true)),true))

// By default each file under /tmp/jdata is read as JSON Lines (one JSON object per line).
val streamDF = spark.readStream.format("json").schema(schema).load("/tmp/jdata")
// streamDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

streamDF
  .withColumn("watches_models", explode($"watches.models"))
  .withColumn("watches_name", $"watches.name")
  .withColumn("phones_models", explode($"phones.models")) // array<array<string>> -> array<string>
  .withColumn("phones_models", explode($"phones_models")) // array<string> -> string
  .withColumn("phones_name", explode($"phones.name"))
  .drop("watches", "phones")
  .writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+--------------------+------------+--------------+-----------+
|age| name| watches_models|watches_name| phones_models|phones_name|
+---+------+--------------------+------------+--------------+-----------+
| 26| Akash|Apple Watch Series 5| Apple| iphone X| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone X| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone X| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone XR| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone XR| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone XR| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone XS| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone XS| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone XS| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11| Google|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11 Pro| Apple|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11 Pro| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| iphone 11 Pro| Google|
| 26| Akash|Apple Watch Series 5| Apple| Galaxy Note10| Apple|
| 26| Akash|Apple Watch Series 5| Apple| Galaxy Note10| Samsung|
| 26| Akash|Apple Watch Series 5| Apple| Galaxy Note10| Google|
| 26| Akash|Apple Watch Series 5| Apple|Galaxy Note10+| Apple|
| 26| Akash|Apple Watch Series 5| Apple|Galaxy Note10+| Samsung|
+---+------+--------------------+------------+--------------+-----------+
only showing top 20 rows
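In the output above, iphone X appears once for each of Apple, Samsung and Google. That is because phones.models and phones.name are exploded independently, so the result is a cross product: 2 watch models × 12 phone models × 3 brand names = 72 rows for the sample document, of which only the first 20 are shown. If each brand should stay with its own models, one untested variation is to explode the phones array first, e.g. .withColumn("phone", explode($"phones")), then take $"phone.name" and explode $"phone.models" before dropping phone, which would give 2 × 12 = 24 rows. The plain-Java sketch below does no Spark work; PairingSketch and its methods are hypothetical names that simply enumerate the rows both ways for the sample data, as {watches_models, phones_models, phones_name}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: enumerates flattened rows for the sample JSON without Spark.
final class PairingSketch {
    static final List<String> WATCH_MODELS =
            Arrays.asList("Apple Watch Series 5", "Apple Watch Nike");

    // phones[] from the sample JSON: brand name -> its models (leading spaces dropped).
    static Map<String, List<String>> phones() {
        Map<String, List<String>> p = new LinkedHashMap<>();
        p.put("Apple", Arrays.asList("iphone X", "iphone XR", "iphone XS", "iphone 11", "iphone 11 Pro"));
        p.put("Samsung", Arrays.asList("Galaxy Note10", "Galaxy Note10+", "Galaxy S10e", "Galaxy S10", "Galaxy S10+"));
        p.put("Google", Arrays.asList("Pixel 3", "Pixel 3a"));
        return p;
    }

    // Like the answer's chain: models and names are exploded separately,
    // so every model is combined with every brand name.
    static List<String[]> independentExplodes() {
        Map<String, List<String>> phones = phones();
        List<String[]> rows = new ArrayList<>();
        for (String watch : WATCH_MODELS) {
            for (List<String> models : phones.values()) {
                for (String model : models) {
                    for (String brand : phones.keySet()) {
                        rows.add(new String[] {watch, model, brand});
                    }
                }
            }
        }
        return rows;
    }

    // Explode the phones array first, then only that phone's own models.
    static List<String[]> phonesFirst() {
        Map<String, List<String>> phones = phones();
        List<String[]> rows = new ArrayList<>();
        for (String watch : WATCH_MODELS) {
            for (Map.Entry<String, List<String>> phone : phones.entrySet()) {
                for (String model : phone.getValue()) {
                    rows.add(new String[] {watch, model, phone.getKey()});
                }
            }
        }
        return rows;
    }
}
```

independentExplodes() returns 72 rows, the second being {"Apple Watch Series 5", "iphone X", "Samsung"} like the second line of the console output, while phonesFirst() returns 24 rows in which every iphone model is paired with Apple.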