PySpark Dataframe 将两列转换为基于第三列值的元组新列

Posted

技术标签:

【中文标题】PySpark Dataframe 将两列转换为基于第三列值的元组新列【英文标题】:PySpark Dataframe cast two columns into new column of tuples based value of a third column 【发布时间】:2019-08-05 15:59:24 【问题描述】:

正如主题所描述的,我有一个 PySpark 数据框,我需要将两列转换为 一个新列,它是基于第三列值的元组列表。这个演员将减少或 通过一个键值(在这种情况下为产品 id)和结果 os 一行来展平数据框 每个键。

此数据框中有数亿行,具有 3700 万个唯一产品 ID。因此我需要 一种在 spark 集群上进行转换而不带回任何数据的方法 给驱动程序(在这种情况下是 Jupyter)。

这是我仅针对 1 个产品的数据框的摘录:

+-----------+-------------------+-------------+--------+----------+---------------+
| product_id|      purchase_date|days_warranty|store_id|year_month|       category|
+-----------+-------------------+-----------+----------+----------+---------------+
|02147465400|2017-05-16 00:00:00|           30|     205|   2017-05|     CATEGORY A|
|02147465400|2017-04-15 00:00:00|           30|     205|   2017-04|     CATEGORY A|
|02147465400|2018-07-11 00:00:00|           30|     205|   2018-07|     CATEGORY A|
|02147465400|2017-06-14 00:00:00|           30|     205|   2017-06|     CATEGORY A|
|02147465400|2017-03-16 00:00:00|           30|     205|   2017-03|     CATEGORY A|
|02147465400|2017-08-14 00:00:00|           30|     205|   2017-08|     CATEGORY A|
|02147465400|2017-09-12 00:00:00|           30|     205|   2017-09|     CATEGORY A|
|02147465400|2017-01-21 00:00:00|           30|     205|   2017-01|     CATEGORY A|
|02147465400|2018-08-14 00:00:00|           30|     205|   2018-08|     CATEGORY A|
|02147465400|2018-08-23 00:00:00|           30|     205|   2018-08|     CATEGORY A|
|02147465400|2017-10-11 00:00:00|           30|     205|   2017-10|     CATEGORY A|
|02147465400|2017-12-12 00:00:00|           30|     205|   2017-12|     CATEGORY A|
|02147465400|2017-02-15 00:00:00|           30|     205|   2017-02|     CATEGORY A|
|02147465400|2018-04-12 00:00:00|           30|     205|   2018-04|     CATEGORY A|
|02147465400|2018-03-12 00:00:00|           30|     205|   2018-03|     CATEGORY A|
|02147465400|2018-05-15 00:00:00|           30|     205|   2018-05|     CATEGORY A|
|02147465400|2018-02-12 00:00:00|           30|     205|   2018-02|     CATEGORY A|
|02147465400|2018-06-14 00:00:00|           30|     205|   2018-06|     CATEGORY A|
|02147465400|2018-01-11 00:00:00|           30|     205|   2018-01|     CATEGORY A|
|02147465400|2017-07-20 00:00:00|           30|     205|   2017-07|     CATEGORY A|
|02147465400|2017-11-11 00:00:00|           30|     205|   2017-11|     CATEGORY A|
|02147465400|2017-01-05 00:00:00|           90|     205|   2017-01|     CATEGORY B|
|02147465400|2017-01-21 00:00:00|           90|     205|   2017-01|     CATEGORY B|
|02147465400|2017-10-09 00:00:00|           90|     205|   2017-10|     CATEGORY B|
|02147465400|2018-07-11 00:00:00|           90|     205|   2018-07|     CATEGORY B|
|02147465400|2017-04-16 00:00:00|           90|     205|   2017-04|     CATEGORY B|
|02147465400|2018-09-16 00:00:00|           90|     205|   2018-09|     CATEGORY B|
|02147465400|2018-04-14 00:00:00|           90|     205|   2018-04|     CATEGORY B|
|02147465400|2018-01-12 00:00:00|           90|     205|   2018-01|     CATEGORY B|
|02147465400|2017-07-15 00:00:00|           90|     205|   2017-07|     CATEGORY B|
+-----------+-------------------+-----------+----------+----------+---------------+

这是所需的结果数据框,一个产品的一行,其中行 原始数据框的 purchase_date 和 days_warranty 列 作为基于类别列值的元组数组到新列中:

+-----------+----------------------------+----------------------------+
| product_id|                  CATEGORY A|                  CATEGORY B| 
+-----------+----------------------------+----------------------------+
|02147465400| [ (2017-05-16 00:00:00,30),| [ (2017-01-05 00:00:00,90),| 
|           |   (2017-04-15 00:00:00,30),|   (2017-01-21 00:00:00,90),|
|           |   (2018-07-11 00:00:00,30),|   (2017-10-09 00:00:00,90),|
|           |   (2017-06-14 00:00:00,30),|   (2018-07-11 00:00:00,90),|
|           |   (2017-03-16 00:00:00,30),|   (2017-04-16 00:00:00,90),|
|           |   (2017-08-14 00:00:00,30),|   (2018-09-16 00:00:00,90),|
|           |   (2017-09-12 00:00:00,30),|   (2018-04-14 00:00:00,90),|
|           |   (2017-01-21 00:00:00,30),|   (2018-01-12 00:00:00,90),|
|           |   (2018-08-14 00:00:00,30),|   (2017-07-15 00:00:00,90) |
|           |   (2018-08-23 00:00:00,30),| ]                          |
|           |   (2017-10-11 00:00:00,30),|                            |
|           |   (2017-12-12 00:00:00,30),|                            |
|           |   (2017-02-15 00:00:00,30),|                            |
|           |   (2018-04-12 00:00:00,30),|                            |
|           |   (2018-03-12 00:00:00,30),|                            |
|           |   (2018-05-15 00:00:00,30),|                            |
|           |   (2018-02-12 00:00:00,30),|                            |
|           |   (2018-06-14 00:00:00,30),|                            |
|           |   (2018-01-11 00:00:00,30),|                            |
|           |   (2017-07-20 00:00:00,30) |                            |
|           | ]                                                       |
+-----------+----------------------------+----------------------------+

【问题讨论】:

【参考方案1】:

如果您在使用 pivot 时遇到性能问题,下面的方法是解决同一问题的另一种方法,尽管它允许您通过使用 for 循环将作业分成每个类别的阶段来进行更多控制。对于每次迭代,这会将 category_x 的新数据附加到 acc_df 中,其中将保存累积的结果。

schema = ArrayType( 
        StructType((  
            StructField("p_date", StringType(), False), 
            StructField("d_warranty", StringType(), False)  
        )) 
    )

    tuple_list_udf = udf(tuple_list, schema)

    buf_size = 5 # if you get OOM error decrease this to persist more often

    categories = df.select("category").distinct().collect()

    acc_df = spark.createDataFrame(sc.emptyRDD(), df.schema) # create an empty df which holds the accumulated results for each category

    for idx, c in enumerate(categories):
        col_name = c[0].replace(" ", "_") # spark complains for columns containing space
        cat_df = df.where(df["category"] == c[0]) \
                .groupBy("product_id") \
                .agg(
                    F.collect_list(F.col("purchase_date")).alias("p_date"), 
                    F.collect_list(F.col("days_warranty")).alias("d_warranty")) \
                .withColumn(col_name, tuple_list_udf(F.col("p_date"), F.col("d_warranty"))) \
                .drop("p_date", "d_warranty")

        if idx == 0:
            acc_df = cat_df
        else:
            acc_df = acc_df \
                .join(cat_df.alias("cat_df"), "product_id") \
                .drop(F.col("cat_df.product_id"))

        # you can persist here every buf_size iterations
        if idx + 1 % buf_size == 0:
            acc_df = acc_df.persist()

函数 tuple_list 负责生成包含 purchase_date 和 days_warranty 列的元组的列表。

def tuple_list(pdl, dwl):
    return list(zip(pdl, dwl))

这个输出将是:

+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_id |CATEGORY_B                                                                                                                                                                                                                                         |CATEGORY_A                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|02147465400|[[2017-04-16 00:00:00, 90], [2018-09-16 00:00:00, 90], [2017-10-09 00:00:00, 90], [2018-01-12 00:00:00, 90], [2018-07-11 00:00:00, 90], [2017-01-21 00:00:00, 90], [2018-04-14 00:00:00, 90], [2017-01-05 00:00:00, 90], [2017-07-15 00:00:00, 90]]|[[2017-06-14 00:00:00, 30], [2018-08-14 00:00:00, 30], [2018-01-11 00:00:00, 30], [2018-04-12 00:00:00, 30], [2017-10-11 00:00:00, 30], [2017-05-16 00:00:00, 30], [2018-05-15 00:00:00, 30], [2017-04-15 00:00:00, 30], [2017-02-15 00:00:00, 30], [2018-02-12 00:00:00, 30], [2017-01-21 00:00:00, 30], [2018-07-11 00:00:00, 30], [2018-06-14 00:00:00, 30], [2017-03-16 00:00:00, 30], [2017-07-20 00:00:00, 30], [2018-08-23 00:00:00, 30], [2017-09-12 00:00:00, 30], [2018-03-12 00:00:00, 30], [2017-12-12 00:00:00, 30], [2017-08-14 00:00:00, 30], [2017-11-11 00:00:00, 30]]|
+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

【讨论】:

相当有效的方法。【参考方案2】:

假设您的Dataframe 被称为df

from pyspark.sql.functions import struct
from pyspark.sql.functions import collect_list

gdf = (df.select("product_id", "category", struct("purchase_date", "warranty_days").alias("pd_wd"))
.groupBy("product_id")
.pivot("category")
.agg(collect_list("pd_wd")))

基本上,您必须使用struct()purchase_datewarranty_days 分组到一个列中。然后,您只需按product_id 分组,以category 为轴,可以聚合为collect_list()

【讨论】:

值得注意的是,pivot 是一项昂贵的操作,因此这可能非常慢或可能导致 OOM 错误。 当然可以。将这种方法与.flatMap().groupBy().agg(collect_list()) 进行比较会很有趣。另外值得注意的是,.pivot() 更加灵活,以防将来不同类别的数量发生变化。 我已验证此解决方案在我的环境中有效。谢谢@Travis Hegner!在我将其标记为完成之前,我喜欢在我的 HUGE 数据集上进行测试,并确定我是否获得了可接受的性能。我听说groupBy() 是计算上的重量级操作。

以上是关于PySpark Dataframe 将两列转换为基于第三列值的元组新列的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark将两列值组合到另一列?

如何将两列合并到一个新的 DataFrame 中?

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

将 pandas 转换为 pyspark 表达式

如何比较pyspark中两个不同数据帧中的两列

熊猫将两列与空值结合起来