UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()
Posted
技术标签:
【中文标题】UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()【英文标题】:UnsupportedOperationException: Cannot evalute expression: .. when adding new column withColumn() and udf() 【发布时间】:2016-10-15 23:34:10 【问题描述】:所以我想做的只是转换字段:
year, month, day, hour, minute
(如下所示的整数类型)转换为字符串类型。
所以我有一个类型为 df_src 的数据框:
<class 'pyspark.sql.dataframe.DataFrame'>
这是它的架构:
root
|-- src_ip: string (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- hour: integer (nullable = true)
|-- minute: integer (nullable = true)
我之前也声明了一个函数:
def parse_df_to_string(year, month, day, hour=0, minute=0):
second = 0
return "0:04d-1:02d-2:02d 3:02d:4:02d:5:02d".format(year, month, day, hour, minute, second)
我也做了一个测试,它就像一个魅力:
print parse_df_to_string(2016, 10, 15, 21)
print type(parse_df_to_string(2016, 10, 15, 21))
2016-10-15 21:00:00
<type 'str'>
所以我也用 udf 在 spark api 中做了类似的事情:
from pyspark.sql.functions import udf
u_parse_df_to_string = udf(parse_df_to_string)
这个请求最终在哪里:
df_src.select('*',
u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
).show()
会导致:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-126-770b587e10e6> in <module>()
25 # Could not make this part wor..
26 df_src.select('*',
---> 27 u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
28 ).show()
/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
285 +---+-----+
286 """
--> 287 print(self._jdf.showString(n, truncate))
288
289 def __repr__(self):
/opt/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
...
Py4JJavaError: An error occurred while calling o5074.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: parse_df_to_string(input[1, int, true], input[2, int, true], input[3, int, true], input[4, int, true], input[5, int, true])
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
at org.apache.spark.sql.execution.python.PythonUDF.doGenCode(PythonUDF.scala:27)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
...
我尝试了很多东西,我尝试只用一个参数和参数调用方法......但没有帮助。
它确实起作用的一种方法是创建一个带有新列的新数据框,如下所示:
df_src_grp_hr_d = df_src.select('*', concat(
col("year"),
lit("-"),
col("month"),
lit("-"),
col("day"),
lit(" "),
col("hour"),
lit(":0")).alias('time'))`
之后我可以将列转换为时间戳:
df_src_grp_hr_to_timestamp = df_src_grp_hr_d.select(
df_src_grp_hr_d['src_ip'],
df_src_grp_hr_d['year'],
df_src_grp_hr_d['month'],
df_src_grp_hr_d['day'],
df_src_grp_hr_d['hour'],
df_src_grp_hr_d['time'].cast('timestamp'))
【问题讨论】:
你确定这不仅仅是一个错字吗?查看您的错误消息:无法评估表达式:parse_df_to_stringg,请参阅额外的“g”? 良好的观察 paisanco。这只是我同时尝试的另一项测试,其函数只有一个参数。很抱歉误导你,异常依然存在。 哪个是正确的minute
或time
?
我想使用 udf parse_df_to_string
从列 year, month, day, hour, minute
生成新列 time
。
【参考方案1】:
好吧..我想我理解了这个问题...原因是因为我的 dataFrame 刚刚在内存中加载了大量数据导致 show()
操作失败。
我意识到这是导致异常的原因:
Py4JJavaError: An error occurred while calling o2108.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression:
确实是df.show()
操作。
我可以通过执行以下代码 sn-p 来确认: Convert pyspark string to date format
from datetime import datetime
from pyspark.sql.functions import col,udf, unix_timestamp
from pyspark.sql.types import DateType
# Creation of a dummy dataframe:
df1 = sqlContext.createDataFrame([("11/25/1991","11/24/1991","11/30/1991"),
("11/25/1391","11/24/1992","11/30/1992")], schema=['first', 'second', 'third'])
# Setting an user define function:
# This function converts the string cell into a date:
func = udf (lambda x: datetime.strptime(x, '%M/%d/%Y'), DateType())
df = df1.withColumn('test', func(col('first')))
df.show()
df.printSchema()
哪个有效!但它仍然不适用于我的数据帧df_src
。
原因是因为我从我的数据库服务器中加载了大量内存中的数据(例如超过 8-9 百万行),当@987654327 时,spark 似乎无法在 udf 中执行执行@(默认显示 20 个条目)加载到数据帧中的结果。
即使调用 show(n=1),也会抛出同样的异常。
但是如果调用 printSchema(),你会看到新列被有效地添加了。
查看是否添加了新列的一种方法是简单地调用操作print dataFrame.take(10)
。
最后,使其工作的一种方法是影响一个新的数据帧,而不是在 select() 中调用 udf 时调用.show()
:
df_to_string = df_src.select('*',
u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
)
然后缓存它:
df_to_string.cache
现在可以毫无问题地调用.show()
:
df_to_string.show()
【讨论】:
以上是关于UnsupportedOperationException:无法评估表达式:.. 添加新列 withColumn() 和 udf()的主要内容,如果未能解决你的问题,请参考以下文章