PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加
Posted
技术标签:
【中文标题】PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加【英文标题】:PySpark - Pass list as parameter to UDF + iterative dataframe column addition 【发布时间】:2018-08-09 13:22:44 【问题描述】:这个例子是从a link借来的!
我想了解为什么数据框a
- 在看似添加了列“category
”之后,无法在后续操作中引用。数据框a
在某种程度上是不可变的吗?是否有另一种方法可以对数据框a
进行操作,以便后续操作可以访问列'category
'?谢谢你的帮助;我还在学习曲线上。现在,可以一次添加所有列以避免错误,但这不是我想要在这里做的。
#sample data
a= sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80),("E",0)],["Letter", "distances"])
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]
#Passing List as Default value to a variable
def cate( feature_list,label=label_list):
if feature_list == 0:
return label[4]
else:
return 'I am not sure!'
def cate2( feature_list,label=label_list):
if feature_list == 0:
return label[4]
elif feature_list.category=='I am not sure!':
return 'Why not?'
udfcate = udf(cate, StringType())
udfcate2 = udf(cate2, StringType())
a.withColumn("category", udfcate("distances"))
a.show()
a.withColumn("category2", udfcate2("category")).show()
a.show()
我得到错误:
C:\Users\gowreden\AppData\Local\Continuum\anaconda3\python.exe C:/Users/gowreden/PycharmProjects/DRC/src/tester.py
2018-08-09 09:06:42 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+------+---------+--------------+
|Letter|distances| category|
+------+---------+--------------+
| A| 20|I am not sure!|
| B| 30|I am not sure!|
| D| 80|I am not sure!|
| E| 0| Dead|
+------+---------+--------------+
Traceback (most recent call last):
File "C:\Programs\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Programs\spark-2.3.1-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o34.withColumn.
: org.apache.spark.sql.AnalysisException: cannot resolve '`category`' given input columns: [Letter, distances];;
'Project [Letter#0, distances#1L, cate('category) AS category2#20]
+- AnalysisBarrier
+- LogicalRDD [Letter#0, distances#1L], false
....
【问题讨论】:
withColumn
不是就地操作,它返回一个新的数据框。你必须这样做a = a.withColumn("category", udfcate("distances"))
【参考方案1】:
我认为您的代码有两个问题:
首先,正如@pault 所说,withColumn
没有到位,您需要相应地修改您的代码。
其次,你的cate2
函数不正确。从某种意义上说,您将其应用于列category
,同时您请求将feature_list.category
与某些内容进行比较。
您可能希望摆脱第一个功能并执行以下操作:
import pyspark.sql.functions as F
a=a.withColumn('category', F.when(a.distances==0, label_list[4]).otherwise('I am not sure!'))
a.show()
输出:
+------+---------+--------------+
|Letter|distances| category|
+------+---------+--------------+
| A| 20|I am not sure!|
| B| 30|I am not sure!|
| D| 80|I am not sure!|
| E| 0| Dead|
+------+---------+--------------+
为第二个函数做这样的事情:
a=a.withColumn('category2', F.when(a.distances==0, label_list[4]).otherwise(F.when(a.category=='I am not sure!', 'Why not?')))
a.show()
输出:
+------+---------+--------------+---------+
|Letter|distances| category|category2|
+------+---------+--------------+---------+
| A| 20|I am not sure!| Why not?|
| B| 30|I am not sure!| Why not?|
| D| 80|I am not sure!| Why not?|
| E| 0| Dead| Dead|
+------+---------+--------------+---------+
【讨论】:
以上是关于PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 将 Dataframe 作为额外参数传递给映射