尝试通过数据框在 Pyspark 中执行用户定义的函数时出错

Posted

技术标签:

【中文标题】尝试通过数据框在 Pyspark 中执行用户定义的函数时出错【英文标题】:Error when trying to execute User Defined Functions in Pyspark over a Dataframe 【发布时间】:2017-07-05 12:56:55 【问题描述】:

我正在 Pyspark 中创建一个小程序,我想在其中生成一个使用的定义函数,以将 lambda 函数中的“method1”调用到“method0”。

我简化了 de 代码以便更好地理解,但核心功能是:对于数据帧中的每个实例,“method0”应用“method1”(在 lambda 函数的帮助下)根据正在检查的实例的值具有。这样,如果满足“method1”的第一个条件,则该实例的值应为“-”,否则应为“其他”。

通过这些操作,我们的想法是从该 UDF 中获取一列并将其附加到“method0”中的数据框。下面是修改后的代码,方便大家理解:

def method1(atr_list, instance, ident):

    if(instance.ATR1 != '-'):
        return instance.ATR1
    else:
        # Other operations ...
        return 'other'

def method0(df, atr_example_list, ident):

    udf_func = udf(lambda instance: method1(atr_example_list, instance, ident), returnType=StringType())
    new_column = udf_func(df)
    df = df.withColumnRenamed("New_Column", new_column)
    return df

result = method0(df, list, "1111")

但是当我执行这段代码时,我得到了下一个错误,我真的不知道为什么:

Py4JError: An error occurred while calling o298.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at 
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at 
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

这是我期望的输入和输出示例:

数据框'df':

+-------+-------+-------+
| ATR1  |  ATR2 | ATRN  |
+-------+-------+-------+
| '-'   |   1   |  'a'  |
| '-'   |   1   |  'a'  |
| '-'   |   2   |  'b'  | 
| '++'  |   1   |  'a'  |
+-------+-------+-------+

将数据帧“df”作为参数传递给“method0”(对于这个简化示例,无需查看参数“atr_example_list”和“ident”)我想在“method1”调用中获得这样的列:

+------------+
| new_column |
+------------+
|   'other'  |
|   'other'  |
|   'other'  |
|    '++'    |
+------------+

所以在方法 0 上,新的数据框将是:

+-------+-------+-------+------------+
| ATR1  |  ATR2 | ATRN  | new_column |
+-------+-------+-------+------------+
| '-'   |   1   |  'a'  |   'other'  |
| '-'   |   1   |  'a'  |   'other'  |
| '-'   |   2   |  'b'  |   'other'  | 
| '++'  |   1   |  'a'  |    '++'    |
+-------+-------+-------+------------+

谁能帮帮我?

【问题讨论】:

您能否添加一个输入数据样本以及您期望的输出? 我用 Duf59 说的例子编辑了这个问题。 【参考方案1】:

你不能像这样简化和使用单个 udf 吗(如果需要,method1 可以占用多列)? :

def method1(x):
  if x != "-":
    return x
  else:
    return 'other'

u_method1 = udf(method1, StringType())

result = df.withColumn("new_column", u_method1("ATR1"))

【讨论】:

以上是关于尝试通过数据框在 Pyspark 中执行用户定义的函数时出错的主要内容,如果未能解决你的问题,请参考以下文章

使用数据框在pyspark中获取列post group by

pyspark:数据框在另一个数据框的列中按ID选择行

数据框在多列上连接,pyspark中的列有一些条件[重复]

通过 Spark SQL 实现 `collect_list`

在 pyspark 中应用用户定义的聚合函数的替代方法

PySpark 中 JDBC 上的自定义分区