Pyspark UDF 广播变量未定义仅在由单独脚本导入时

Posted

技术标签:

【中文标题】Pyspark UDF 广播变量未定义仅在由单独脚本导入时【英文标题】:Pyspark UDF broadcasted variable undefined only when imported by separate script 【发布时间】:2017-03-07 14:03:17 【问题描述】:

这里有两个最小的工作示例脚本,它们都在 pyspark 中调用 UDF。 UDF 依赖于广播字典,它使用该字典将列映射到新列。产生正确输出的完整工作示例如下:

# default_sparkjob.py

from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, DataFrame
import pyspark.sql.functions as F

def _transform_df(sc, df):    
    global mapping
    mapping = 1:'First', 2:'Second', 3:'Third'
    mapping = sc.broadcast(mapping)

    udf_implement_map = F.udf(_implement_map, StringType())
    df = df.withColumn('Mapped', udf_implement_map('A'))
    return df

def _implement_map(column):
    return mapping.value[column]

if __name__ == "__main__":

    #_____________________________________________________________________________
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    #_____________________________________________________________________________

    import pandas as pd
    pd_df = pd.DataFrame.from_dict( 'A':[1,2,3], 'B':['a','b','c'] )
    sp_df = sqlContext.createDataFrame(pd_df)

    sp_df = _transform_df(sc, sp_df)
    sp_df.show()

# OUTPUT:
#+---+---+------+
#|  A|  B|Mapped|
#+---+---+------+
#|  1|  a| First|
#|  2|  b|Second|
#|  3|  c| Third|
#+---+---+------+

但是,如果在单独的脚本中导入并使用该函数,则表示未定义映射:

# calling_sparkjob.py

if __name__ == "__main__":

    #_____________________________________________________________________________
    from pyspark.sql.types import *
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext, DataFrame
    import pyspark.sql.functions as F

    sc = SparkContext(pyFiles=['default_sparkjob.py'])
    sqlContext = SQLContext(sc)
    #_____________________________________________________________________________

    from default_sparkjob import _transform_df
    import pandas as pd
    pd_df = pd.DataFrame.from_dict( 'A':[1,2,3], 'B':['a','b','c'] )
    sp_df = sqlContext.createDataFrame(pd_df)

    sp_df = _transform_df(sc, sp_df)
    sp_df.show()

    # File "default_sparkjob.py", line 17, in _implement_map
    # return mapping.value[column]
    # NameError: global name 'mapping' is not defined

谁能解释一下为什么会这样?这是当前代码的真实版本中的主要障碍,该代码导入了许多依赖于来自外部文件的许多 udf 的函数。是否存在我不理解的命名空间问题?

非常感谢。

【问题讨论】:

试试这个def _implement_map(column): return globals()["mapping"].value[column] 嗨,Rakesh,感谢您的回复。该更改产生:return globals()["mapping"].value[column] KeyError: 'mapping' 在单独提交时在两个脚本中。 好的,你能不能把returnprint dir(), print locals(), print globals()前面的这三个东西都打印出来看看mapping有没有 我不确定我是否理解。我无法从 pyspark udf 中打印。你的意思是在调用 UDF 之前? 不,你应该试试def _implement_map(column): print globals() print dir() print locals() return mapping.value[column] 这将显示全局映射 【参考方案1】:

我也有同样的问题。当函数从其他文件导入时,程序会报错。

我不知道你现在是否有解决方案,但我找到了一个技巧解决方案

您可以将dict变量转换为字符串,然后在dataframe中添加一个值为F.lit(str)的新列,最后在udf中使用ast.literal_eval将str转换为dict并在udf中使用.

也许看代码会更清楚。

# default_sparkjob.py

import ast

from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, DataFrame
import pyspark.sql.functions as F

def _transform_df(sc, df):
    # global mapping
    mapping = 1:'First', 2:'Second', 3:'Third'
    # mapping = sc.broadcast(mapping)
    df = df.withColumn('mapping_config', F.lit(str(mapping)))

    udf_implement_map = F.udf(_implement_map, StringType())
    df = df.withColumn('Mapped', udf_implement_map('A', 'mapping_config'))
    return df

def _implement_map(column, mapping_config):
    mapping_ = ast.literal_eval(mapping_config)
    return mapping_[column]

然后使用您的 calling_sparkjob.py 来获得正确的结果。

+---+---+--------------------+------+
|  A|  B|      mapping_config|Mapped|
+---+---+--------------------+------+
|  1|  a|1: 'First', 2: '...| First|
|  2|  b|1: 'First', 2: '...|Second|
|  3|  c|1: 'First', 2: '...| Third|
+---+---+--------------------+------+

【讨论】:

以上是关于Pyspark UDF 广播变量未定义仅在由单独脚本导入时的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark UDF 中使用广播数据帧

Pyspark:如何仅在具有 NotNull 值的行上应用 UDF

使用广播应用地图转换时,pyspark Udf 未按预期工作?

我创建了一个文件来访问所有全局变量。我无法访问 pyspark-sql 查询中定义的 UDF 中的全局变量

udf(用户定义函数)如何在 pyspark 中工作?

pyspark 的用户定义函数 (UDF) 是不是需要单元测试?