计算 pyspark df 列中子字符串列表的出现次数

Posted

技术标签:

【中文标题】计算 pyspark df 列中子字符串列表的出现次数【英文标题】:Count occurrences of a list of substrings in a pyspark df column 【发布时间】:2019-07-16 05:44:28 【问题描述】:

我想计算子字符串列表的出现次数,并根据 pyspark df 中包含长字符串的列创建一列。

Input:          
       ID    History

       1     USA|UK|IND|DEN|MAL|SWE|AUS
       2     USA|UK|PAK|NOR
       3     NOR|NZE
       4     IND|PAK|NOR

 lst=['USA','IND','DEN']


Output :
       ID    History                      Count

       1     USA|UK|IND|DEN|MAL|SWE|AUS    3
       2     USA|UK|PAK|NOR                1
       3     NOR|NZE                       0
       4     IND|PAK|NOR                   1

【问题讨论】:

【参考方案1】:
# Importing requisite packages and creating a DataFrame
from pyspark.sql.functions import split, col, size, regexp_replace
values = [(1,'USA|UK|IND|DEN|MAL|SWE|AUS'),(2,'USA|UK|PAK|NOR'),(3,'NOR|NZE'),(4,'IND|PAK|NOR')]
df = sqlContext.createDataFrame(values,['ID','History'])
df.show(truncate=False)
+---+--------------------------+
|ID |History                   |
+---+--------------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |
|3  |NOR|NZE                   |
|4  |IND|PAK|NOR               |
+---+--------------------------+

思路是根据这三个delimiters:lst=['USA','IND','DEN']对字符串进行拆分,然后统计产生的子字符串个数。

例如;字符串USA|UK|IND|DEN|MAL|SWE|AUS 被拆分为 - ,|UK|||MAL|SWE|AUS。因为,创建了 4 个子字符串并且有 3 个分隔符匹配,所以4-1 = 3 给出了这些字符串出现在列字符串中的计数。

我不确定 Spark 是否支持多字符分隔符,因此作为第一步,我们将列表 ['USA','IND','DEN'] 中的这 3 个子字符串中的任何一个替换为标志/虚拟值 %。你也可以用别的东西。以下代码执行此操作replacement -

df = df.withColumn('History_X',col('History'))
lst=['USA','IND','DEN']
for i in lst:
    df = df.withColumn('History_X', regexp_replace(col('History_X'), i, '%'))
df.show(truncate=False)
+---+--------------------------+--------------------+
|ID |History                   |History_X           |
+---+--------------------------+--------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|%|UK|%|%|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |%|UK|PAK|NOR        |
|3  |NOR|NZE                   |NOR|NZE             |
|4  |IND|PAK|NOR               |%|PAK|NOR           |
+---+--------------------------+--------------------+

最后,我们先统计splitting创建的子串数量,%作为分隔符,然后统计size函数创建的子串数量,最后减去1。

df = df.withColumn('Count', size(split(col('History_X'), "%")) - 1).drop('History_X')
df.show(truncate=False)
+---+--------------------------+-----+
|ID |History                   |Count|
+---+--------------------------+-----+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|3    |
|2  |USA|UK|PAK|NOR            |1    |
|3  |NOR|NZE                   |0    |
|4  |IND|PAK|NOR               |1    |
+---+--------------------------+-----+

【讨论】:

我收到一个错误:类“org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator”超过 64 KB 你使用的是哪个 spark 版本?【参考方案2】:

如果你使用的是Spark 2.4+,可以试试SPARK SQL高阶函数filter()

from pyspark.sql import functions as F

>>> df.show(5,0)
+---+--------------------------+
|ID |History                   |
+---+--------------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |
|3  |NOR|NZE                   |
|4  |IND|PAK|NOR               |
+---+--------------------------+

df_new = df.withColumn('data', F.split('History', '\|')) \
           .withColumn('cnt', F.expr('size(filter(data, x -> x in ("USA", "IND", "DEN")))'))

>>> df_new.show(5,0)
+---+--------------------------+----------------------------------+---+
|ID |History                   |data                              |cnt|
+---+--------------------------+----------------------------------+---+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|[USA, UK, IND, DEN, MAL, SWE, AUS]|3  |
|2  |USA|UK|PAK|NOR            |[USA, UK, PAK, NOR]               |1  |
|3  |NOR|NZE                   |[NOR, NZE]                        |0  |
|4  |IND|PAK|NOR               |[IND, PAK, NOR]                   |1  |
+---+--------------------------+----------------------------------+---+

在哪里我们首先将字段History拆分成一个名为data的数组列,然后使用过滤函数:

filter(data, x -> x in ("USA", "IND", "DEN"))

只检索满足条件的数组元素:IN ("USA", "IND", "DEN"),之后,我们用size()函数对结果数组进行计数。

更新:添加了另一种使用 array_contains() 的方法,它应该适用于旧版本的 Spark:

lst = ["USA", "IND", "DEN"]

df_new = df.withColumn('data', F.split('History', '\|')) \
           .withColumn('Count', sum([F.when(F.array_contains('data',e),1).otherwise(0) for e in lst]))

注意:数组中的重复条目将被跳过,此方法只计算唯一的国家代码。

【讨论】:

这会引发错误。ParseException: "\nextraneous input '>' @FalihaZikra 你的 Spark 是什么版本的,filter() 函数只有在 2.40+ 之后才可用 @FalihaZikra,我添加了另一种适用于旧版本 Spark 的方法,您可以测试它是否适合您。 更新版本有效。谢谢你。不幸的是,由于它不计算重复出现的次数,这可能不适用于我的用例。 @FalihaZikra,如果您可以将 Spark 升级到 2.4 版。你的任务会容易得多:)

以上是关于计算 pyspark df 列中子字符串列表的出现次数的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark数据框的列中使用正则表达式捕获两个字符串之间的第一次出现的字符串

Pyspark 将数组列分解为带有滑动窗口的子列表

是否可以使用 pyspark 过滤 Spark DataFrames 以返回列值在列表中的所有行?

在 PySpark 中使用字符数创建派生属性

在 pyspark 中添加带有压缩列表的新列作为常量值

PySpark / 计算出现次数并使用 UDF 创建新列