PySpark / 计算出现次数并使用 UDF 创建新列

Posted

技术标签:

【中文标题】PySpark / 计算出现次数并使用 UDF 创建新列【英文标题】:PySpark / Count the number of occurrences and create a new column with UDF 【发布时间】:2020-06-04 07:24:20 【问题描述】:

我有一个包含几列的数据框,包括video_idtags

我需要在我的 df 中创建一个名为 occurrencias_music 的新列,其中字符串“music”的出现次数作为任何标签的子字符串。该标签不必与“音乐”完全相同,而是将其作为子字符串包含。

后来,想法是实现一个UDF subtag_music_UDF,它返回IntegerType(),并包装了传统的python函数subcadena_en_vector(tags)

from pyspark.sql import functions as F
from pyspark.sql import types as T

subtag_music_UDF = F.udf(subcadena_en_vector, T.IntegerType())
df = df.withColumn("ocurrencias_music", subtag_music_UDF(F.col("tags")))

为此,我需要一个名为subcadena_en_vector(tags) 的函数,它应该接收一个字符串列表作为参数,并检查向量中有多少元素包含“音乐”一词作为子字符串。我必须用这个列表来测试它的操作:

["a life in music", "music for life", "bso", "hans zimmer"]

结果 2.

我知道subcadena_en_vector(tags) 函数可能是什么:

def subcadena_en_vector(tags, strToSearch):
    nTimes = 0
    for item in tags:
        #print(item.split())
        for subitem in item.split():
            if subitem==strToSearch:
                nTimes += 1

    return nTimes

if __name__ == "__main__":
  tags = ["a life in music", "music for life", "bso", "hans zimmer"]
  palabra = 'music'
  print(cuenta(tags,palabra)

这个函数的问题在于,稍后在包含这个断言的更正部分中:

assert(subcadena_en_vector(["a life in music", "music for life", "bso", "hans zimmer"]) == 2)

我收到以下错误:

> TypeErrorTraceback (most recent call last)
> <ipython-input-3-7a51ae031d9e> in <module>()
> ----> 1 assert(subcadena_en_vector(["a life in music", "music for life", "bso", "hans zimmer"]) == 2) TypeError: subcadena_en_vector()
> takes exactly 2 arguments (1 given)

关于如何简化函数以便它计算出现次数并且不会出现参数错误的任何想法?

提前致谢。

【问题讨论】:

你需要一个字符串来匹配,为什么你在断言操作时不传递字符串? 【参考方案1】:

我终于通过这样做解决了这个问题:

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

subtag_music = ["a life in music", "music for life", "bso", "hans zimmer"]

def subcadena_en_vector(tags):
    return(sum([1 for c in tags if "music" in c]))

print(subcadena_en_vector(subtag_music))

subtag_music_UDF = F.udf(subcadena_en_vector, T.IntegerType())
videosOcurrenciasMusicDF = videosDiasViralDF.withColumn("ocurrencias_music", subtag_music_UDF(F.col("tags")))

谢谢!

【讨论】:

以上是关于PySpark / 计算出现次数并使用 UDF 创建新列的主要内容,如果未能解决你的问题,请参考以下文章

我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数

pyspark:计算列表中不同元素的出现次数

计算 pyspark 数据框中的出现次数

PySpark 分组并逐行应用 UDF 操作

计算 PySpark SQL Join 中每个不同值在列中出现的次数