PySpark:使用一列索引另一列(两列的udf?)
Posted
技术标签:
【中文标题】PySpark:使用一列索引另一列(两列的udf?)【英文标题】:PySpark: use one column to index another (udf of two columns?) 【发布时间】:2017-02-14 02:33:23 【问题描述】:(2 月 14 日编辑)
假设我有一个具有以下架构的 Spark (PySpark) 数据框:
root
|-- myarray: array (nullable = true)
| |-- element: string (containsNull = true)
|-- myindices: array (nullable = true)
| |-- element: integer (containsNull = true)
看起来像:
+--------------------+----------+
| myarray | myindices|
+--------------------+----------+
| [A]| [0] |
| [B, C]| [1] |
| [D, E, F, G]| [0,2] |
+--------------------+----------+
如何使用第二个数组来索引第一个?
我的目标是创建一个新的数据框,如下所示:
+--------------------+----------+------+
| myarray | myindices|result|
+--------------------+----------+------+
| [A]| [0] | [A] |
| [B, C]| [1] | [C] |
| [D, E, F, G]| [0,2] | [D,F]|
+--------------------+----------+------+
(可以安全地假设myindices
的内容始终保证在相关行的myarray
的基数内,因此不存在越界问题。)
.getItem()
方法似乎只适用于单个参数,所以我可能需要一个 UDF,但我不知道如何创建一个包含多于一列作为输入的 UDF。有没有 UDF 的解决方案?
【问题讨论】:
df.withColumn('item', df['myarray'].getItem(df['myposition'])) @zhangtong 这应该是一个答案,而不是评论。 @zhangtong:谢谢;不幸的是,我的实际需求有点复杂。我已经编辑了这个问题以使其更清楚。能否请您看一下重新制定的问题,看看您是否有任何建议? @xenocyon 见下文 【参考方案1】:from pyspark.sql import functions as f
rdd = spark.sparkContext.parallelize([(['A'], [0]), (['B', 'C'], [1]), (['D', 'E', 'F'], [0, 2])])
df = spark.createDataFrame(rdd, ['myarray', 'myindices'])
my_UDF = f.UserDefinedFunction(lambda x, y: map(lambda z: x[z], y), returnType=ArrayType(StringType()))
res = df.withColumn('result', my_UDF(df['myarray'], df['myindices']))
res.show(truncate=False)
output:
+---------+---------+------+
|myarray |myindices|result|
+---------+---------+------+
|[A] |[0] |[A] |
|[B, C] |[1] |[C] |
|[D, E, F]|[0, 2] |[D, F]|
+---------+---------+------+
【讨论】:
谢谢,这很好用,是一个将两列作为参数的 UDF 的简洁示例。以上是关于PySpark:使用一列索引另一列(两列的udf?)的主要内容,如果未能解决你的问题,请参考以下文章