Pyspark 函数本身可以正常工作,但在包装在 UDF 中时不执行任务

Posted

技术标签:

【中文标题】Pyspark 函数本身可以正常工作,但在包装在 UDF 中时不执行任务【英文标题】:Pyspark function works properly by itself but does not perform the task when wrapped in a UDF 【发布时间】:2021-10-14 21:57:39 【问题描述】:

我有这个函数,它接收代码并检查代码是否被使用(即:在 used_codes 字典中)。如果它没有被使用,那么它会吐出相同的代码,如果它已经被使用,那么它会生成一个新代码。然后我用所有唯一代码的这个新列“code_id”创建一个新的df。

我的函数本身可以正常工作,但是当它通过 udf 时它不执行任务。我的 used_codes 字典是空的,即使我有大量重复代码应该添加到 used 然后替换。

我不确定为什么它在包装在 UDF 之前有效,但在作为 UDF 运行时无效。

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F
import pyspark.sql.types as T
import random

data = [("James", "36636"),
    ("Michael", "36636"),
    ("Robert", "42114"),
    ("Maria", "39192"),
    ("Jen", "39192")
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("id", StringType(), True), \
  ])

df = spark.createDataFrame(data=data,schema=schema)

used_codes = 

def generate_random_code():
        random_number = random.randint(10000,90000)
        return random_number

def get_valid_code(code):
    global used_codes
    if(code != "" and code not in used_codes.keys()):
        used_codes[code] = 1 
        return code
    new_code = generate_random_code()
    while (new_code in used_codes.keys()):
        new_code = generate_random_code() 
    used_codes[new_code] = 2
    return new_code

get_valid_code_udf = F.udf(lambda code: get_valid_code(code), T.StringType())

df = spark.createDataFrame(data=data,schema=schema)

new_df = df.withColumn("code_id", get_valid_code_udf('id'))

df.show()
+---------+-----+                                                               
|firstname|   id|
+---------+-----+
|    James|36636|
|  Michael|36636|
|   Robert|42114|
|    Maria|39192|
|      Jen|39192|
+---------+-----+

>>> new_df.show()
+---------+-----+-------+
|firstname|   id|code_id|
+---------+-----+-------+
|    James|36636|  36636|
|  Michael|36636|  63312|
|   Robert|42114|  42114|
|    Maria|39192|  39192|
|      Jen|39192|  76399|
+---------+-----+-------+

【问题讨论】:

【参考方案1】:

您在函数中使用了全局变量used_codes。这个全局变量在worker中存在,这可能就是为什么你的函数没有像UDF那样工作,即使它仍然在每个worker中运行良好。

【讨论】:

有解决办法吗? 你需要给我们一些示例输入和预期输出,这会有所帮助 我已经编辑了上面的内容 - 我创建了一个 df 作为示例,实际上该函数确实可以正常工作,它不是 UDF。它返回带有唯一代码的 new_df。我想问题在于我在 EMR 上运行它时。所以我现在的问题是,你认为这个问题可以通过多处理管理器解决吗?

以上是关于Pyspark 函数本身可以正常工作,但在包装在 UDF 中时不执行任务的主要内容,如果未能解决你的问题,请参考以下文章

ScrollView 打破 .sheet 表示

IE 说 javascript 函数未定义,但在 Chrome 中可以正常工作

C# 包装器到 C++ 函数 - 运行一次,工作正常。运行两次,程序挂起

C# 包装器到 C++ 函数 - 运行一次,工作正常。运行两次,程序挂起

Docker 容器中的 Nginx 出现“连接重置”错误,但在没有容器的情况下工作正常

RegEx 在 JavaScript 中工作,但在 C# 中不工作