Pyspark - 使用广播字典中的日期过滤 RDD

Posted

技术标签:

【中文标题】Pyspark - 使用广播字典中的日期过滤 RDD【英文标题】:Pyspark - Filter RDD With Dates in Broadcast Dictionary 【发布时间】:2019-04-22 18:15:37 【问题描述】:

我有一个我广播的 python 字典,其中包含用户的日期过滤器。

nested_filter = "user1":"2018-02-15"
b_filter = sc.broadcast(nested_filter)

我想使用这个广播变量来过滤一个较大的RDD,其行数少于过滤日期。

rdd_set = sc.parallelize([("user1","2018-02-05"), ("user1","2018-02-20")])

rdd_set.filter(lambda fields: fields <= b_filter.value.items()).collect()

但它返回一个空的 RDD。

有人可以指出我做错了什么吗?另外,我需要将字符串日期转换为日期对象吗?

正确的结果应该是:

[("user1","2018-02-05")]

【问题讨论】:

为什么广播变量是字典?为什么不只是一次约会?您想为每个用户使用不同的date 吗? 【参考方案1】:

观察b_filter.value.items() 在您的filter 调用中返回的值与以下内容相同:

nested_filter.items()
#[('user1', '2018-02-15')]

那么你的比较就变成了:

("user1","2018-02-05") < [('user1', '2018-02-15')]
#False

这是False。假设 nested_filter 是一个只有 1 项的字典(如此处所示),您可能打算与列表的第一个元素进行比较:

("user1","2018-02-05") < nested_filter.items()[0]
#True

所以要“修复”您的代码,您可以执行以下操作:

rdd_set.filter(lambda fields: fields <= b_filter.value.items()[0]).collect()
#[('user1', '2018-02-05')]

但是,我认为您真正想要的是以下内容:

rdd_set.filter(lambda fields: fields[1] <= b_filter.value.get(fields[0])).collect()
#[('user1', '2018-02-05')]

这使用fields[0]nested_filter 获取日期(如果不存在则返回None)并将值与fields[1] 进行比较。

正如您所指出的,这种比较将在字符串上按字典顺序进行。如果您的日期仍为 YYYY-MM-DD 格式,这对您来说不是问题,但对于其他日期格式,您可能需要转换为 datetime 对象。

【讨论】:

以上是关于Pyspark - 使用广播字典中的日期过滤 RDD的主要内容,如果未能解决你的问题,请参考以下文章

过滤pySpark数据框中的日期列记录

Pyspark UDF 广播变量未定义仅在由单独脚本导入时

如何根据来自其他 pyspark 数据帧的日期值过滤第二个 pyspark 数据帧?

在 PySpark 中为镶木地板文件过滤日期时间范围和时区

PySpark:在日期为字符串的范围内按日期字段过滤DataFrame

PySpark 中具有多列的日期算术