如何编写 pyspark map-reduce 来计算日期之前的事件数

Posted

技术标签:

【中文标题】如何编写 pyspark map-reduce 来计算日期之前的事件数【英文标题】:How to write pyspark map-reduce to compute number of events prior to date 【发布时间】:2021-06-30 18:50:35 【问题描述】:

我有两张表,我们称它们为用户、事件

users: [user_id, activity_date]
       [123,     4-28-2020    ]
       [456,     4-27-2020    ]

events: [user_id, event_date]
       [123,     5-28-2020    ]
       [456,     4-27-2020    ]
       [456,     4-25-2020    ]
       [456,     4-30-2020    ]
       [456,     1-30-2020    ]
       [123,     1-28-2020    ]

我想要一个汇总表,显示每个用户的事件表中的事件计数,然后再存储在 users.activity_date 中的值。

所以上面的例子会产生:

[user_id, total]
[123,       1  ]
[456,       3  ]    

我尝试使用相关查询并在redshift上执行此操作,但它没有完成(第一个表中有数百万条记录,第二个有数千万条记录)......所以我的想法是使用map reduce。 ..但我不知道从哪里开始。我可以阅读 pyspark 中的表格,这就是我卡住的地方。

【问题讨论】:

您能否分享您的 Redshift SQL,因为我预计您会遇到问题。 1000 万行对于 Redshift 来说很小,因此如果您的查询未完成,则可能存在问题。 @user491880 如果答案解决了您的问题,请接受并投票赞成 【参考方案1】:

您只需要一个join,找出event_date 是否大于activity_datesum

# create data frames
events_df = spark.createDataFrame(
[
("123","5-28-2020"),
("456","4-27-2020"),
("456","4-25-2020"),
("456","4-30-2020"),
("456","1-30-2020"),
("123","1-28-2020")], 
("user_id","event_date"))

events_df.show()
+-------+----------+
|user_id|event_date|
+-------+----------+
|    123| 5-28-2020|
|    456| 4-27-2020|
|    456| 4-25-2020|
|    456| 4-30-2020|
|    456| 1-30-2020|
|    123| 1-28-2020|
+-------+----------+


users_df = spark.createDataFrame(
[
("123","4-28-2020"),
("456","4-27-2020")], 
("user_id","activity_date"))

users_df.show()
+-------+-------------+
|user_id|activity_date|
+-------+-------------+
|    123|    4-28-2020|
|    456|    4-27-2020|
+-------+-------------+

# Import functions
import pyspark.sql.functions as f

# Join both data frames on user_id
df = events_df.join(f.broadcast(users_df), events_df.user_id == users_df.user_id, how='left_outer').select(events_df['user_id'], events_df['event_date'], users_df['activity_date'])

df.show()
+-------+----------+-------------+
|user_id|event_date|activity_date|
+-------+----------+-------------+
|    123| 5-28-2020|    4-28-2020|
|    456| 4-27-2020|    4-27-2020|
|    456| 4-25-2020|    4-27-2020|
|    456| 4-30-2020|    4-27-2020|
|    456| 1-30-2020|    4-27-2020|
|    123| 1-28-2020|    4-28-2020|
+-------+----------+-------------+

# find if event_date greater than activity_date if yes then assign zero else 1
df1 = df.withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))
df1.show()
+-------+----------+-------------+------+
|user_id|event_date|activity_date|active|
+-------+----------+-------------+------+
|    123| 5-28-2020|    4-28-2020|     0|
|    456| 4-27-2020|    4-27-2020|     1|
|    456| 4-25-2020|    4-27-2020|     1|
|    456| 4-30-2020|    4-27-2020|     0|
|    456| 1-30-2020|    4-27-2020|     1|
|    123| 1-28-2020|    4-28-2020|     1|
+-------+----------+-------------+------+

# then group by and sum
df2 = df1.groupby("user_id").agg(f.sum('active').alias('total'))
df2.show()
+-------+-----+
|user_id|total|
+-------+-----+
|    456|    3|
|    123|    1|
+-------+-----+

我已经添加了一个broadcast 连接,正如你所说的,一个表有million 记录,另一个有tens of millions 记录。

你也可以像下面这样组合dfdf1

df = events_df.join(f.broadcast(users_df), events_df.user_id == users_df.user_id, how='left_outer').select(events_df['user_id'], events_df['event_date'], users_df['activity_date']).withColumn('active', f.when(f.col('event_date') > f.col('activity_date'), 0).otherwise(f.lit(1)))

【讨论】:

以上是关于如何编写 pyspark map-reduce 来计算日期之前的事件数的主要内容,如果未能解决你的问题,请参考以下文章

如何编写“仅地图”hadoop 作业?

Pyspark:如何编写复杂的 Dataframe 计算

Pyspark:如何编写复杂的 Dataframe 算法问题(带条件求和)

pyspark如何使用两列编写UDF

如何在 PySpark 中编写条件正则表达式替换?

如何使用 Hive 上下文中的 Pyspark 调用用 Java 编写的 Hive UDF