复杂和嵌套的 json 数据集如何与 pyspark 一起使用
Posted
技术标签:
【中文标题】复杂和嵌套的 json 数据集如何与 pyspark 一起使用【英文标题】:How complex and nested json dataset works with pyspark 【发布时间】:2020-08-30 03:25:27 【问题描述】:我有一个非常复杂的数据,并在 scala 中的 databricks 中处理。 我想将该 scala 转换为 python,并且应该与 JSON 中给出的数据一起使用
Scala 代码:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val jsonSchema = new StructType()
.add("battery_level", LongType)
.add("c02_level", LongType)
.add("cca3",StringType)
.add("cn", StringType)
.add("device_id", LongType)
.add("device_type", StringType)
.add("signal", LongType)
.add("ip", StringType)
.add("temp", LongType)
.add("timestamp", TimestampType)
// define a case class
case class DeviceData (id: Int, device: String)
// create some sample data
val eventsDS = Seq (
(0, """"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 """),
(1, """"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 """),
(2, """"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 """),
(3, """"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 """)).toDF("id", "device").as[DeviceData]
display(eventsDS)
Click here to see the output
现在我想在 pyspark 中实现上述代码。 我已经做了一些事情,但因为没有 Seq 在 python 中,所以卡在了 Seq 中。 如何在 pyspark 中处理这些样本数据?
Python 代码:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dataclasses import dataclass
scSpark = SparkSession.builder.appName("complex data types").getOrCreate()
#Creating JSON schema
jsonSchema = StructType([
StructField("battery_level", LongType(), True),
StructField("c02_level", IntegerType(), True),
StructField("c02_level", LongType(), True),
StructField("cca3",StringType(), True),
StructField("cn", StringType(), True),
StructField("device_id", LongType(), True),
StructField("device_type", StringType(), True),
StructField("signal", LongType(), True),
StructField("ip", StringType(), True),
StructField("temp", LongType(), True),
StructField("timestamp", TimestampType(), True),
])
#Create a Dataset from the above schema
@dataclass
class DeviceData(object):
id: int
device: str
现在我不明白接下来要写什么。 我想要像我提供的图像中的输出。
我的主要动机是 https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html 将所有 scala 转换为 pyspark。这将解决我的问题。
【问题讨论】:
我认为您不需要使用 json 模式。json.load()
json 文件,这将为您提供一个包含数据的漂亮 python 字典。示例中的数据实际上并没有那么复杂。见这里:docs.python.org/3/library/json.html
就我而言,我必须明确定义架构。我想使用 python 代码显示上述数据。我不知道如何处理 Seq。它只是一个样本数据。我在类似结构的 json 中有更多嵌套列表和 json。如果这部分解决了,我会处理那部分。谢谢。
有一个名为 jsonschema python-jsonschema.readthedocs.io/en/stable 的包允许您验证 json,但至于生成 json,我只需创建类来处理 json。如果需要以这种方式处理与火花相关的事情,则可以将帖子标记为这样
谢谢。它与 pyspark 更相关。所以编辑了这个问题。
【参考方案1】:
scala Seq
与 python list
最相似:
eventsDS = (sql.createDataFrame(
[(0, """"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 """),
(1, """"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 """),
(2, """"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 """),
(3, """"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 """)],
['id', 'device']))
eventsDS.show()
+---+--------------------+
| id| device|
+---+--------------------+
| 0|"device_id": 0, ...|
| 1|"device_id": 1, ...|
| 2|"device_id": 2, ...|
| 3|"device_id": 3, ...|
+---+--------------------+
在 pyspark SQL 中,case 类不是必需的。
eventsDS.printSchema()
root
|-- id: long (nullable = true)
|-- device: string (nullable = true)
【讨论】:
以上是关于复杂和嵌套的 json 数据集如何与 pyspark 一起使用的主要内容,如果未能解决你的问题,请参考以下文章
在 Azure 数据工厂中使用复制数据活动将 xml 解析为 json 时如何删除转义字符?
如何将具有嵌套对象的复杂 json 文件映射到 java 对象?