连接单独处理的火花数据帧的两列时的顺序保证是啥?
Posted
技术标签:
【中文标题】连接单独处理的火花数据帧的两列时的顺序保证是啥?【英文标题】:what is the order guarantee when joining two columns of a spark dataframe which are processed separately?连接单独处理的火花数据帧的两列时的顺序保证是什么? 【发布时间】:2020-02-09 03:43:57 【问题描述】:我有 3 列的数据框
-
日期
jsonString1
jsonString2
我想将 json 中的属性扩展为列。所以我做了这样的事情。
val json1 = spark.read.json(dataframe.select(col("jsonString1")).rdd.map(_.getString(0)))
val json2 = spark.read.json(dataframe.select(col("jsonString2")).rdd.map(_.getString(0)))
val json1Table = json1.selectExpr("id", "status")
val json2Table = json2.selectExpr("name", "address")
现在我想把这些表放在一起。所以我做了以下
val json1TableWithIndex = addColumnIndex(json1Table)
val json2TableWithIndex = addColumnIndex(json2Table)
var finalResult = json1Table
.join(json2Table, Seq("columnindex"))
.drop("columnindex")
def addColumnIndex(df: DataFrame) = spark.createDataFrame(
df.rdd.zipWithIndex.map case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex) ,
StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)
在对几行进行采样后,我观察到行与源数据帧中的完全匹配 在加入单独处理的数据框的两列时,我没有找到任何关于订单保证的信息。这是解决我的问题的正确方法。任何帮助表示赞赏。
【问题讨论】:
【参考方案1】:依赖未记录的行为总是有风险的,因为您的代码可能无法按您的预期工作,因为您对它只有部分了解。
您可以以更有效的方式做同样的事情,而无需使用任何拆分和连接方法。使用from_json
函数创建两个嵌套列,然后将嵌套列展平,最后丢弃中间 JSON 字符串列和嵌套列。
这是整个过程的示例。
import org.apache.spark.sql.types.StringType, StructType, StructField
val df = (Seq(
("09-02-2020","\"id\":\"01\", \"status\":\"Active\"","\"name\":\"Abdullah\", \"address\":\"Jumeirah\""),
("10-02-2020","\"id\":\"02\", \"status\":\"Dormant\"","\"name\":\"Ali\", \"address\":\"Jebel Ali\"")
).toDF("date","jsonString1","jsonString2"))
scala> df.show()
+----------+--------------------+--------------------+
| date| jsonString1| jsonString2|
+----------+--------------------+--------------------+
|09-02-2020|"id":"01", "stat...|"name":"Abdullah...|
|10-02-2020|"id":"02", "stat...|"name":"Ali", "a...|
+----------+--------------------+--------------------+
val schema1 = (StructType(Seq(
StructField("id", StringType, true),
StructField("status", StringType, true)
)))
val schema2 = (StructType(Seq(
StructField("name", StringType, true),
StructField("address", StringType, true)
)))
val dfFlattened = (df.withColumn("jsonData1", from_json(col("jsonString1"), schema1))
.withColumn("jsonData2", from_json(col("jsonString2"), schema2))
.withColumn("id", col("jsonData1.id"))
.withColumn("status", col("jsonData1.status"))
.withColumn("name", col("jsonData2.name"))
.withColumn("address", col("jsonData2.address"))
.drop("jsonString1")
.drop("jsonString2")
.drop("jsonData1")
.drop("jsonData2"))
scala> dfFlattened.show()
+----------+---+-------+--------+---------+
| date| id| status| name| address|
+----------+---+-------+--------+---------+
|09-02-2020| 01| Active|Abdullah| Jumeirah|
|10-02-2020| 02|Dormant| Ali|Jebel Ali|
+----------+---+-------+--------+---------+
【讨论】:
感谢您的回答。采用我的方法的原因是利用 selectExpr,因为我的 json 嵌套很深。使用 selectExpr,我可以只使用 json 路径进行选择而不是创建模式,但看起来您的解决方案是正确的方法。以上是关于连接单独处理的火花数据帧的两列时的顺序保证是啥?的主要内容,如果未能解决你的问题,请参考以下文章
Pandas - 匹配来自两个数据帧的两列并在 df1 中创建新列