将 udf 调用移动到新函数后的 azure pyspark udf 属性 nonetype

Posted

技术标签:

【中文标题】将 udf 调用移动到新函数后的 azure pyspark udf 属性 nonetype【英文标题】:azure pyspark udf attribute nonetype after moving udf call into new function 【发布时间】:2021-01-07 14:05:41 【问题描述】:

我从下面的一系列工作 udf 开始,以清理按预期工作的数据框中的列列表。

    填充数据框 执行 clean udf 1 ...
@f.udf(returnType=StringType())  
def ascii_ignore(x):
  return x.encode('ascii', 'ignore').decode('ascii')

def multi_remove_some_chars(col_names):
    def inner(df):
        for col_name in col_names:
            df = df.withColumn(
                col_name,
                ascii_ignore(col_name)
            )
        return df
    return inner

df = multi_remove_some_chars(colclean_list)(df)    

我的下一步是更接近一个类,因此我创建了一个新函数,该函数调用所有数据清理函数,如下所示:更改后第一个函数错误为“从 UDF 引发异常:'AttributeError: 'NoneType' 对象没有属性 'encode''" 就像 udf 没有接收到正在传递的 df 和列列表???不确定我是否理解为什么会这样。

def clean_names(sourceTable,sourceQuery,sourceId,sourceNameList):
  sourceQuery = sourceQuery +' ' +  sourceTable
  newTable = 'default.' + sourceTable + '_enhanced'  
  df = spark.sql(sourceQuery)

  colclean_list = []
  for col in sourceNameList:
    newname = col + "_m"
    df = df.withColumn(newname, f.lower(df[col]))
    colclean_list.append(newname)
  print(colclean_list)    
  df = multi_remove_some_chars(colclean_list)(df)
  df = multi_fix_abbreviations(colclean_list)(df) 
  #df = multi_remove_features(colclean_list)(df)   
  #df = multi_remove_stops(colclean_list)(df) 
  #df.write.option("mode","overwrite").save(newTable)
  #df.write.saveAsTable(newTable).mode("overwrite")
  df.show(500,False)

sourceTable='ndar_filtered'
sourceQuery='select NDARecID , FullName ,CorpOwnerName, OrgTypeCd from'
sourceId = 'NDARecID'
sourceNameList=['FullName','CorpOwnerName']
clean_names(sourceTable,sourceQuery,sourceId,sourceNameList)

【问题讨论】:

【参考方案1】:

传递给 UDF 的列中很可能存在空值。尝试在 UDF 中添加对 null 的检查:

@f.udf(returnType=StringType())  
def ascii_ignore(x):
  if x is not None:
    return x.encode('ascii', 'ignore').decode('ascii')
  else:
    return None

事实上,可能不需要 UDF。你可能会逍遥法外

df = df.withColumn(
    col_name,
    f.expr(f"replace(decode(encode(col_name, 'ascii'), 'ascii'), '?', '')")
)

【讨论】:

我已经尝试过了,但是当我去测试下一个更复杂的清理例程时,它们都是使用相同的模式编写的,在我使用函数调用移动语句后,它们都返回了类型错误。当顺序运行时,所有这些功能都可以正常工作。 @billSt3 你很幸运,当你按顺序运行它们时,没有空值。恐怕您需要更改清洁例程中的每个 UDF 以检查空值。

以上是关于将 udf 调用移动到新函数后的 azure pyspark udf 属性 nonetype的主要内容,如果未能解决你的问题,请参考以下文章

将正在运行的线程中的函数移动到新线程?

从Pyspark UDF调用另一个自定义Python函数

如何从 Azure 移动应用服务调用 HTTP(Azure Functions)?

调用SQL存储过程的C#函数在从本地计算机使用时有效,但在从云中的Azure函数调用时失败

如何将 Azure 流分析中的“类字典”结构转换为带有 javascript UDF 的多维数组?

启用 Azure Active Directory 身份验证时,Azure 移动应用将 Http POST 调用重定向到 GET