复杂和嵌套的 json 数据集如何与 pyspark 一起使用

Posted

技术标签:

【中文标题】复杂和嵌套的 json 数据集如何与 pyspark 一起使用【英文标题】:How complex and nested json dataset works with pyspark 【发布时间】:2020-08-30 03:25:27 【问题描述】:

我有一个非常复杂的数据,并在 scala 中的 databricks 中处理。 我想将该 scala 转换为 python,并且应该与 JSON 中给出的数据一起使用

Scala 代码:

import org.apache.spark.sql.types._                        
import org.apache.spark.sql.functions._                     

val jsonSchema = new StructType()
        .add("battery_level", LongType)
        .add("c02_level", LongType)
        .add("cca3",StringType)
        .add("cn", StringType)
        .add("device_id", LongType)
        .add("device_type", StringType)
        .add("signal", LongType)
        .add("ip", StringType)
        .add("temp", LongType)
        .add("timestamp", TimestampType)

// define a case class

case class DeviceData (id: Int, device: String)

// create some sample data

val eventsDS = Seq (

(0, """"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 """),

 (1, """"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 """),

 (2, """"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 """),

 (3, """"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 """)).toDF("id", "device").as[DeviceData]

display(eventsDS)

Click here to see the output

现在我想在 pyspark 中实现上述代码。 我已经做了一些事情,但因为没有 Seq 在 python 中,所以卡在了 Seq 中。 如何在 pyspark 中处理这些样本数据?

Python 代码:

from pyspark.sql import SparkSession 
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dataclasses import dataclass

scSpark = SparkSession.builder.appName("complex data types").getOrCreate()
#Creating JSON schema
jsonSchema = StructType([
  StructField("battery_level", LongType(), True), 
  StructField("c02_level", IntegerType(), True),
  StructField("c02_level", LongType(), True),
  StructField("cca3",StringType(), True),
  StructField("cn", StringType(), True),
  StructField("device_id", LongType(), True),
  StructField("device_type", StringType(), True),
  StructField("signal", LongType(), True),
  StructField("ip", StringType(), True),
  StructField("temp", LongType(), True),
  StructField("timestamp", TimestampType(), True),
])
#Create a Dataset from the above schema
@dataclass
class DeviceData(object):
  id: int
  device: str

现在我不明白接下来要写什么。 我想要像我提供的图像中的输出。

我的主要动机是 https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html 将所有 scala 转换为 pyspark。这将解决我的问题。

【问题讨论】:

我认为您不需要使用 json 模式。 json.load() json 文件,这将为您提供一个包含数据的漂亮 python 字典。示例中的数据实际上并没有那么复杂。见这里:docs.python.org/3/library/json.html 就我而言,我必须明确定义架构。我想使用 python 代码显示上述数据。我不知道如何处理 Seq。它只是一个样本数据。我在类似结构的 json 中有更多嵌套列表和 json。如果这部分解决了,我会处理那部分。谢谢。 有一个名为 jsonschema python-jsonschema.readthedocs.io/en/stable 的包允许您验证 json,但至于生成 json,我只需创建类来处理 json。如果需要以这种方式处理与火花相关的事情,则可以将帖子标记为这样 谢谢。它与 pyspark 更相关。所以编辑了这个问题。 【参考方案1】:

scala Seq 与 python list 最相似:

eventsDS = (sql.createDataFrame(
            [(0, """"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 """),
             (1, """"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 """),
             (2, """"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 """),
             (3, """"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 """)],
            ['id', 'device']))

eventsDS.show()                                                                                                                                                                                           

+---+--------------------+                                                      
| id|              device|
+---+--------------------+
|  0|"device_id": 0, ...|
|  1|"device_id": 1, ...|
|  2|"device_id": 2, ...|
|  3|"device_id": 3, ...|
+---+--------------------+

在 pyspark SQL 中,case 类不是必需的。

eventsDS.printSchema()                                                                                                                                                                                    

root
 |-- id: long (nullable = true)
 |-- device: string (nullable = true)

【讨论】:

以上是关于复杂和嵌套的 json 数据集如何与 pyspark 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

在 Pig 中解析复杂的嵌套 JSON

如何在 REACT 中访问更复杂的嵌套数据 JSON

在 Azure 数据工厂中使用复制数据活动将 xml 解析为 json 时如何删除转义字符?

如何将具有嵌套对象的复杂 json 文件映射到 java 对象?

Spark使用DataFrame读取复杂JSON中的嵌套数组

嵌套的JsonObject与JSONArray的取值---JSON中嵌套JSONArray