df.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项
Posted
技术标签:
【中文标题】df.groupby(\'id\').resample(\'D\').last() 在 Pandas 中的 Pyspark 等效项【英文标题】:Pyspark equivalent for df.groupby('id').resample('D').last() in pandasdf.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项 【发布时间】:2019-04-12 08:37:02 【问题描述】:我有一张像
这样的大桌子我想将其更改为新表:id、date、last_state。
熊猫很简单:
df['time_create'] = pd.to_datetime(df['time_create'])
df = df.set_index('time_create')
df = df.sort_index()
df = df.groupby('id').resample('D').last().reset_index()
但是pyspark很难实现。
我知道:
pysaprk 中的重采样等价物是 groupby + window :
grouped = df.groupBy('store_product_id', window("time_create", "1 day")).agg(sum("Production").alias('Sum Production'))
这里 groupby store_product_id ,在一天内重新采样并计算总和
分组并查找第一个或最后一个:
参考https://***.com/a/35226857/1637673
w = Window().partitionBy("store_product_id").orderBy(col("time_create").desc())
(df
.withColumn("rn", row_number().over(w))
.where(col("rn") == 1)
.select("store_product_id", "time_create", "state"))
这个 groupby id 并通过 time_create 获取最后一行的顺序。
但是我需要的是 groupby id,按天重新采样,然后按 time_create 获取最后一行。
我知道如果我使用 pandas udf 可能会解决这个问题,Applying UDFs on GroupedData in PySpark (with functioning python example)
但是有没有办法仅仅通过 pyspark 来做到这一点?
【问题讨论】:
你不能在你的窗口中包含一天吗?类似Window().partitionBy("store_product_id", dayofmonth(col("time_create"))).orderBy(col("time_create").desc())
@gaw 听起来不错。我忘了partitionBy
可以多列。在这种情况下,不应该使用dayofmonth
,需要从time_create
添加一个新的日期列。但是分区是不是太多了?我从 2016 年到 2019 年的数据集有超过 2 亿行。
我认为这仍然是可能的。在这 4 年中,您有大约 1400 天和不同的产品,但我认为 spark 应该能够处理这个问题。只需确保将日期与月份和年份一起使用即可。我在一个大型代理数据集上应用了一个窗口,并通过客户端 IP 和目标 URL 对其进行了分区,它仍然有效。我想我为这个用例有更多的分区:)
【参考方案1】:
只需partitionBy("store_product_id", "date")
就可以了
w = Window().partitionBy("store_product_id", "date").orderBy(col("time_create").desc())
x = (df
.withColumn("rn", row_number().over(w))
.where(col("rn") == 1)
.select("store_product_id", "time_create", "state"))
【讨论】:
以上是关于df.groupby('id').resample('D').last() 在 Pandas 中的 Pyspark 等效项的主要内容,如果未能解决你的问题,请参考以下文章