UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()

Posted

技术标签:

【中文标题】UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()【英文标题】:UnsupportedOperationException: Cannot evalute expression: .. when adding new column withColumn() and udf() 【发布时间】:2016-10-15 23:34:10 【问题描述】:

所以我想做的只是转换字段: year, month, day, hour, minute(如下所示的整数类型)转换为字符串类型。

所以我有一个类型为 df_src 的数据框:

<class 'pyspark.sql.dataframe.DataFrame'>

这是它的架构:

root
 |-- src_ip: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)

我之前也声明了一个函数:

def parse_df_to_string(year, month, day, hour=0, minute=0):
second = 0
return "0:04d-1:02d-2:02d 3:02d:4:02d:5:02d".format(year, month, day, hour, minute, second)

我也做了一个测试,它就像一个魅力:

print parse_df_to_string(2016, 10, 15, 21)
print type(parse_df_to_string(2016, 10, 15, 21))

2016-10-15 21:00:00
<type 'str'>

所以我也用 udf 在 spark api 中做了类似的事情:

from pyspark.sql.functions import udf
u_parse_df_to_string = udf(parse_df_to_string)

这个请求最终在哪里:

df_src.select('*', 
              u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
             ).show()

会导致:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-126-770b587e10e6> in <module>()
     25 # Could not make this part wor..
     26 df_src.select('*',
---> 27         u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
     28              ).show()

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    285         +---+-----+
    286         """
--> 287         print(self._jdf.showString(n, truncate))
    288 
    289     def __repr__(self):

/opt/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()
    ...


    Py4JJavaError: An error occurred while calling o5074.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: parse_df_to_string(input[1, int, true], input[2, int, true], input[3, int, true], input[4, int, true], input[5, int, true])
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
    at org.apache.spark.sql.execution.python.PythonUDF.doGenCode(PythonUDF.scala:27)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)

...

我尝试了很多东西,我尝试只用一个参数和参数调用方法......但没有帮助。

它确实起作用的一种方法是创建一个带有新列的新数据框,如下所示:

df_src_grp_hr_d = df_src.select('*', concat(
    col("year"), 
    lit("-"), 
    col("month"), 
    lit("-"), 
    col("day"),
    lit(" "),
    col("hour"),
    lit(":0")).alias('time'))`

之后我可以将列转换为时间戳

df_src_grp_hr_to_timestamp = df_src_grp_hr_d.select(
df_src_grp_hr_d['src_ip'], 
df_src_grp_hr_d['year'],
df_src_grp_hr_d['month'],
df_src_grp_hr_d['day'],
df_src_grp_hr_d['hour'],
df_src_grp_hr_d['time'].cast('timestamp'))

【问题讨论】:

你确定这不仅仅是一个错字吗?查看您的错误消息:无法评估表达式:parse_df_to_stringg,请参阅额外的“g”? 良好的观察 paisanco。这只是我同时尝试的另一项测试,其函数只有一个参数。很抱歉误导你,异常依然存在。 哪个是正确的minutetime 我想使用 udf parse_df_to_string 从列 year, month, day, hour, minute 生成新列 time 【参考方案1】:

好吧..我想我理解了这个问题...原因是因为我的 dataFrame 刚刚在内存中加载了大量数据导致 show() 操作失败。

我意识到这是导致异常的原因:

Py4JJavaError: An error occurred while calling o2108.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: 

确实是df.show() 操作。

我可以通过执行以下代码 sn-p 来确认: Convert pyspark string to date format

from datetime import datetime
from pyspark.sql.functions import col,udf, unix_timestamp
from pyspark.sql.types import DateType



# Creation of a dummy dataframe:
df1 = sqlContext.createDataFrame([("11/25/1991","11/24/1991","11/30/1991"), 
                            ("11/25/1391","11/24/1992","11/30/1992")], schema=['first', 'second', 'third'])

# Setting an user define function:
# This function converts the string cell into a date:
func =  udf (lambda x: datetime.strptime(x, '%M/%d/%Y'), DateType())

df = df1.withColumn('test', func(col('first')))

df.show()

df.printSchema()

哪个有效!但它仍然不适用于我的数据帧df_src

原因是因为我从我的数据库服务器中加载了大量内存中的数据(例如超过 8-9 百万行),当@987654327 时,spark 似乎无法在 udf 中执行执行@(默认显示 20 个条目)加载到数据帧中的结果。

即使调用 show(n=1),也会抛出同样的异常。

但是如果调用 printSchema(),你会看到新列被有效地添加了。

查看是否添加了新列的一种方法是简单地调用操作print dataFrame.take(10)

最后,使其工作的一种方法是影响一个新的数据帧,而不是在 select() 中调用 udf 时调用.show()

df_to_string = df_src.select('*', 
          u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
         )

然后缓存它:

df_to_string.cache

现在可以毫无问题地调用.show()

df_to_string.show()

【讨论】:

以上是关于UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()的主要内容,如果未能解决你的问题,请参考以下文章