spark UDF Java 错误:方法 col([class java.util.ArrayList]) 不存在
Posted
技术标签:
【中文标题】spark UDF Java 错误:方法 col([class java.util.ArrayList]) 不存在【英文标题】:spark UDF Java Error: Method col([class java.util.ArrayList]) does not exist 【发布时间】:2016-08-31 16:18:14 【问题描述】:我有一个 python 字典:
fileClass = 'a1' : ['a','b','c','d'], 'b1':['a','e','d'], 'c1': ['a','c','d','f','g']
元组列表为:
C = [('a','b'), ('c','d'),('e')]
我想最终创建一个火花数据框:
Name (a,b) (c,d) (e)
a1 2 2 0
b1 1 1 1
c1 1 2 0
它只包含出现在dict A中每个项目中的每个元组中元素的计数 为此,我创建了一个 dict 来将每个元素映射到 col 索引
classLoc = 'a':0,'b':0,'c':1,'d':1,'e':2
那我用udf来定义
import numpy as np
def convertDictToDF(v, classLoc, length) :
R = np.zeros((1,length))
for c in v:
try:
loc = classLoc[c]
R[loc] += 1
except:
pass
return(R)
udfConvertDictToDF = udf(convertDictToDF, ArrayType(IntegerType()))
df = sc.parallelize([
[k] + list(udfConvertDictToDF(v, classLoc, len(C)))
for k, v in fileClass.items()]).toDF(['Name']+ C)
然后我收到错误消息
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-40-ab668a12838a> in <module>()
1 df = sc.parallelize([
2 [k] + list(udfConvertDictToDF(v,classLoc, len(C)))
----> 3 for k, v in fileClass.items()]).toDF(['Name'] + C)
4
5 df.show()
/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/functions.pyc in __call__(self, *cols)
1582 def __call__(self, *cols):
1583 sc = SparkContext._active_spark_context
-> 1584 jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
1585 return Column(jc)
1586
/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/column.pyc in _to_seq(sc, cols, converter)
58 """
59 if converter:
---> 60 cols = [converter(c) for c in cols]
61 return sc._jvm.PythonUtils.toSeq(cols)
62
/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/column.pyc in _to_java_column(col)
46 jcol = col._jc
47 else:
---> 48 jcol = _create_column_from_name(col)
49 return jcol
50
/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/column.pyc in _create_column_from_name(name)
39 def _create_column_from_name(name):
40 sc = SparkContext._active_spark_context
---> 41 return sc._jvm.functions.col(name)
42
43
/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
/home/yizhng/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JError(
311 "An error occurred while calling 012. Trace:\n3\n".
--> 312 format(target_id, ".", name, value))
313 else:
314 raise Py4JError(
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:360)
at py4j.Gateway.invoke(Gateway.java:254)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
我不明白我的 UDF 有什么问题会导致该错误消息。请帮忙
【问题讨论】:
【参考方案1】:我认为这与你使用这条线的方式有关
[k] + list(udfConvertDictToDF(v, classLoc, len(C)))
在底部。
当我做一个简单的 python 版本时,我也会得到一个错误。
import numpy as np
C = [('a','b'), ('c','d'),('e')]
classLoc = 'a':0,'b':0,'c':1,'d':1,'e':2
import numpy as np
def convertDictToDF(v, classLoc, length) :
# I also got rid of (1,length) for (length)
# b/c pandas .from_dict() method handles this for me
R = np.zeros(length)
for c in v:
try:
loc = classLoc[c]
R[loc] += 1
except:
pass
return(R)
[[k] + convertDictToDF(v, classLoc, len(C))
for k, v in fileClass.items()]
产生这些错误
TypeError: ufunc 'add' did not contain a loop with signature matching types dtype('S32') dtype('S32') dtype('S32')
如果您要将列表理解更改为 dict 理解,您可以让它工作。
dict = k:convertDictToDF(v, classLoc, len(C))
for k, v in fileClass.items()
它的输出看起来像这样
> 'a1': array([ 2., 2., 0.]), 'c1': array([ 1., 2., 0.]), 'b1': array([ 1., 1., 1.])
在不知道您的最终用例是什么的情况下,我将让您获得您请求的输出,但使用稍微不同的方式,这可能无法按您的意愿扩展,所以我确定有更好的方法。
以下代码将为您提供数据框的其余部分,
import pandas as pd
df = pd.DataFrame.from_dict(data=dict,orient='index').sort_index()
df.columns=C
产生你想要的输出
(a, b) (c, d) e
a1 2.0 2.0 0.0
b1 1.0 1.0 1.0
c1 1.0 2.0 0.0
这将为您提供一个 Spark 数据帧
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df_s = sqlContext.createDataFrame(df)
df_s.show()
+----------+----------+---+
|('a', 'b')|('c', 'd')| e|
+----------+----------+---+
| 2.0| 2.0|0.0|
| 1.0| 1.0|1.0|
| 1.0| 2.0|0.0|
+----------+----------+---+
【讨论】:
以上是关于spark UDF Java 错误:方法 col([class java.util.ArrayList]) 不存在的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?
来自示例 Java 程序的 Spark UDF 反序列化错误
在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列