在 PySpark 中定义 JSON 模式结构的配置文件

Posted

技术标签:

【中文标题】在 PySpark 中定义 JSON 模式结构的配置文件【英文标题】:Config file to define JSON Schema Structure in PySpark 【发布时间】:2016-07-08 23:10:42 【问题描述】:

我创建了一个 PySpark 应用程序,它通过定义的模式读取数据帧中的 JSON 文件。下面的代码示例

schema = StructType([
    StructField("domain", StringType(), True),
     StructField("timestamp", LongType(), True),                            
])
df= sqlContext.read.json(file, schema)

我需要一种方法来找到如何在某种配置或 ini 文件等中定义此架构。并在 PySpark 应用程序的主程序中读取它。

如果将来有任何需要,这将帮助我修改不断变化的 JSON 的架构,而无需更改主要的 PySpark 代码。

【问题讨论】:

【参考方案1】:

StructType 提供了jsonjsonValue 方法,可用于分别获取jsondict 表示,fromJson 可用于将Python 字典转换为StructType

schema = StructType([
    StructField("domain", StringType(), True),
    StructField("timestamp", LongType(), True),                            
])

StructType.fromJson(schema.jsonValue())

除此之外,您唯一需要的是内置 json 模块来解析 dict 的输入,StructType 可以使用该模块。

Scala 版本见How to create a schema from CSV file and persist/save that schema to a file?

【讨论】:

【参考方案2】:

您可以按以下格式创建名为 schema.json 的 JSON 文件


  "fields": [
    
      "metadata": ,
      "name": "first_fields",
      "nullable": true,
      "type": "string"
    ,
    
      "metadata": ,
      "name": "double_field",
      "nullable": true,
      "type": "double"
    
  ],
  "type": "struct"

通过读取这个文件创建一个结构模式

rdd = spark.sparkContext.wholeTextFiles("s3://<bucket>/schema.json")
text = rdd.collect()[0][1]
dict = json.loads(str(text))
custom_schema = StructType.fromJson(dict)

之后就可以使用struct作为schema来读取JSON文件了

val df=spark.read.json("path", custom_schema)

【讨论】:

以上是关于在 PySpark 中定义 JSON 模式结构的配置文件的主要内容,如果未能解决你的问题,请参考以下文章

从 Pyspark 中的嵌套 Json-String 列中提取模式

使用 pyspark 中 json 文件中的模式读取固定宽度文件

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

如何在pyspark上更改JSON结构?

Pyspark 将嵌套结构字段转换为 Json 字符串

Pyspark Json 结构