将带有 json 字符串的数据框列转换为不同的列

Posted

技术标签:

【中文标题】将带有 json 字符串的数据框列转换为不同的列【英文标题】:Convert a dataframe column with json string into different columns 【发布时间】:2019-10-21 13:05:22 【问题描述】:

我正在将数据从 blob 位置获取到数据框中,如下所示。

| NUM_ID|                                                                                                                              Event|
+-------+-----------------------------------------------------------------------------------------------------------------------------------+
|XXXXX01|["SN":"SIG1","E":1571599398000,"V":19.79,"SN":"SIG1","E":1571599406000,"V":19.80,"SN":"SIG2","E":1571599406000,"V":25.30,...|
|XXXXX02|["SN":"SIG1","E":1571599414000,"V":19.79,"SN":"SIG2","E":1571599414000,"V":19.80,"SN":"SIG2","E":1571599424000,"V":25.30,...|

如果我们取一行,它将如下所示。

|XXXXX01|["SN":"SIG1","E":1571599398000,"V":19.79,"SN":"SIG1","E":1571599406000,"V":19.80,"SN":"SIG1","E":1571599414000,"V":19.20,"SN":"SIG2","E":1571599424000,"V":25.30,"SN":"SIG2","E":1571599432000,"V":19.10,"SN":"SIG3","E":1571599440000,"V":19.10,"SN":"SIG3","E":1571599448000,"V":19.10,"SN":"SIG3","E":1571599456000,"V":19.10,"SN":"SIG3","E":1571599396000,"V":19.79,"SN":"SIG3","E":1571599404000,"V":19.79]

事件列具有不同的信号作为 E、V 对。

此数据框的架构如下所示。

scala> df.printSchema
root
 |-- NUM_ID: string (nullable = true)
 |-- Event: string (nullable = true)

我想将一些信号(假设我只需要 SIG1 和 SIG3)与 E、V 对一起作为一个新列,如下所示。

+-------+--------+--------------+------+
| NUM_ID|   Event|             E|     V|
+-------+--------+--------------+------+
|XXXXX01|    SIG1| 1571599398000| 19.79|
|XXXXX01|    SIG1| 1571599406000| 19.80|
|XXXXX01|    SIG1| 1571599414000| 19.20|
|XXXXX01|    SIG3| 1571599440000| 19.10|
|XXXXX01|    SIG3| 1571599448000| 19.10|
|XXXXX01|    SIG3| 1571599406000| 19.10|
|XXXXX01|    SIG3| 1571599396000| 19.70|
|XXXXX01|    SIG3| 1571599404000| 19.70|
+-------+--------+--------------+------+

每个 NUM_ID 的最终输出应如下所示。

+-------+--------------+------+------+
| NUM_ID|             E|SIG1 V|SIG3 V|    
+-------+--------------+------+------+
|XXXXX01| 1571599398000| 19.79|  null|
|XXXXX01| 1571599406000| 19.80| 19.70|
|XXXXX01| 1571599414000| 19.20|  null|
|XXXXX01| 1571599440000|  null| 19.10|
|XXXXX01| 1571599448000|  null| 19.10|
|XXXXX01| 1571599448000|  null| 19.10|
|XXXXX01| 1571599406000| 19.80| 19.10|
|XXXXX01| 1571599396000|  null| 19.70|
|XXXXX01| 1571599404000|  null| 19.70|
+-------+--------------+------+------+

欣赏任何潜在客户。 提前致谢!

【问题讨论】:

到目前为止你有什么尝试? @Shankar Koirala-我尝试将 Event 列转换为结构类型,如下所示。 val schema = ArrayType(StructType(Seq(StructField("SN", StringType), StructField("E", StringType), StructField("V", StringType)))) df.withColumn("sig_array", from_json($"Event", schema)) 并获得 [SN, E, V ] 作为新列,现在试图炸毁该列! 如果 json 有预定义的架构,也可以使用 from_json;如果您不知道架构,则可以使用 get_json_object here 【参考方案1】:

Above Event 列包含一行中的多条记录,即数据必须在进一步处理之前展平。数据扁平化可以通过对 DataFrame 的 flatmap 转换操作来实现。

该方法是创建一个扁平化 JSON Dataframe,其中包含所有必要的键和值,最后通过 Spark 读取 json API 将 JSON 转换为 DataFrame。

val mapper = new ObjectMapper()
import spark.implicits._

val flatDF = df.flatMap(row => 
  val numId = row.getAs[String]("NUM_ID")
  val event = row.getAs[String]("Event")
  val data = mapper.readValue(event, classOf[Array[java.util.Map[String, String]]])

  data.map(jsonMap => 
    jsonMap.put("NUM_ID", numId)
    mapper.writeValueAsString(jsonMap)
  )
)

val finalDF = spark.read.json(flatDF)

//finalDF Outout
+-------------+-------+----+-----+
|            E| NUM_ID|  SN|    V|
+-------------+-------+----+-----+
|1571599398000|XXXXX01|SIG1|19.79|
|1571599406000|XXXXX01|SIG1| 19.8|
|1571599406000|XXXXX01|SIG2| 25.3|
|1571599414000|XXXXX02|SIG1|19.79|
|1571599414000|XXXXX02|SIG2| 19.8|
|1571599424000|XXXXX02|SIG2| 25.3|
+-------------+-------+----+-----+

【讨论】:

【参考方案2】:

这是通过获取json对象并分解列得到的,如下所示。

val schema = ArrayType(StructType(Seq(StructField("SN", StringType), StructField("E", StringType), StructField("V", StringType))))
val structDF = fromBlobDF.withColumn("sig_array", from_json($"Event", schema))

val signalsDF = structDF.withColumn("sig_array", explode($"sig_array")).withColumn("SIGNAL", $"sig_array.SN").withColumn("E", $"sig_array.E").withColumn("V", $"sig_array.V").select("NUM_ID","E","SIGNAL","V")

【讨论】:

以上是关于将带有 json 字符串的数据框列转换为不同的列的主要内容,如果未能解决你的问题,请参考以下文章

将字典列表的 Python 数据框列转换为具有单个元素的列

使用部分字符串匹配将数据框中的列替换为另一个数据框列

在pyspark中将带有字符串json字符串的列转换为带有字典的列

熊猫数据框列有带逗号的字符串如何将其转换为列表[关闭]

Pandas:如何将数据框列中的“时间戳”值从对象/字符串转换为时间戳?

如何将数据框列转换为字符串并替换 nans(fillna 不起作用)