如何编写 pyspark map-reduce 来计算日期之前的事件数
Posted
技术标签:
【中文标题】如何编写 pyspark map-reduce 来计算日期之前的事件数【英文标题】:How to write pyspark map-reduce to compute number of events prior to date 【发布时间】:2021-06-30 18:50:35 【问题描述】:我有两张表,我们称它们为用户、事件
users: [user_id, activity_date]
[123, 4-28-2020 ]
[456, 4-27-2020 ]
events: [user_id, event_date]
[123, 5-28-2020 ]
[456, 4-27-2020 ]
[456, 4-25-2020 ]
[456, 4-30-2020 ]
[456, 1-30-2020 ]
[123, 1-28-2020 ]
我想要一个汇总表,显示每个用户的事件表中的事件计数,然后再存储在 users.activity_date 中的值。
所以上面的例子会产生:
[user_id, total]
[123, 1 ]
[456, 3 ]
我尝试使用相关查询并在redshift上执行此操作,但它没有完成(第一个表中有数百万条记录,第二个有数千万条记录)......所以我的想法是使用map reduce。 ..但我不知道从哪里开始。我可以阅读 pyspark 中的表格,这就是我卡住的地方。
【问题讨论】:
您能否分享您的 Redshift SQL,因为我预计您会遇到问题。 1000 万行对于 Redshift 来说很小,因此如果您的查询未完成,则可能存在问题。 @user491880 如果答案解决了您的问题,请接受并投票赞成 【参考方案1】:您只需要一个join
,找出event_date
是否大于activity_date
和sum
# create data frames
events_df = spark.createDataFrame(
[
("123","5-28-2020"),
("456","4-27-2020"),
("456","4-25-2020"),
("456","4-30-2020"),
("456","1-30-2020"),
("123","1-28-2020")],
("user_id","event_date"))
events_df.show()
+-------+----------+
|user_id|event_date|
+-------+----------+
| 123| 5-28-2020|
| 456| 4-27-2020|
| 456| 4-25-2020|
| 456| 4-30-2020|
| 456| 1-30-2020|
| 123| 1-28-2020|
+-------+----------+
users_df = spark.createDataFrame(
[
("123","4-28-2020"),
("456","4-27-2020")],
("user_id","activity_date"))
users_df.show()
+-------+-------------+
|user_id|activity_date|
+-------+-------------+
| 123| 4-28-2020|
| 456| 4-27-2020|
+-------+-------------+
# Import functions
import pyspark.sql.functions as f
# Join both data frames on user_id
df = events_df.join(f.broadcast(users_df), events_df.user_id == users_df.user_id, how='left_outer').select(events_df['user_id'], events_df['event_date'], users_df['activity_date'])
df.show()
+-------+----------+-------------+
|user_id|event_date|activity_date|
+-------+----------+-------------+
| 123| 5-28-2020| 4-28-2020|
| 456| 4-27-2020| 4-27-2020|
| 456| 4-25-2020| 4-27-2020|
| 456| 4-30-2020| 4-27-2020|
| 456| 1-30-2020| 4-27-2020|
| 123| 1-28-2020| 4-28-2020|
+-------+----------+-------------+
# find if event_date greater than activity_date if yes then assign zero else 1
df1 = df.withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))
df1.show()
+-------+----------+-------------+------+
|user_id|event_date|activity_date|active|
+-------+----------+-------------+------+
| 123| 5-28-2020| 4-28-2020| 0|
| 456| 4-27-2020| 4-27-2020| 1|
| 456| 4-25-2020| 4-27-2020| 1|
| 456| 4-30-2020| 4-27-2020| 0|
| 456| 1-30-2020| 4-27-2020| 1|
| 123| 1-28-2020| 4-28-2020| 1|
+-------+----------+-------------+------+
# then group by and sum
df2 = df1.groupby("user_id").agg(f.sum('active').alias('total'))
df2.show()
+-------+-----+
|user_id|total|
+-------+-----+
| 456| 3|
| 123| 1|
+-------+-----+
我已经添加了一个broadcast
连接,正如你所说的,一个表有million
记录,另一个有tens of millions
记录。
你也可以像下面这样组合df
和df1
df = events_df.join(f.broadcast(users_df), events_df.user_id == users_df.user_id, how='left_outer').select(events_df['user_id'], events_df['event_date'], users_df['activity_date']).withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))
【讨论】:
以上是关于如何编写 pyspark map-reduce 来计算日期之前的事件数的主要内容,如果未能解决你的问题,请参考以下文章