计算单个列中列表中值的实例
Posted
技术标签:
【中文标题】计算单个列中列表中值的实例【英文标题】:Counting instaces of values across lists within a single column 【发布时间】:2020-05-08 15:43:30 【问题描述】:我有一个 PySpark 数据框,其中 1 列由字符串列表组成。我想计算所有行中每个字符串列表中每个元素的实例数。伪代码:
counter = Counter()
for attr_list in df['attr_list']:
counter.update(attr_list)
另一种方法是连接所有行中的所有列表,并从单个巨大列表中构建一个计数器。在 PySpark 中是否有有效的方法来做到这一点?
正确的输出将是单个 collections.Counter()
对象,其中填充了所有列中所有列表中每个项目的出现次数,即,如果对于给定列,第 1 行有列表 ['a', 'b', 'c']
,第 2 行有列表['b', 'c', 'd']
,我们会得到一个看起来像'a': 1, 'b': 2, 'c': 2, 'd': 1
的计数器。
【问题讨论】:
你的 spark 版本是什么? 这能回答你的问题吗? Get the size/length of an array column 【参考方案1】:如果你知道你必须计算的 elements
,那么你可以将它与 spark2.4+.
一起使用,它会非常快。(使用 higher order function filter
和 structs
)
df.show()
#+------------+
#| atr_list|
#+------------+
#|[a, b, b, c]|
#| [b, c, d]|
#+------------+
elements=['a','b','c','d']
from pyspark.sql import functions as F
collected=df.withColumn("struct", F.struct(*[(F.struct(F.expr("size(filter(atr_list,x->x=))"\
.format("'"+y+"'"))).alias(y)) for y in elements]))\
.select(*[F.sum(F.col("struct..col1".format(x))).alias(x) for x in elements])\
.collect()[0]
elements[i]: [x for x in collected][i] for i in range(len(elements))
Out: 'a': 1, 'b': 3, 'c': 2, 'd': 1
第二种方法,使用transform, aggregate, explode and groupby
(不需要指定元素):
from pyspark.sql import functions as F
a=df.withColumn("atr", F.expr("""transform(array_distinct(atr_list),x->aggregate(atr_list,0,(acc,y)->\
IF(y=x, acc+1,acc)))"""))\
.withColumn("zip", F.explode(F.arrays_zip(F.array_distinct("atr_list"),("atr"))))\
.select("zip.*").withColumnRenamed("0","elements")\
.groupBy("elements").agg(F.sum("atr").alias("sum"))\
.collect()
a[i][0]: a[i][1] for i in range(len(a))
【讨论】:
谢谢。不幸的是,在这种情况下有超过 3000 个元素,这就是为什么我希望使用类似于 python Counter() dict 的东西。使用哈希表来增加仅使用给定行中的列表元素访问的变量应该比为每个元素迭代一次要快得多。 我不同意,上面的代码没有迭代,它在引擎盖下立即发生,并且没有快速的方法在 spark 中使用 counter() dict 之类的东西。您是否尝试过将所有 3000 个不同的元素放在上面的元素列表中并运行代码? 您好 Mohammad,感谢您的解决方案。我得到了一些非常有趣的结果。我只是通过在属性列上执行 rdd.collect() 然后运行列表来逐一更新计数器字典来做到这一点。执行此操作时,实际上是 rdd.collect() 花费的时间最多。您的解决方案要快得多,但是这两种解决方案的扩展方式非常不同。我测试了一个 10k 样本和一个 50k 样本。使用 counter dict 和 collect() 的方法对每个样本分别花费了 700 和 713 秒。你的方法分别用了 31 秒和 48 秒。谢谢! @theShmoo 没问题,感谢您的反馈,因为我对速度感到好奇。如果你有更大的数据,速度上的差异也会更大【参考方案2】:转换为 RDD 的一种选择是将所有数组合并为一个,然后在其上使用 Counter
对象。
from collections import Counter
all_lists = df.select('listCol').rdd
print(Counter(all_lists.map(lambda x: [i for i in x[0]]).reduce(lambda x,y: x+y)))
explode
和 groupBy
的另一个选项并将结果合并到 dictionary
。
from pyspark.sql.functions import explode
explode_df = df.withColumn('exploded_list',explode(df.listCol))
counts = explode_df.groupBy('exploded_list').count()
counts_tuple = counts.rdd.reduce(lambda a,b : a+b)
print(counts_tuple[i]:counts_tuple[i+1] for i in range(0,len(counts_tuple)-1,2))
【讨论】:
【参考方案3】:您可以尝试使用rdd的distinct
和flatMap
方法,这只需将列转换为rdd并进行这些操作。
counter = (df
.select("attr_list")
.rdd
# join all strings in the list and then split to get each word
.map(lambda x: " ".join(x).split(" "))
.flatMap(lambda x: x)
# make a tuple for each word so later it can be grouped by to get its frequency count
.map(lambda x: (x, 1))
.reduceByKey(lambda a,b: a+b)
.collectAsMap())
【讨论】:
以上是关于计算单个列中列表中值的实例的主要内容,如果未能解决你的问题,请参考以下文章