Pyspark - 使用广播字典中的日期过滤 RDD
Posted
技术标签:
【中文标题】Pyspark - 使用广播字典中的日期过滤 RDD【英文标题】:Pyspark - Filter RDD With Dates in Broadcast Dictionary 【发布时间】:2019-04-22 18:15:37 【问题描述】:我有一个我广播的 python 字典,其中包含用户的日期过滤器。
nested_filter = "user1":"2018-02-15"
b_filter = sc.broadcast(nested_filter)
我想使用这个广播变量来过滤一个较大的RDD,其行数少于过滤日期。
rdd_set = sc.parallelize([("user1","2018-02-05"), ("user1","2018-02-20")])
rdd_set.filter(lambda fields: fields <= b_filter.value.items()).collect()
但它返回一个空的 RDD。
有人可以指出我做错了什么吗?另外,我需要将字符串日期转换为日期对象吗?
正确的结果应该是:
[("user1","2018-02-05")]
【问题讨论】:
为什么广播变量是字典?为什么不只是一次约会?您想为每个用户使用不同的date
吗?
【参考方案1】:
观察b_filter.value.items()
在您的filter
调用中返回的值与以下内容相同:
nested_filter.items()
#[('user1', '2018-02-15')]
那么你的比较就变成了:
("user1","2018-02-05") < [('user1', '2018-02-15')]
#False
这是False
。假设 nested_filter
是一个只有 1 项的字典(如此处所示),您可能打算与列表的第一个元素进行比较:
("user1","2018-02-05") < nested_filter.items()[0]
#True
所以要“修复”您的代码,您可以执行以下操作:
rdd_set.filter(lambda fields: fields <= b_filter.value.items()[0]).collect()
#[('user1', '2018-02-05')]
但是,我认为您真正想要的是以下内容:
rdd_set.filter(lambda fields: fields[1] <= b_filter.value.get(fields[0])).collect()
#[('user1', '2018-02-05')]
这使用fields[0]
从nested_filter
获取日期(如果不存在则返回None
)并将值与fields[1]
进行比较。
正如您所指出的,这种比较将在字符串上按字典顺序进行。如果您的日期仍为 YYYY-MM-DD
格式,这对您来说不是问题,但对于其他日期格式,您可能需要转换为 datetime
对象。
【讨论】:
以上是关于Pyspark - 使用广播字典中的日期过滤 RDD的主要内容,如果未能解决你的问题,请参考以下文章
如何根据来自其他 pyspark 数据帧的日期值过滤第二个 pyspark 数据帧?