连接单独处理的火花数据帧的两列时的顺序保证是啥?

Posted

技术标签:

【中文标题】连接单独处理的火花数据帧的两列时的顺序保证是啥?【英文标题】:what is the order guarantee when joining two columns of a spark dataframe which are processed separately?连接单独处理的火花数据帧的两列时的顺序保证是什么? 【发布时间】:2020-02-09 03:43:57 【问题描述】:

我有 3 列的数据框

    日期 jsonString1 jsonString2

我想将 json 中的属性扩展为列。所以我做了这样的事情。

 val json1 = spark.read.json(dataframe.select(col("jsonString1")).rdd.map(_.getString(0)))
 val json2 = spark.read.json(dataframe.select(col("jsonString2")).rdd.map(_.getString(0)))

 val json1Table = json1.selectExpr("id", "status")
 val json2Table = json2.selectExpr("name", "address")

现在我想把这些表放在一起。所以我做了以下


     val json1TableWithIndex = addColumnIndex(json1Table)
     val json2TableWithIndex = addColumnIndex(json2Table)
     var finalResult = json1Table
            .join(json2Table, Seq("columnindex"))
            .drop("columnindex")

    def addColumnIndex(df: DataFrame) = spark.createDataFrame(
        df.rdd.zipWithIndex.map  case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex) ,
        StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
    )

在对几行进行采样后,我观察到行与源数据帧中的完全匹配 在加入单独处理的数据框的两列时,我没有找到任何关于订单保证的信息。这是解决我的问题的正确方法。任何帮助表示赞赏。

【问题讨论】:

【参考方案1】:

依赖未记录的行为总是有风险的,因为您的代码可能无法按您的预期工作,因为您对它只有部分了解。

您可以以更有效的方式做同样的事情,而无需使用任何拆分和连接方法。使用from_json 函数创建两个嵌套列,然后将嵌套列展平,最后丢弃中间 JSON 字符串列和嵌套列。

这是整个过程的示例。

import org.apache.spark.sql.types.StringType, StructType, StructField

val df = (Seq( 
("09-02-2020","\"id\":\"01\", \"status\":\"Active\"","\"name\":\"Abdullah\", \"address\":\"Jumeirah\""), 
("10-02-2020","\"id\":\"02\", \"status\":\"Dormant\"","\"name\":\"Ali\", \"address\":\"Jebel Ali\"") 
).toDF("date","jsonString1","jsonString2"))

scala> df.show()
+----------+--------------------+--------------------+
|      date|         jsonString1|         jsonString2|
+----------+--------------------+--------------------+
|09-02-2020|"id":"01", "stat...|"name":"Abdullah...|
|10-02-2020|"id":"02", "stat...|"name":"Ali", "a...|
+----------+--------------------+--------------------+

val schema1 = (StructType(Seq(
  StructField("id", StringType, true), 
  StructField("status", StringType, true)
)))

val schema2 = (StructType(Seq(
  StructField("name", StringType, true), 
  StructField("address", StringType, true)
)))


val dfFlattened = (df.withColumn("jsonData1", from_json(col("jsonString1"), schema1))
            .withColumn("jsonData2", from_json(col("jsonString2"), schema2))
            .withColumn("id", col("jsonData1.id"))
            .withColumn("status", col("jsonData1.status"))
            .withColumn("name", col("jsonData2.name"))
            .withColumn("address", col("jsonData2.address"))
            .drop("jsonString1")
            .drop("jsonString2")
            .drop("jsonData1")
            .drop("jsonData2"))         

scala> dfFlattened.show()
+----------+---+-------+--------+---------+
|      date| id| status|    name|  address|
+----------+---+-------+--------+---------+
|09-02-2020| 01| Active|Abdullah| Jumeirah|
|10-02-2020| 02|Dormant|     Ali|Jebel Ali|
+----------+---+-------+--------+---------+   

【讨论】:

感谢您的回答。采用我的方法的原因是利用 selectExpr,因为我的 json 嵌套很深。使用 selectExpr,我可以只使用 json 路径进行选择而不是创建模式,但看起来您的解决方案是正确的方法。

以上是关于连接单独处理的火花数据帧的两列时的顺序保证是啥?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Pandas 中连接包含列表(系列)的两列

列出两列时合并两个 Pandas 数据框

Pandas - 匹配来自两个数据帧的两列并在 df1 中创建新列

我们如何组合来自相同 data_type 的数据帧的两列的值并获取每个元素的计数?

根据原始数据帧的两列之间的条件创建新的数据帧[关闭]

在 pyspark 数据框中循环遍历两列时将值添加到新列