PySpark中pandas_udf的隐式模式?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark中pandas_udf的隐式模式?相关的知识,希望对你有一定的参考价值。
This answer很好地解释了如何使用pyspark的groupby和pandas_udf来进行自定义聚合。但是,我不可能手动声明我的架构,如示例的这一部分所示
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])
因为我将返回100多个列,其中包含自动生成的名称。有没有办法告诉PySpark只是使用我的函数返回的Schema,并假设它对所有工作节点都是一样的?这个模式也会在运行期间发生变化,因为我将不得不使用我想要使用的预测变量,因此Schema生成的自动化过程可能是一个选项......
基于Sanxofons comment,我知道如何自己实现这个:
from pyspark.sql.types import *
mapping = {"float64": DoubleType,
"object":StringType,
"int64":IntegerType} # Incomplete - extend with your types.
def createUDFSchemaFromPandas(dfp):
column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
schema = StructType(column_types)
return schema
我做的是获取一个示例pandas df,将其传递给函数,并查看返回的内容:
dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)
这似乎对我有用。问题是它是一种递归(需要定义函数来获取模式,将模式定义为udf)。我通过创建一个简单地传递数据帧的“包装器”UDF来解决这个问题。
不幸的是,没有这样的选择。在评估任何组件之前,必须静态地知道模式,因此基于实际数据的任何形式推断都不在表中。
如果内部流程以某种方式基于代码生成,那么您最好的选择是集成逻辑和模式生成。例如
def describe(cols, fun):
schema = StructType([StructField(c, DoubleType()) for c in cols])
@pandas_udf(schema, PandasUDFType, PandasUDFType.GROUPED_MAP)
def _(df):
return df[cols].agg([fun])
return _
df = spark.createDataFrame([(1, 2.0, 1.0, 3.0), (1, 4.0, 2.0, 5.0)], ("id", "x", "y", "z"))
df.groupBy("id").apply(describe(["x"], "mean")).show()
# +---+
# | x|
# +---+
# |3.0|
#+---+
df.groupBy("id").apply(describe(["x", "y"], "mean")).show()
# +---+---+
# | x| y|
# +---+---+
# |3.0|1.5|
# +---+---+
以上是关于PySpark中pandas_udf的隐式模式?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?
在 PySpark 中使用 pandas_udf 时无法填充数组
在 pyspark 中使用 pandas_udf 过滤数据框
PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列
如何在 pyspark.sql.functions.pandas_udf 和 pyspark.sql.functions.udf 之间进行选择?