如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

Posted

技术标签:

【中文标题】如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列【英文标题】:How to convert Json array list with multiple possible values into columns in a dataframe using pyspark 【发布时间】:2019-04-13 06:50:06 【问题描述】:

我通过 Databricks (Spark + Python 3.5) 中的 Python SDK 使用 Google Admin Report API。

它以以下格式返回数据(Databricks pyspark 代码):

dbutils.fs.put("/tmp/test.json", '''
    "userEmail": "rod@test.com", 
    "parameters": [
        
            "intValue": "0",
            "name": "clas-s-room:num_courses_created"
        ,
        
            "boolValue": true,
            "name": "accounts:is_disabled"
        ,
        
            "name": "clas-s-room:role",
            "stringValue": "student"
        
    ]
''', True)

有 188 个参数,每个参数可以是 int、bool、日期或字符串。根据字段类型,Api 会以适当的值返回值(例如 intValue 用于 int 字段,boolValue 用于布尔值)。

我正在将这个未触及的 JSON 写入我的数据湖中,并稍后通过将其加载到 spark 数据帧中来处理它:

testJsonData = sqlContext.read.json("/tmp/test.json", multiLine=True)

这会产生具有此架构的数据框:

用户邮箱:字符串 参数:数组 元素:结构 boolValue:boolean intValue:字符串 姓名:字符串 字符串值:字符串

如果我显示它显示为的数据框

"boolValue":null,"intValue":"0","name":"clas-s-room:num_courses_created","stringValue":null "boolValue":true,"intValue":null,"name":"accounts:is_disabled","stringValue":null "boolValue":null,"intValue":null,"name":"clas-s-room:role","stringValue":"student"

如您所见,它为不存在的 typeValues 推断了 null。

我想要的最终状态是数据框中的列,例如:

并且旋转的列将被正确输入(例如,classic:num_courses_created 的类型为 int - 请参见上面的黄色列)

这是我迄今为止尝试过的:

from pyspark.sql.functions import explode
tempDf = testJsonData.select("userEmail", explode("parameters").alias("parameters_exploded"))
explodedColsDf = tempDf.select("userEmail", "parameters_exploded.*")

这会产生具有此架构的数据框:

用户邮箱:字符串 boolValue:boolean intValue:字符串 姓名:字符串 字符串值:字符串

然后我根据名称字段(即“clas-s-room:num_courses_created”、“clas-s-room:role”等(有 188 个名称/值参数对))将行转为列:

#turn intValue into an Int column
explodedColsDf = explodedColsDf.withColumn("intValue", explodedColsDf.intValue.cast(IntegerType()))
pivotedDf = explodedColsDf.groupBy("userEmail").pivot("name").sum("intValue")

这会导致这个数据框:

用户邮箱:字符串 accounts:is_disabled:long clas-s-room:num_courses_created:long 教室:角色:长

这是不正确的,因为列的类型是错误的。

我需要做的是以某种方式查看参数列的所有 typeValues(无法从名称中知道类型或推断它 - 除了在原始 Json 中它只返回相关的 typeValue ) 并且不为 null 的就是该列的类型。每个参数只出现一次,因此只需要为电子邮件键输出字符串、布尔值、int 和日期值,而不是聚合。

这超出了我目前的知识范围,但是我在想一个更简单的解决方案可能是一直回到开头并旋转列我写出 Json 之前,它会在当我将它加载回 Spark 时我想要的格式,但是我根本不愿意转换原始数据。我也不想手动编码 188 个字段的架构,因为我想动态选择我想要的字段,因此它需要能够处理。

【问题讨论】:

【参考方案1】:

下面的代码将提供的示例 JSON 转换为数据帧(不使用 PySpark)。

导入库

import numpy as np
import pandas as pd

分配变量

true = True
false = False

将 JSON 分配给变量

data = [
"userEmail": "rod@test.com", 
"parameters": [
    
        "intValue": "0",
        "name": "clas-s-room:num_courses_created"
    ,
    
        "boolValue": true,
        "name": "accounts:is_disabled"
    ,
    
        "name": "clas-s-room:role",
        "stringValue": "student"
    
    ]
,

"userEmail": "EMAIL2@test.com", 
"parameters": [
    
        "intValue": "1",
        "name": "clas-s-room:num_courses_created"
    ,
    
        "boolValue": false,
        "name": "accounts:is_disabled"
    ,
    
        "name": "clas-s-room:role",
        "stringValue": "student2"
    
    ]

]

将字典转换为列的功能

def get_col(x):
    y = pd.DataFrame(x, index=[0])
    col_name = y.iloc[0]['name']
    y = y.drop(columns=['name'])
    y.columns = [col_name]
    return y

遍历 JSON 列表

df = pd.DataFrame()

for item in range(len(data)):

    # Initialize empty dataframe
    trow = pd.DataFrame()
    temp = pd.DataFrame(data[item])

    for i in range(temp.shape[0]):

        # Read each row
        x = temp.iloc[i]['parameters']
        trow = pd.concat([trow,get_col(x)], axis=1)
        trow['userEmail'] = temp.iloc[i]['userEmail']


    df = df.append(trow) 

# Rearrange columns, drop those that are not needed
df = df[['userEmail', 'clas-s-room:num_courses_created', 'accounts:is_disabled', 'clas-s-room:role']]

输出:

....................... 以前的编辑 .................... .

将 JSON/嵌套字典转换为数据框

temp = pd.DataFrame(data)

# Initialize empty dataframe
df = pd.DataFrame()
for i in range(temp.shape[0]):
    # Read each row
    x = temp.iloc[i]['parameters']
    temp1 = pd.DataFrame([x], columns=x.keys())
    temp1['userEmail'] = temp.iloc[i]['userEmail']

    # Convert nested key:value pairs
    y = x['name'].split(sep=':')
    temp1['name_' + y[0]] = y[1]

    # Combine to dataframe
    df = df.append(temp1, sort=False)

# Rearrange columns, drop those that are not needed
df = df[['userEmail', 'intValue', 'stringValue', 'boolValue', 'name_clas-s-room', 'name_accounts']]

输出

Edit-1 根据更新问题中的屏幕截图,下面的代码应该可以工作。

分配变量

【讨论】:

嗨 Nilesh,感谢您的代码 - 但是输出不太正确 - 我已经用所需输出的屏幕截图更新了我的问题,因此更清晰。 嗨 Rodney,感谢您提供最终所需输出表的屏幕截图。我已经相应地更新了代码。希望对您有所帮助。

以上是关于如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列的主要内容,如果未能解决你的问题,请参考以下文章

如何解析具有多个值的 json?

如何使用 Swifty 动态创建具有多个键和值的 json 对象

使用 pyspark,如何将文件中单行的多个 JSON 文档读入数据框?

将具有多个值的 JSON 从 S3 复制到 Redshift

如何解析具有多个值的json数据?

如何使用 pySpark 使多个 json 处理更快?