使用广播应用地图转换时,pyspark Udf 未按预期工作?

Posted

技术标签:

【中文标题】使用广播应用地图转换时,pyspark Udf 未按预期工作?【英文标题】:pyspark Udf is not working as expected when apply map transformation with broadcast? 【发布时间】:2018-12-07 14:28:29 【问题描述】:

我有如下两个列表

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]

我想从列表l 列表中删除不包含列表x 中任何tuples 中的所有元素的所有元素。换句话说,x 中应该至少有一个tuple,其所有元组项都存在于l 的元素中。

根据我的last question,我在python中得到了以下解决方案:

print([l_ for l_ in l if any(all(e in l_ for e in x_) for x_ in x)])

产生所需的输出:

[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]

现在我正在尝试使用 pyspark rdd 复制相同的操作,但我没有得到预期的结果。

这是我尝试过的:

rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)

def flist(unique_product_List,x):
    filter_list = [
        l_ for l_ in unique_product_List 
        if any(all(e in l_ for e in x_) for x_ in x)
    ]

    return filter_list

rddsort=rddsort.map(lambda flist(x[0],broadcastVar.value)) 
print(rddsort.collect())

结果我得到一个空列表列表:

[[], [], [], [], [], []]

但是我的预期结果应该和上面一样。

【问题讨论】:

您不需要将广播变量传递给map 函数。通过广播它,它已经作为只读变量在每台机器上可用。 问题是您在 map 函数中对 unique_product_List 进行列表理解。你认为这是在迭代什么?这不是您所想的 rdd 中的行,而是每一行中的元素。 可以帮帮我。我该如何解决这个问题。 【参考方案1】:

您需要对 rdd 进行过滤(不是地图)。过滤器将检查每一行的条件并删除不匹配的条件。这里的条件是行值 (list _l = l[0]) 应该包含 x 中的一个列表中的所有元素。

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)

rddsort=rddsort.filter(lambda l_: any(all(e in l_ for e in x_) for x_ in x)) 
print(rddsort.collect())

输出

[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]

更新: 在函数中使用广播变量:

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)

def flist(row):
    filter_flag = any(all(e in l_ for e in x_) for x_ in broadcastVar.value)
    return filter_flag

rddsort=rddsort.filter(flist) 
print(rddsort.collect())

【讨论】:

您好,感谢您的回复,但我想根据我的问题使用广播变量,请对此提供帮助。以上建议的解决方案我们没有使用。 更新了在函数中使用广播变量的答案

以上是关于使用广播应用地图转换时,pyspark Udf 未按预期工作?的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark UDF 中使用广播数据帧

哪个选项使用 pyspark 提供最佳性能?使用地图进行 UDF 或 RDD 处理?

PySpark 将算法转换为 UDF 并将其应用于 DataFrame

Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串

PySpark UDF 测试从 String 到 Int 的转换

pyspark中未定义的函数UDF?