PySpark:使用一列索引另一列(两列的udf?)

Posted

技术标签:

【中文标题】PySpark:使用一列索引另一列(两列的udf?)【英文标题】:PySpark: use one column to index another (udf of two columns?) 【发布时间】:2017-02-14 02:33:23 【问题描述】:

(2 月 14 日编辑)

假设我有一个具有以下架构的 Spark (PySpark) 数据框:

root
 |-- myarray: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- myindices: array (nullable = true)
 |    |-- element: integer (containsNull = true)

看起来像:

+--------------------+----------+
|          myarray   | myindices|
+--------------------+----------+
|                 [A]|    [0]   |
|              [B, C]|    [1]   |
|        [D, E, F, G]|   [0,2]  |
+--------------------+----------+

如何使用第二个数组来索引第一个?

我的目标是创建一个新的数据框,如下所示:

+--------------------+----------+------+
|          myarray   | myindices|result|
+--------------------+----------+------+
|                 [A]|    [0]   |  [A] |
|              [B, C]|    [1]   |  [C] |
|        [D, E, F, G]|   [0,2]  | [D,F]|
+--------------------+----------+------+

(可以安全地假设myindices 的内容始终保证在相关行的myarray 的基数内,因此不存在越界问题。)

.getItem() 方法似乎只适用于单个参数,所以我可能需要一个 UDF,但我不知道如何创建一个包含多于一列作为输入的 UDF。有没有 UDF 的解决方案?

【问题讨论】:

df.withColumn('item', df['myarray'].getItem(df['myposition'])) @zhangtong 这应该是一个答案,而不是评论。 @zhangtong:谢谢;不幸的是,我的实际需求有点复杂。我已经编辑了这个问题以使其更清楚。能否请您看一下重新制定的问题,看看您是否有任何建议? @xenocyon 见下文 【参考方案1】:
from pyspark.sql import functions as f

rdd = spark.sparkContext.parallelize([(['A'], [0]), (['B', 'C'], [1]), (['D', 'E', 'F'], [0, 2])])
df = spark.createDataFrame(rdd, ['myarray', 'myindices'])
my_UDF = f.UserDefinedFunction(lambda x, y: map(lambda z: x[z], y), returnType=ArrayType(StringType()))
res = df.withColumn('result', my_UDF(df['myarray'], df['myindices']))
res.show(truncate=False)

output:
+---------+---------+------+
|myarray  |myindices|result|
+---------+---------+------+
|[A]      |[0]      |[A]   |
|[B, C]   |[1]      |[C]   |
|[D, E, F]|[0, 2]   |[D, F]|
+---------+---------+------+

【讨论】:

谢谢,这很好用,是一个将两列作为参数的 UDF 的简洁示例。

以上是关于PySpark:使用一列索引另一列(两列的udf?)的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark将两列值组合到另一列?

使用 PySpark 连接与另一列中的两列确定的范围相匹配的数据框

响应式两列布局:在另一列之间移动一列

PySpark 传递列表到用户定义函数

Pyspark:如何根据另一列的值填充空值

Pyspark如何将一列与数据框中另一列的结果相乘?