df.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项

Posted

技术标签:

【中文标题】df.groupby(\'id\').resample(\'D\').last() 在 Pandas 中的 Pyspark 等效项【英文标题】:Pyspark equivalent for df.groupby('id').resample('D').last() in pandasdf.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项 【发布时间】:2019-04-12 08:37:02 【问题描述】:

我有一张像

这样的大桌子

我想将其更改为新表:id、date、last_state。

熊猫很简单:

df['time_create'] = pd.to_datetime(df['time_create'])
df = df.set_index('time_create')
df = df.sort_index()
df = df.groupby('id').resample('D').last().reset_index()

但是pyspark很难实现。

我知道:

    pysaprk 中的重采样等价物是 groupby + window :

    grouped = df.groupBy('store_product_id', window("time_create", "1 day")).agg(sum("Production").alias('Sum Production'))
    

    这里 groupby store_product_id ,在一天内重新采样并计算总和

    分组并查找第一个或最后一个:

    参考https://***.com/a/35226857/1637673

    w = Window().partitionBy("store_product_id").orderBy(col("time_create").desc())
    (df
      .withColumn("rn", row_number().over(w))
      .where(col("rn") == 1)
      .select("store_product_id", "time_create", "state"))
    

    这个 groupby id 并通过 time_create 获取最后一行的顺序。

但是我需要的是 groupby id,按天重新采样,然后按 time_create 获取最后一行。

我知道如果我使用 pandas udf 可能会解决这个问题,Applying UDFs on GroupedData in PySpark (with functioning python example)

但是有没有办法仅仅通过 pyspark 来做到这一点?

【问题讨论】:

你不能在你的窗口中包含一天吗?类似Window().partitionBy("store_product_id", dayofmonth(col("time_create"))).orderBy(col("time_create").desc()) @gaw 听起来不错。我忘了partitionBy 可以多列。在这种情况下,不应该使用dayofmonth,需要从time_create 添加一个新的日期列。但是分区是不是太多了?我从 2016 年到 2019 年的数据集有超过 2 亿行。 我认为这仍然是可能的。在这 4 年中,您有大约 1400 天和不同的产品,但我认为 spark 应该能够处理这个问题。只需确保将日期与月份和年份一起使用即可。我在一个大型代理数据集上应用了一个窗口,并通过客户端 IP 和目标 URL 对其进行了分区,它仍然有效。我想我为这个用例有更多的分区:) 【参考方案1】:

只需partitionBy("store_product_id", "date") 就可以了

w = Window().partitionBy("store_product_id", "date").orderBy(col("time_create").desc())
x = (df
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .select("store_product_id", "time_create", "state"))

【讨论】:

以上是关于df.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项的主要内容,如果未能解决你的问题,请参考以下文章

将数据帧重新采样为具有任意期末月份的 n 个月期间

groupby 结果到自定义数据框

python.pandas groupby根据最小值更改某列数据

groupby - python 熊猫数据框

PySpark 中的 Groupby cumcount

通过合并更好地替代 groupby [重复]