Pyspark 爆炸功能未按预期工作

Posted

技术标签:

【中文标题】Pyspark 爆炸功能未按预期工作【英文标题】:Pyspark explode function not working as expected 【发布时间】:2021-12-29 10:32:11 【问题描述】:

我正在将一个配置单元表读入 Pyspark 数据帧。 hive 表有一个定义为字符串的有效负载。样例记录,


  "journal": 
    "batchName": "Test",
    "lines": [
      
        "lineNumber": 1,
        "lineDescription": "Something",
        "companyCode": "100",
        "debit": 0,
        "credit": 27,
        "accountedDebit": 0,
        "accountedCredit": 27
      ,
      
        "lineNumber": 2,
        "lineDescription": "Somethingg",
        "companyCode": "100",
        "debit": 27,
        "credit": 0,
        "accountedDebit": 27,
        "accountedCredit": 0
      
    ]
  

这是存储为字符串。我能够使用 get_json_object() 提取 batchName 但无法将行扩展为 2 行。我试图将线条转换为数组,然后使用explode,但它仍然不起作用。示例代码如下。

main_data3 = main_data2.withColumn("lines_line_number2", F.regexp_replace(F.regexp_replace(F.col("lines_line_number"), '\[', ''),'\]',''))
main_data4 = main_data3.withColumn("lines_line_number3", F.array("lines_line_number2"))
main_data5 = main_data4.withColumn("lines_line_number4", F.explode("lines_line_number3")
... )

我想使用 spark sql 从这个数据集中扩展和提取所有列。我无法定义自定义架构。

【问题讨论】:

【参考方案1】:

您可以使用outer_explode,然后使用简单的“*”将结构中的所有字段作为单独的列。

我根据您提供的字符串创建了一个数据框df

    from pyspark.sql.functions import explode_outer

    df_json = spark.read.json(df.rdd.map(lambda r: r.json))
    # Explode the lines array, which will result in struct.
    exploded_df =df_json.select(df_json.journal.batchName.alias('batchname'),explode_outer(df_json.journal.lines).alias('exploded_lines'))
    
    # Select all fields from the exploded struct.
    display(new_df_2.select('batchname', 'exploded_lines.*'))

【讨论】:

这不是 Json 文件。它是 hive 表中的字符串字段。 @anidev711 是的,如果您在屏幕截图中看到初始数据帧 (df),它会显示“字符串”数据类型。将其加载为字符串后,我正在转换为 json 格式。检查一下。 了解,但正如我所提到的,它是一个配置单元表,所以不能将其读取为 JSON。 @anidev711 如果您已经在 Spark 中配置了配置单元元存储,则可以直接将 Hive 列作为数据帧读取。 df = spark.sql('select col from table') 在此之后,您可以继续使用上面的代码。试试看!

以上是关于Pyspark 爆炸功能未按预期工作的主要内容,如果未能解决你的问题,请参考以下文章

使用广播应用地图转换时,pyspark Udf 未按预期工作?

pyspark中的条件爆炸

分组并爆炸pyspark数组类型列

Pyspark - 基于列表中的值爆炸数据框

如何在 PySpark 中与爆炸相反?

python Pyspark:双爆炸发生器