Pyspark 爆炸功能未按预期工作
Posted
技术标签:
【中文标题】Pyspark 爆炸功能未按预期工作【英文标题】:Pyspark explode function not working as expected 【发布时间】:2021-12-29 10:32:11 【问题描述】:我正在将一个配置单元表读入 Pyspark 数据帧。 hive 表有一个定义为字符串的有效负载。样例记录,
"journal":
"batchName": "Test",
"lines": [
"lineNumber": 1,
"lineDescription": "Something",
"companyCode": "100",
"debit": 0,
"credit": 27,
"accountedDebit": 0,
"accountedCredit": 27
,
"lineNumber": 2,
"lineDescription": "Somethingg",
"companyCode": "100",
"debit": 27,
"credit": 0,
"accountedDebit": 27,
"accountedCredit": 0
]
这是存储为字符串。我能够使用 get_json_object() 提取 batchName 但无法将行扩展为 2 行。我试图将线条转换为数组,然后使用explode,但它仍然不起作用。示例代码如下。
main_data3 = main_data2.withColumn("lines_line_number2", F.regexp_replace(F.regexp_replace(F.col("lines_line_number"), '\[', ''),'\]',''))
main_data4 = main_data3.withColumn("lines_line_number3", F.array("lines_line_number2"))
main_data5 = main_data4.withColumn("lines_line_number4", F.explode("lines_line_number3")
... )
我想使用 spark sql 从这个数据集中扩展和提取所有列。我无法定义自定义架构。
【问题讨论】:
【参考方案1】:您可以使用outer_explode
,然后使用简单的“*”将结构中的所有字段作为单独的列。
我根据您提供的字符串创建了一个数据框df
。
from pyspark.sql.functions import explode_outer
df_json = spark.read.json(df.rdd.map(lambda r: r.json))
# Explode the lines array, which will result in struct.
exploded_df =df_json.select(df_json.journal.batchName.alias('batchname'),explode_outer(df_json.journal.lines).alias('exploded_lines'))
# Select all fields from the exploded struct.
display(new_df_2.select('batchname', 'exploded_lines.*'))
【讨论】:
这不是 Json 文件。它是 hive 表中的字符串字段。 @anidev711 是的,如果您在屏幕截图中看到初始数据帧 (df),它会显示“字符串”数据类型。将其加载为字符串后,我正在转换为 json 格式。检查一下。 了解,但正如我所提到的,它是一个配置单元表,所以不能将其读取为 JSON。 @anidev711 如果您已经在 Spark 中配置了配置单元元存储,则可以直接将 Hive 列作为数据帧读取。df = spark.sql('select col from table')
在此之后,您可以继续使用上面的代码。试试看!以上是关于Pyspark 爆炸功能未按预期工作的主要内容,如果未能解决你的问题,请参考以下文章