使用广播应用地图转换时,pyspark Udf 未按预期工作?
Posted
技术标签:
【中文标题】使用广播应用地图转换时,pyspark Udf 未按预期工作?【英文标题】:pyspark Udf is not working as expected when apply map transformation with broadcast? 【发布时间】:2018-12-07 14:28:29 【问题描述】:我有如下两个列表
l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
我想从列表l
列表中删除不包含列表x
中任何tuple
s 中的所有元素的所有元素。换句话说,x
中应该至少有一个tuple
,其所有元组项都存在于l
的元素中。
根据我的last question,我在python中得到了以下解决方案:
print([l_ for l_ in l if any(all(e in l_ for e in x_) for x_ in x)])
产生所需的输出:
[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]
现在我正在尝试使用 pyspark rdd
复制相同的操作,但我没有得到预期的结果。
这是我尝试过的:
rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)
def flist(unique_product_List,x):
filter_list = [
l_ for l_ in unique_product_List
if any(all(e in l_ for e in x_) for x_ in x)
]
return filter_list
rddsort=rddsort.map(lambda flist(x[0],broadcastVar.value))
print(rddsort.collect())
结果我得到一个空列表列表:
[[], [], [], [], [], []]
但是我的预期结果应该和上面一样。
【问题讨论】:
您不需要将广播变量传递给map
函数。通过广播它,它已经作为只读变量在每台机器上可用。
问题是您在 map
函数中对 unique_product_List
进行列表理解。你认为这是在迭代什么?这不是您所想的 rdd
中的行,而是每一行中的元素。
可以帮帮我。我该如何解决这个问题。
【参考方案1】:
您需要对 rdd 进行过滤(不是地图)。过滤器将检查每一行的条件并删除不匹配的条件。这里的条件是行值 (list _l = l[0]) 应该包含 x 中的一个列表中的所有元素。
l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)
rddsort=rddsort.filter(lambda l_: any(all(e in l_ for e in x_) for x_ in x))
print(rddsort.collect())
输出
[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]
更新: 在函数中使用广播变量:
l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)
def flist(row):
filter_flag = any(all(e in l_ for e in x_) for x_ in broadcastVar.value)
return filter_flag
rddsort=rddsort.filter(flist)
print(rddsort.collect())
【讨论】:
您好,感谢您的回复,但我想根据我的问题使用广播变量,请对此提供帮助。以上建议的解决方案我们没有使用。 更新了在函数中使用广播变量的答案以上是关于使用广播应用地图转换时,pyspark Udf 未按预期工作?的主要内容,如果未能解决你的问题,请参考以下文章
哪个选项使用 pyspark 提供最佳性能?使用地图进行 UDF 或 RDD 处理?
PySpark 将算法转换为 UDF 并将其应用于 DataFrame
Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串