在 PySpark 中提取特定行

Posted

技术标签:

【中文标题】在 PySpark 中提取特定行【英文标题】:Extract specific rows in PySpark 【发布时间】:2019-04-09 14:21:06 【问题描述】:

我有一个这样的数据框

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+

我只想提取那些只包含一个特定类型 - “A”的行(或 ID)

因此我的预期输出将包含以下行

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|    1|
|ID4|   A|   10|
+---+----+-----+

对于每个 ID 可以包含任何类型 - A、B、C 等。我想提取那些包含一个且仅一个类型的 ID - 'A'

如何在 PySpark 中实现这一点

【问题讨论】:

添加 group count column 并过滤计数等于 1 的位置。 这应该是 -from pyspark.sql.functions import col, when, collect_list, array_contains, size, first 然后df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),first(col('Value')).alias('Value'),collect_list('Type').alias('Type_Arr')) df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr') @cph_sto: 你的回答是正确的,能不能请你写这个作为回答,我会接受的 @Hardikgupta 刚刚做了,稍作修改以解决一个ID下多个As的问题。 【参考方案1】:

您可以对其应用过滤器。

import pyspark.sql.functions as f

data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)), 
       (("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()

+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1|   A|    1|
|ID1|   B|    5|
|ID2|   A|   12|
|ID3|   A|    3|
|ID3|   B|    3|
|ID3|   C|    5|
|ID4|   A|   10|
+---+----+-----+

x= df.filter(f.col('Type')=='A')

x.show()

如果我们需要过滤所有只有一条记录且类型为“A”的 ID,那么下面的代码可能是解决方案


df.registerTempTable('table1')


sqlContext.sql('select a.ID, a.Type,a.Value from table1 as a, (select ID, count(*) as cnt_val from table1 group by ID) b where a.ID = b.ID and (a.Type=="A" and b.cnt_val ==1)').show()


+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+


会有更好的替代方法来找到相同的。

【讨论】:

我收到一个错误keyword can't be an expression x = df.filter(f.col('Type')=='A')。它应该是两个 = 符号。更新了答案。 抱歉,这不是 OP 所要求的。您只是过滤掉其中包含 A 的行。其中,OP 要求那些 IDs 中只有 A AND 没有其他字母。 @cph_sto,哦,是的。更新了答案。感谢您强调【参考方案2】:

根据 OP 的要求,我正在记下我在 cmets 下写的答案。

当前问题的目的是过滤掉DataFrame,其中每个特定的ID 都只有Type A 的一个元素,而没有其他元素。

# Loading the requisite packages
from pyspark.sql.functions import col, collect_set, array_contains, size, first

我们的想法是先将aggregate() DataFrameID 组合在一起,由此我们将Type 的所有unique 元素使用collect_set() 分组到一个数组中。拥有unique 元素很重要,因为对于特定的ID 可能有两行,这两行的Type 都具有A。这就是我们应该使用collect_set() 而不是collect_list() 的原因,因为后者不会返回唯一元素,而是返回所有元素。

那么我们应该使用first() 来获取TypeValue 在一个组中的第一个值。如果A 是特定ID 唯一可能的unique Type,则first() 将返回A 的唯一值,以防A 出现一次,如果有则返回最大值A 的重复。

df = df = df.groupby(['ID']).agg(first(col('Type')).alias('Type'),
                                 first(col('Value')).alias('Value'),
                                 collect_set('Type').alias('Type_Arr'))
df.show()
+---+----+-----+---------+
| ID|Type|Value| Type_Arr|
+---+----+-----+---------+
|ID2|   A|   12|      [A]|
|ID3|   A|    3|[A, B, C]|
|ID1|   A|    1|   [A, B]|
|ID4|   A|   10|      [A]|
+---+----+-----+---------+

最后,我们将同时设置两个条件来过滤出所需的数据集。

条件 1: 它使用array_contains() 检查Type 数组中是否存在A

条件2:检查数组的size。如果大小大于1,则应该有多个Types

df = df.where(array_contains(col('Type_Arr'),'A') & (size(col('Type_Arr'))==1)).drop('Type_Arr')
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2|   A|   12|
|ID4|   A|   10|
+---+----+-----+

【讨论】:

巧妙利用first。【参考方案3】:

我不熟悉 Python,这里有一个 Scala 可能的解决方案:

df.groupBy("ID").agg(collect_set("Type").as("Types"))
  .select("ID").where((size($"Types")===1).and(array_contains($"Types", "A"))).show()
+---+
| ID|
+---+
|ID2|
|ID4|
+---+

这个想法是按ID 分组并仅过滤包含A 值的大小为1 的Types

【讨论】:

聚合应该是collect_set而不是collect_list,以防ID最终有多个"A"实例

以上是关于在 PySpark 中提取特定行的主要内容,如果未能解决你的问题,请参考以下文章

从 pyspark 数据框中的列中提取特定字符串

提取特定单元格的值并将其填充以代替 pyspark 数据框中的 NA 值

PySpark 根据特定列重新分区

PySpark - 根据条件填充特定行

从 PySpark 中的选定列和行中提取值

PYSPARK:根据条件用另一个行值更新一行中的值?