如何将UDF函数的返回值保存到两列?
Posted
技术标签:
【中文标题】如何将UDF函数的返回值保存到两列?【英文标题】:How to save the returned values of UDF function into two columns? 【发布时间】:2018-09-09 16:23:38 【问题描述】:我的函数get_data
返回一个元组:两个整数值。
get_data_udf = udf(lambda id: get_data(spark, id), (IntegerType(), IntegerType()))
我需要将它们分成两列val1
和val2
。我该怎么做?
dfnew = df \
.withColumn("val", get_data_udf(col("id")))
我是否应该将元组保存在列中,例如val
,然后以某种方式将其拆分为两列。或者有没有更短的方法?
【问题讨论】:
另见***.com/a/40962714/1138523 在scala中,你可以.withColumn("val1", col("val._1")).withColumn("val2", col("val._2"))
,不确定这是否适用于pyspark
【参考方案1】:
您可以在 udf 中创建 structFields 以便以后访问。
from pyspark.sql.types import *
get_data_udf = udf(lambda id: get_data(spark, id),
StructType([StructField('first', IntegerType()), StructField('second', IntegerType())]))
dfnew = df \
.withColumn("val", get_data_udf(col("id"))) \
.select('*', 'val.`first`'.alias('first'), 'val.`second`'.alias('second'))
【讨论】:
.select('*'
是什么意思?
表示所有列。
啊,好的。必须要做drop("val")
,对吧?
不确定。【参考方案2】:
元组可以像列表一样被索引,因此您可以将第一列的值添加为get_data()[0]
,并将第二列中的第二个值添加为get_data()[1]
您也可以使用v1, v2 = get_data()
,这样将返回的元组值分配给变量v1
和v2
。
在此处查看this 问题以获得进一步说明。
【讨论】:
如果我调用withColumn("val1", get_data_udf(col("id"))[0]).withColumn("val2", get_data_udf(col("id"))[1])
,那么我会调用get_data_udf
两次。不是吗?
另外,如果我在 DataFrame 中逐行运行此函数,我该如何运行 v1, v2 = get_data()
?
将第一个放在一个循环中,并将 v1 和 v2 逐行附加到您的 df - 这就是它的工作原理!
你能添加一些例子吗?但是循环通常不用于分布式编程。也许我误解了你的想法。因此,这个例子会很有帮助。
@Markus:如果您不想运行 udf 两次,则需要将结果临时保存在单独的列中。【参考方案3】:
例如,您有一个如下所示的一列示例数据框
val df = sc.parallelize(Seq(3)).toDF()
df.show()
//下面是一个UDF,它将返回一个元组
def tupleFunction(): (Int,Int) = (1,2)
//我们将从上面的UDF创建两个新列
df.withColumn("newCol",typedLit(tupleFunction.toString.replace("(","").replace(")","")
.split(","))).select((0 to 1)
.map(i => col("newCol").getItem(i).alias(s"newColFromTuple$i")):_*).show
【讨论】:
这是 scala,不是吗?我需要 Python。 没错。只有薄纱功能会有所不同。除此之外,实际代码是 spark api。它应该工作以上是关于如何将UDF函数的返回值保存到两列?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用scala将特定函数转换为apache spark中的udf函数? [复制]