将udf应用于多列并使用numpy操作
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将udf应用于多列并使用numpy操作相关的知识,希望对你有一定的参考价值。
我在pyspark中有一个名为result的数据框,我想应用udf来创建新列,如下所示:
result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)]).withColumnRenamed("_1","count").withColumnRenamed("_2","df").withColumnRenamed("_3","docs")
@udf("float")
def newFunction(arr):
return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])
result=result.withColumn("new_function_result",newFunction_udf(array("count","df","docs")))
列数,df,docs全部为整数列。但这返回
Py4JError:调用时发生错误Z:org.apache.spark.sql.functions.col。跟踪:py4j.Py4JException:方法col([class java.util.ArrayList])在以下位置不存在py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)在py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)在py4j.Gateway.invoke(Gateway.java:274)处py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)在py4j.commands.CallCommand.execute(CallCommand.java:79)处py4j.GatewayConnection.run(GatewayConnection.java:214)在java.lang.Thread.run(Thread.java:748)
[当我尝试通过一列并对其求平方时,它工作正常。
感谢您的任何帮助。
该错误消息具有误导性,但试图告诉您函数未返回浮点数。您的函数返回的类型为numpy.float64
的值,您可以使用VectorUDT类型(在下面的示例中为newFunctionVector
)来获取。使用numpy的另一种方法是将numpy类型numpy.float64
强制转换为python类型float(功能:在下面的示例中为newFunctionWithArray
)。
最后但并非最不重要的是,不必调用array,因为udfs可以使用多个参数(在下面的示例中,功能:newFunction
)。
import numpy as np
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType
from pyspark.mllib.linalg import Vectors, VectorUDT
result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)], ["count","df","docs"])
def newFunctionVector(arr):
return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])
@udf("float")
def newFunctionWithArray(arr):
returnValue = (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])
return returnValue.item()
@udf("float")
def newFunction(count, df, docs):
returnValue = (1 + np.log(count)) * np.log(docs/df)
return returnValue.item()
vector_udf = udf(newFunctionVector, VectorUDT())
result=result.withColumn("new_function_result", newFunction("count","df","docs"))
result=result.withColumn("new_function_result_WithArray", newFunctionWithArray(array("count","df","docs")))
result=result.withColumn("new_function_result_Vector", newFunctionWithArray(array("count","df","docs")))
result.printSchema()
result.show()
输出:
root
|-- count: long (nullable = true)
|-- df: long (nullable = true)
|-- docs: long (nullable = true)
|-- new_function_result: float (nullable = true)
|-- new_function_result_WithArray: float (nullable = true)
|-- new_function_result_Vector: float (nullable = true)
+-----+---+----+-------------------+-----------------------------+--------------------------+
|count| df|docs|new_function_result|new_function_result_WithArray|new_function_result_Vector|
+-----+---+----+-------------------+-----------------------------+--------------------------+
| 138| 5| 10| 4.108459| 4.108459| 4.108459|
| 128| 4| 10| 5.362161| 5.362161| 5.362161|
| 112| 3| 10| 6.8849173| 6.8849173| 6.8849173|
| 120| 3| 10| 6.967983| 6.967983| 6.967983|
| 189| 1| 10| 14.372153| 14.372153| 14.372153|
+-----+---+----+-------------------+-----------------------------+--------------------------+
以上是关于将udf应用于多列并使用numpy操作的主要内容,如果未能解决你的问题,请参考以下文章