将 JSON 多行文件加载到 pyspark 数据框中

Posted

技术标签:

【中文标题】将 JSON 多行文件加载到 pyspark 数据框中【英文标题】:Loading JSON multiline file into pyspark dataframe 【发布时间】:2021-12-20 12:59:32 【问题描述】:

我有一个多行 JSON 文件,我正在使用 pyspark(Spark 3.0 及更高版本)读取该文件。最终目标是能够将 JSON 加载到 postgres 数据库中并对数据运行一些查询。

我使用的是两步法。首先清理 RAW JSON 文件(仅包含必填字段)并将其存储为 parquet 或 JSON。其次将清理后的数据加载到 postgres 中。下面是加载 JSON 文件的代码和文件中的记录数。

from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName('Sample').getOrCreate()
df_source = spark.read.option("multiline",True).json('data.json')
print('Row Count', df_source.count())

行数 1

以下是数据框的架构

df_source.printSchema()


 root
 |-- data: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- meta: struct (nullable = true)
 |    |-- view: struct (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- columns: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- dataTypeName: string (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- fieldName: string (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- position: long (nullable = true)
 |    |    |-- createdAt: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- downloadCount: long (nullable = true)

所以文件由包含实际数据的数据标签和元标签组成,元标签包含有关实际数据的元数据信息。

谁能建议一种方法将上述数据框中的数据提取到单独的数据框中,以便我可以编写最终数据集?

示例 JSON 数据如下:


  "meta" : 
    "view" : 
      "category" : "This is the view category",
      "createdAt" : 1439381433,
      "description" : "The data is a sample subset of the actual data",
      "downloadCount" : 33858,
      "columns" : [ 
          "id" : -1,
          "dataTypeName" : "text",
          "fieldName" : "sid",
          "position" : 1,
          "description" : "meta_data"
        , 
          "id" : -10,
          "dataTypeName" : "text",
          "fieldName" : "id",
          "position" : 2,
          "description" : "meta_data"
        , 
          "id" : -20,
          "dataTypeName" : "long",
          "fieldName" : "created_at",
          "position" : 3,
          "description" : "meta_data"
        , 
          "id" : -30,
          "dataTypeName" : "long",
          "fieldName" : "updated_at",
          "position" : 4,
          "description" : "meta_data"
        , 
          "id" : 217182091,
          "dataTypeName" : "text",
          "fieldName" : "measureid",
          "position" : 5,
          "description" : "Unique measure id"
        , 
          "id" : 217182092,
          "dataTypeName" : "text",
          "fieldName" : "measurename",
          "position" : 6,
          "description" : "Unique measure name"
        , 
          "id" : 217182093,
          "dataTypeName" : "text",
          "fieldName" : "measuretype",
          "position" : 7,
          "description" : "The type of measure"
        , 
          "id" : 217182100,
          "dataTypeName" : "text",
          "fieldName" : "reportyear",
          "position" : 8,
          "description" : "year on which reported"
        , 
          "id" : 217182100,
          "dataTypeName" : "text",
          "fieldName" : "value",
          "position" : 9,
          "description" : "Value of measure"
         ]
    
  ,
  "data" : [ [ "row-8eh8_xxkx-u3mq", "00000000-0000-0000-A1B7-70E47BCE5354", 1439382361, 1439382361, "83", "Number of days", "Counts", "1999", "33" ]
, [ "row-u2v5_78j5-pxk4", "00000000-0000-0000-260A-99DE31733069", 1439382361, 1439382361, "83", "Number of days", "Counts", "2000", "40" ]
, [ "row-68zj_7qfn-sxwu", "00000000-0000-0000-AA6F-0AA88BE0BC18", 1439382361, 1439382361, "83", "Number of days", "Counts", "2002", "39" ]
, [ "row-zziv.xdnh-rsv4", "00000000-0000-0000-D103-71CF4022F146", 1439382361, 1439382361, "85", "Percent of days", "Percent", "1999", "2" ]
, [ "row-8dia~i5sg-v6cj", "00000000-0000-0000-1A71-DE17F79EC965", 1439382361, 1439382361, "86", "Person-days", "Counts", "2006", "5" ]
, [ "row-r7kk_e3dm-z22z", "00000000-0000-0000-B536-48BC9313E20F", 1439382361, 1439382361, "83", "Number of days", "Counts", "2006", "67" ]
, [ "row-mst5-k3ph~ikp3", "00000000-0000-0000-7BD9-A3C1B223ECFE", 1439382361, 1439382361, "86", "Person-days", "Counts""2001", "9" ]
 ]

【问题讨论】:

【参考方案1】:

您可以首先从meta 字段中获取列名(fieldName)及其位置(position),然后分解data 列以将每一行作为一个数组。要将数组转换为多列,请使用从 meta 字段中获得的位置和名称:

import pyspark.sql.functions as F

columns = [
    F.col("row")[r.position-1].alias(r.fieldName) for r in
    df_source.select(F.expr("inline(meta.view.columns)")).select("fieldName", "position").collect()
]

df_clean = df_source.select(F.explode("data").alias("row")).select(*columns)

df_clean.show(truncate=False)

#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+
#|sid               |id                                  |created_at|updated_at|measureid|measurename    |measuretype|reportyear|value|
#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+
#|row-8eh8_xxkx-u3mq|00000000-0000-0000-A1B7-70E47BCE5354|1439382361|1439382361|83       |Number of days |Counts     |1999      |33   |
#|row-u2v5_78j5-pxk4|00000000-0000-0000-260A-99DE31733069|1439382361|1439382361|83       |Number of days |Counts     |2000      |40   |
#|row-68zj_7qfn-sxwu|00000000-0000-0000-AA6F-0AA88BE0BC18|1439382361|1439382361|83       |Number of days |Counts     |2002      |39   |
#|row-zziv.xdnh-rsv4|00000000-0000-0000-D103-71CF4022F146|1439382361|1439382361|85       |Percent of days|Percent    |1999      |2    |
#|row-8dia~i5sg-v6cj|00000000-0000-0000-1A71-DE17F79EC965|1439382361|1439382361|86       |Person-days    |Counts     |2006      |5    |
#|row-r7kk_e3dm-z22z|00000000-0000-0000-B536-48BC9313E20F|1439382361|1439382361|83       |Number of days |Counts     |2006      |67   |
#|row-mst5-k3ph~ikp3|00000000-0000-0000-7BD9-A3C1B223ECFE|1439382361|1439382361|86       |Person-days    |Counts     |2001      |9    |
#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+

【讨论】:

感谢@blackbishop 提供动态解决方案。只是想检查是否可以使用 Pandas 数据框?【参考方案2】:

分解数据框列data,得到一个数组,可以通过索引访问。

Example:

from pyspark.sql.functions import *
df=spark.read.option("multiLine",True).json("data.json").select(explode("data"))
df.select("col").show(10,False)

#+-----------------------------------------------------------------------------------------------------------------------+
#|col                                                                                                                    |
#+-----------------------------------------------------------------------------------------------------------------------+
#|[row-8eh8_xxkx-u3mq, 00000000-0000-0000-A1B7-70E47BCE5354, 1439382361, 1439382361, 83, Numberofdays, Counts, 1999, 33] |
#|[row-u2v5_78j5-pxk4, 00000000-0000-0000-260A-99DE31733069, 1439382361, 1439382361, 83, Numberofdays, Counts, 2000, 40] |
#|[row-68zj_7qfn-sxwu, 00000000-0000-0000-AA6F-0AA88BE0BC18, 1439382361, 1439382361, 83, Numberofdays, Counts, 2002, 39] |
#|[row-zziv.xdnh-rsv4, 00000000-0000-0000-D103-71CF4022F146, 1439382361, 1439382361, 85, Percentofdays, Percent, 1999, 2]|
#|[row-8dia~i5sg-v6cj, 00000000-0000-0000-1A71-DE17F79EC965, 1439382361, 1439382361, 86, Person-days, Counts, 2006, 5]   |
#|[row-r7kk_e3dm-z22z, 00000000-0000-0000-B536-48BC9313E20F, 1439382361, 1439382361, 83, Numberofdays, Counts, 2006, 67] |
#|[row-mst5-k3ph~ikp3, 00000000-0000-0000-7BD9-A3C1B223ECFE, 1439382361, 1439382361, 86, Person-days, Counts, 2001, 9]   |
+-----------------------------------------------------------------------------------------------------------------------+

#accessing data by index
df.select(col("col").getItem(0)).show(10,False)
#+------------------+
#|col[0]            |
#+------------------+
#|row-8eh8_xxkx-u3mq|
#|row-u2v5_78j5-pxk4|
#|row-68zj_7qfn-sxwu|
#|row-zziv.xdnh-rsv4|
#|row-8dia~i5sg-v6cj|
#|row-r7kk_e3dm-z22z|
#|row-mst5-k3ph~ikp3|
#+------------------+

【讨论】:

以上是关于将 JSON 多行文件加载到 pyspark 数据框中的主要内容,如果未能解决你的问题,请参考以下文章

用例将 json 加载为一行而不是多行,特定于数据集

从 pyspark 中的多行文件中读取 JSON 文件

Pyspark 将 json 数组转换为数据帧行

在 Pyspark 中将多行组合为 JSON 对象

无法使用 pyspark 显示 mongo 数据库

PySpark (Python):通过 SparkContext.newAPIHadoopFile 加载多行记录