Loading a multiline JSON file into a PySpark dataframe
Posted: 2021-12-20 12:59:32

【Question】:
I have a multiline JSON file that I am reading with PySpark (Spark 3.0+). The end goal is to load the JSON into a Postgres database and run some queries on the data.
I am using a two-step approach: first clean the raw JSON file (keep only the required fields) and store it as Parquet or JSON, then load the cleaned data into Postgres. Below is the code that loads the JSON file, along with the record count.
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName('Sample').getOrCreate()
df_source = spark.read.option("multiline",True).json('data.json')
print('Row Count', df_source.count())
Row Count 1

The count is 1 because the entire file is parsed as a single JSON document. Below is the schema of the dataframe:
df_source.printSchema()
root
|-- data: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- meta: struct (nullable = true)
| |-- view: struct (nullable = true)
| | |-- category: string (nullable = true)
| | |-- columns: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- dataTypeName: string (nullable = true)
| | | | |-- description: string (nullable = true)
| | | | |-- fieldName: string (nullable = true)
| | | | |-- id: long (nullable = true)
| | | | |-- position: long (nullable = true)
| | |-- createdAt: long (nullable = true)
| | |-- description: string (nullable = true)
| | |-- downloadCount: long (nullable = true)
So the file consists of a data tag that holds the actual records and a meta tag that holds metadata describing them.

Can anyone suggest a way to extract the data from the above dataframe into a separate dataframe, so that I can write out the final dataset?
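(As a side note, since meta.view is a nested struct, its fields can be selected directly with dot notation, for example:

df_source.select("meta.view.category", "meta.view.downloadCount").show(truncate=False)
)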
The sample JSON data is below:
"meta" :
"view" :
"category" : "This is the view category",
"createdAt" : 1439381433,
"description" : "The data is a sample subset of the actual data",
"downloadCount" : 33858,
"columns" : [
"id" : -1,
"dataTypeName" : "text",
"fieldName" : "sid",
"position" : 1,
"description" : "meta_data"
,
"id" : -10,
"dataTypeName" : "text",
"fieldName" : "id",
"position" : 2,
"description" : "meta_data"
,
"id" : -20,
"dataTypeName" : "long",
"fieldName" : "created_at",
"position" : 3,
"description" : "meta_data"
,
"id" : -30,
"dataTypeName" : "long",
"fieldName" : "updated_at",
"position" : 4,
"description" : "meta_data"
,
"id" : 217182091,
"dataTypeName" : "text",
"fieldName" : "measureid",
"position" : 5,
"description" : "Unique measure id"
,
"id" : 217182092,
"dataTypeName" : "text",
"fieldName" : "measurename",
"position" : 6,
"description" : "Unique measure name"
,
"id" : 217182093,
"dataTypeName" : "text",
"fieldName" : "measuretype",
"position" : 7,
"description" : "The type of measure"
,
"id" : 217182100,
"dataTypeName" : "text",
"fieldName" : "reportyear",
"position" : 8,
"description" : "year on which reported"
,
"id" : 217182100,
"dataTypeName" : "text",
"fieldName" : "value",
"position" : 9,
"description" : "Value of measure"
]
,
"data" : [ [ "row-8eh8_xxkx-u3mq", "00000000-0000-0000-A1B7-70E47BCE5354", 1439382361, 1439382361, "83", "Number of days", "Counts", "1999", "33" ]
, [ "row-u2v5_78j5-pxk4", "00000000-0000-0000-260A-99DE31733069", 1439382361, 1439382361, "83", "Number of days", "Counts", "2000", "40" ]
, [ "row-68zj_7qfn-sxwu", "00000000-0000-0000-AA6F-0AA88BE0BC18", 1439382361, 1439382361, "83", "Number of days", "Counts", "2002", "39" ]
, [ "row-zziv.xdnh-rsv4", "00000000-0000-0000-D103-71CF4022F146", 1439382361, 1439382361, "85", "Percent of days", "Percent", "1999", "2" ]
, [ "row-8dia~i5sg-v6cj", "00000000-0000-0000-1A71-DE17F79EC965", 1439382361, 1439382361, "86", "Person-days", "Counts", "2006", "5" ]
, [ "row-r7kk_e3dm-z22z", "00000000-0000-0000-B536-48BC9313E20F", 1439382361, 1439382361, "83", "Number of days", "Counts", "2006", "67" ]
, [ "row-mst5-k3ph~ikp3", "00000000-0000-0000-7BD9-A3C1B223ECFE", 1439382361, 1439382361, "86", "Person-days", "Counts""2001", "9" ]
]
【Answer 1】:
You can first get the column names (fieldName) and their positions (position) from the meta field, then explode the data column so that each row becomes an array. To turn that array into separate columns, use the positions and names obtained from the meta field:
import pyspark.sql.functions as F

# Build one column expression per metadata entry: pick the array element
# at (position - 1) and alias it with the corresponding fieldName.
columns = [
    F.col("row")[r.position - 1].alias(r.fieldName)
    for r in df_source.select(F.expr("inline(meta.view.columns)"))
                      .select("fieldName", "position").collect()
]

# Explode "data" so each inner array becomes a row, then split it into named columns.
df_clean = df_source.select(F.explode("data").alias("row")).select(*columns)
df_clean.show(truncate=False)
#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+
#|sid |id |created_at|updated_at|measureid|measurename |measuretype|reportyear|value|
#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+
#|row-8eh8_xxkx-u3mq|00000000-0000-0000-A1B7-70E47BCE5354|1439382361|1439382361|83 |Number of days |Counts |1999 |33 |
#|row-u2v5_78j5-pxk4|00000000-0000-0000-260A-99DE31733069|1439382361|1439382361|83 |Number of days |Counts |2000 |40 |
#|row-68zj_7qfn-sxwu|00000000-0000-0000-AA6F-0AA88BE0BC18|1439382361|1439382361|83 |Number of days |Counts |2002 |39 |
#|row-zziv.xdnh-rsv4|00000000-0000-0000-D103-71CF4022F146|1439382361|1439382361|85 |Percent of days|Percent |1999 |2 |
#|row-8dia~i5sg-v6cj|00000000-0000-0000-1A71-DE17F79EC965|1439382361|1439382361|86 |Person-days |Counts |2006 |5 |
#|row-r7kk_e3dm-z22z|00000000-0000-0000-B536-48BC9313E20F|1439382361|1439382361|83 |Number of days |Counts |2006 |67 |
#|row-mst5-k3ph~ikp3|00000000-0000-0000-7BD9-A3C1B223ECFE|1439382361|1439382361|86 |Person-days |Counts |2001 |9 |
#+------------------+------------------------------------+----------+----------+---------+---------------+-----------+----------+-----+
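Once df_clean is built, the second step (loading into Postgres) is a single JDBC write. A minimal sketch — the URL, table name, and credentials below are placeholders, and it assumes the PostgreSQL JDBC driver is on Spark's classpath:

df_clean.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "public.measures") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()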
【Comments】:
Thanks @blackbishop for the dynamic solution. Just wanted to check whether this can also be done with a Pandas dataframe?
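As for the Pandas question: assuming the cleaned result fits in driver memory, it can be converted directly, e.g.:

pdf = df_clean.toPandas()  # a regular pandas DataFrame with the same named columns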
【Answer 2】:
Explode the dataframe column data to get an array whose elements can be accessed by index.
Example:
from pyspark.sql.functions import *

# Explode "data" so each inner array becomes one row (default column name: "col")
df = spark.read.option("multiLine", True).json("data.json").select(explode("data"))
df.select("col").show(10, False)
#+-----------------------------------------------------------------------------------------------------------------------+
#|col |
#+-----------------------------------------------------------------------------------------------------------------------+
#|[row-8eh8_xxkx-u3mq, 00000000-0000-0000-A1B7-70E47BCE5354, 1439382361, 1439382361, 83, Numberofdays, Counts, 1999, 33] |
#|[row-u2v5_78j5-pxk4, 00000000-0000-0000-260A-99DE31733069, 1439382361, 1439382361, 83, Numberofdays, Counts, 2000, 40] |
#|[row-68zj_7qfn-sxwu, 00000000-0000-0000-AA6F-0AA88BE0BC18, 1439382361, 1439382361, 83, Numberofdays, Counts, 2002, 39] |
#|[row-zziv.xdnh-rsv4, 00000000-0000-0000-D103-71CF4022F146, 1439382361, 1439382361, 85, Percentofdays, Percent, 1999, 2]|
#|[row-8dia~i5sg-v6cj, 00000000-0000-0000-1A71-DE17F79EC965, 1439382361, 1439382361, 86, Person-days, Counts, 2006, 5] |
#|[row-r7kk_e3dm-z22z, 00000000-0000-0000-B536-48BC9313E20F, 1439382361, 1439382361, 83, Numberofdays, Counts, 2006, 67] |
#|[row-mst5-k3ph~ikp3, 00000000-0000-0000-7BD9-A3C1B223ECFE, 1439382361, 1439382361, 86, Person-days, Counts, 2001, 9] |
#+-----------------------------------------------------------------------------------------------------------------------+
# accessing data by index
df.select(col("col").getItem(0)).show(10,False)
#+------------------+
#|col[0] |
#+------------------+
#|row-8eh8_xxkx-u3mq|
#|row-u2v5_78j5-pxk4|
#|row-68zj_7qfn-sxwu|
#|row-zziv.xdnh-rsv4|
#|row-8dia~i5sg-v6cj|
#|row-r7kk_e3dm-z22z|
#|row-mst5-k3ph~ikp3|
#+------------------+
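To turn the indexed columns into a named dataframe with this approach, the field names can be hard-coded — a sketch, where the list below simply mirrors the fieldName values from the sample metadata:

field_names = ["sid", "id", "created_at", "updated_at", "measureid",
               "measurename", "measuretype", "reportyear", "value"]

# Select each array element by index and alias it with its field name
df_named = df.select([col("col")[i].alias(n) for i, n in enumerate(field_names)])
df_named.show(truncate=False)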