Pyspark 函数本身可以正常工作,但在包装在 UDF 中时不执行任务
Posted
技术标签:
【中文标题】Pyspark 函数本身可以正常工作,但在包装在 UDF 中时不执行任务【英文标题】:Pyspark function works properly by itself but does not perform the task when wrapped in a UDF 【发布时间】:2021-10-14 21:57:39 【问题描述】:我有这个函数,它接收代码并检查代码是否被使用(即:在 used_codes 字典中)。如果它没有被使用,那么它会吐出相同的代码,如果它已经被使用,那么它会生成一个新代码。然后我用所有唯一代码的这个新列“code_id”创建一个新的df。
我的函数本身可以正常工作,但是当它通过 udf 时它不执行任务。我的 used_codes 字典是空的,即使我有大量重复代码应该添加到 used 然后替换。
我不确定为什么它在包装在 UDF 之前有效,但在作为 UDF 运行时无效。
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F
import pyspark.sql.types as T
import random
data = [("James", "36636"),
("Michael", "36636"),
("Robert", "42114"),
("Maria", "39192"),
("Jen", "39192")
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("id", StringType(), True), \
])
df = spark.createDataFrame(data=data,schema=schema)
used_codes =
def generate_random_code():
random_number = random.randint(10000,90000)
return random_number
def get_valid_code(code):
global used_codes
if(code != "" and code not in used_codes.keys()):
used_codes[code] = 1
return code
new_code = generate_random_code()
while (new_code in used_codes.keys()):
new_code = generate_random_code()
used_codes[new_code] = 2
return new_code
get_valid_code_udf = F.udf(lambda code: get_valid_code(code), T.StringType())
df = spark.createDataFrame(data=data,schema=schema)
new_df = df.withColumn("code_id", get_valid_code_udf('id'))
df.show()
+---------+-----+
|firstname| id|
+---------+-----+
| James|36636|
| Michael|36636|
| Robert|42114|
| Maria|39192|
| Jen|39192|
+---------+-----+
>>> new_df.show()
+---------+-----+-------+
|firstname| id|code_id|
+---------+-----+-------+
| James|36636| 36636|
| Michael|36636| 63312|
| Robert|42114| 42114|
| Maria|39192| 39192|
| Jen|39192| 76399|
+---------+-----+-------+
【问题讨论】:
【参考方案1】:您在函数中使用了全局变量used_codes
。这个全局变量不在worker中存在,这可能就是为什么你的函数没有像UDF那样工作,即使它仍然在每个worker中运行良好。
【讨论】:
有解决办法吗? 你需要给我们一些示例输入和预期输出,这会有所帮助 我已经编辑了上面的内容 - 我创建了一个 df 作为示例,实际上该函数确实可以正常工作,它不是 UDF。它返回带有唯一代码的 new_df。我想问题在于我在 EMR 上运行它时。所以我现在的问题是,你认为这个问题可以通过多处理管理器解决吗?以上是关于Pyspark 函数本身可以正常工作,但在包装在 UDF 中时不执行任务的主要内容,如果未能解决你的问题,请参考以下文章
IE 说 javascript 函数未定义,但在 Chrome 中可以正常工作
C# 包装器到 C++ 函数 - 运行一次,工作正常。运行两次,程序挂起
C# 包装器到 C++ 函数 - 运行一次,工作正常。运行两次,程序挂起