PySpark 嵌套数据框

Posted

技术标签:

【中文标题】PySpark 嵌套数据框【英文标题】:PySpark Nested Dataframe 【发布时间】:2021-12-29 23:39:37 【问题描述】:

我需要一些 Python Pyspark 帮助。我有一个看起来像这样的源对象: 编辑:两个列表都是 numpy 数据类型的 numpy 数组

obj = [("thing1", ([1,2,3], [0.1,0.2,0.3]),
       ("thing2", ([1,2,3], [0.1,0.2,0.3]),
       ("thing3", ([1,2,3], [0.1,0.2,0.3]),
       ("thing4", ([1,2,3], [0.1,0.2,0.3]),
       ...]

有没有办法创建火花数据框,所以它看起来像这样。其中“column2”中的对象本质上是另一个标记为“label1”和“label2”的两列的数据框:

"column1" | "column2"
---------------------
"thing1"  | ["label1": 1, "label2": 0.1, "label1": 2, "label2": 0.2, "label1": 3, "label2": 0.3]
"thing2"  | ["label1": 1, "label2": 0.1, "label1": 2, "label2": 0.2]
...

我的最终目标是生成一个看起来像这样的 JSON 对象。我需要列和标签:

"column1":"thing1", 
  "column2":["1abel1":1, "label2":0.1,"1abel1":2, "label2":0.2,"1abel1":3, "label2":0.3,
 "column1":"thing2", 
  "column2":["1abel1":1, "label2":0.1,"1abel1":2, "label2":0.2,"1abel1":3, "label2":0.3,
 "column1":"thing3", 
  "column2":["1abel1":1, "label2":0.1,"1abel1":2, "label2":0.2,"1abel1":3, "label2":0.3,
 ...

如果这可以相对快速地处理大约 100 万条记录,那就太好了

【问题讨论】:

您好!欢迎来到 ***!您是否考虑过使用 UDF 来解决这个问题? 嗨!感谢您的欢迎!是的,我创建了几个函数来帮助我,但我想看看通过特定于数据框的函数执行此操作是否会使事情变得更快。我也在使用多处理,但它仍然很慢。 对于创建字典,我不知道 UDF 之外的内置解决方案。但是,如果您还没有使用 'explode' 和 'collect_list',您可以随时使用,并将 udf 限制为仅创建 dict。 【参考方案1】:

您可以尝试以下不基于 udf 的方法:

方法一

您可以尝试使用transform 来实现这一点,因为您总是期望标签 1 和标签 2 的数量相同,否则您可以随时修改以下内容以执行检查,例如

transformed_df = (
    df.withColumn(
        "column3",
        F.transform(
            "column2.label2",
            lambda entry,index: F.struct(
                F.col("column2.label1")[index].alias("label1"),
                F.lit(entry).alias("label2")
            )
        )
    )
)
transformed_df.show(truncate=False)
transformed_df.printSchema()
+-------+----------------------------+------------------------------+
|column1|column2                     |column3                       |
+-------+----------------------------+------------------------------+
|thing1 |[1, 2, 3], [0.1, 0.2, 0.3]|[1, 0.1, 2, 0.2, 3, 0.3]|
|thing2 |[1, 2, 3], [0.1, 0.2, 0.3]|[1, 0.1, 2, 0.2, 3, 0.3]|
|thing3 |[1, 2, 3], [0.1, 0.2, 0.3]|[1, 0.1, 2, 0.2, 3, 0.3]|
|thing4 |[1, 2, 3], [0.1, 0.2, 0.3]|[1, 0.1, 2, 0.2, 3, 0.3]|
+-------+----------------------------+------------------------------+

root
 |-- column1: string (nullable = true)
 |-- column2: struct (nullable = true)
 |    |-- label1: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- label2: array (nullable = true)
 |    |    |-- element: float (containsNull = true)
 |-- column3: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- label1: integer (nullable = true)
 |    |    |-- label2: float (nullable = true)

方法2

您还可以将您的转换映射到 RDD,例如

from pyspark.sql import Row
transformed_df = df.rdd.map(lambda row: Row(
    column1=row['column1'],
    column2=[  
        'label1':entry,
        'label2':row['column2']['label2'][index] 
     for index,entry in enumerate(row['column2']['label1']) ]
)).toDF(schema="column1 string, column2 array<struct<label1: int, label2: float>>")

transformed_df.show(truncate=False)
transformed_df.printSchema()
+-------+------------------------------+
|column1|column2                       |
+-------+------------------------------+
|thing1 |[1, 0.1, 2, 0.2, 3, 0.3]|
|thing2 |[1, 0.1, 2, 0.2, 3, 0.3]|
|thing3 |[1, 0.1, 2, 0.2, 3, 0.3]|
|thing4 |[1, 0.1, 2, 0.2, 3, 0.3]|
+-------+------------------------------+

root
 |-- column1: string (nullable = true)
 |-- column2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- label1: integer (nullable = true)
 |    |    |-- label2: float (nullable = true)

设置

为了调试目的,下面包含了重现设置的代码。

from pyspark.sql import functions as F
from pyspark.sql import types as T

data = [
        ("thing1", ([1,2,3], [0.1,0.2,0.3])),
       ("thing2", ([1,2,3], [0.1,0.2,0.3])),
       ("thing3", ([1,2,3], [0.1,0.2,0.3])),
       ("thing4", ([1,2,3], [0.1,0.2,0.3]))
]

df=spark.createDataFrame(
    data,
    T.StructType([
        T.StructField("column1",T.StringType()),
        T.StructField(
            "column2",
            T.StructType([
                T.StructField("label1",T.ArrayType(T.IntegerType())),
                T.StructField("label2",T.ArrayType(T.FloatType())),
            ])
        )
    ])
)
df.show(truncate=False)

+-------+----------------------------+
|column1|column2                     |
+-------+----------------------------+
|thing1 |[1, 2, 3], [0.1, 0.2, 0.3]|
|thing2 |[1, 2, 3], [0.1, 0.2, 0.3]|
|thing3 |[1, 2, 3], [0.1, 0.2, 0.3]|
|thing4 |[1, 2, 3], [0.1, 0.2, 0.3]|
+-------+----------------------------+

【讨论】:

我试过了,但遇到了一些问题。这不会输出我正在寻找的结果 JSON。并且数据对象的数组是具有 numpy 数据类型的 numpy 数组,不适用于 pyspark。

以上是关于PySpark 嵌套数据框的主要内容,如果未能解决你的问题,请参考以下文章

将嵌套的 Json 转换为 Pyspark 中的数据框

使用 spark-xml 从 pyspark 数据框中选择嵌套列

数据框列中的嵌套列表,提取数据框列中列表的值 Pyspark Spark

PySpark:如何更新嵌套列?

使用 PySpark 删除 Dataframe 的嵌套列

Pyspark - 将rdd转换为数据框时数据设置为null