列上collect_list之后的PySpark reduceByKey聚合

Posted

技术标签:

【中文标题】列上collect_list之后的PySpark reduceByKey聚合【英文标题】:PySpark reduceByKey aggregation after collect_list on a column 【发布时间】:2017-11-23 09:43:19 【问题描述】:

我想根据collect_list收集的“状态”来做我的聚合。

示例代码:

states = sc.parallelize(["TX","TX","CA","TX","CA"])
states.map(lambda x:(x,1)).reduceByKey(operator.add).collect()
#printed output: [('TX', 3), ('CA', 2)]

我的代码:

from pyspark import SparkContext,SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import collect_list
import operator
conf = SparkConf().setMaster("local")
conf = conf.setAppName("test")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
rdd = sc.parallelize([('20170901',['TX','TX','CA','TX']), ('20170902', ['TX','CA','CA']), ('20170902',['TX']) ])
df = spark.createDataFrame(rdd, ["datatime", "actionlist"])
df = df.groupBy("datatime").agg(collect_list("actionlist").alias("actionlist"))

rdd = df.select("actionlist").rdd.map(lambda x:(x,1))#.reduceByKey(operator.add)
print (rdd.take(2))
#printed output: [(Row(actionlist=[['TX', 'CA', 'CA'], ['TX']]), 1 (Row(actionlist=[['TX', 'TX', 'CA', 'TX']]), 1)]
#for next step, it should look like:
#[(Row(actionlist=[('TX',1), ('CA',1), ('CA',1), ('TX',1)]), (Row(actionlist=[('TX',1), ('TX',1), ('CA',1), ('TX',1)])]

我想要的是这样的:

20170901,[('TX', 3), ('CA', 1 )]
20170902,[('TX', 2), ('CA', 2 )]

我认为第一步是展平 collect_list 结果,我尝试过: udf(lambda x: list(chain.from_iterable(x)), StringType()) udf(lambda items: list(chain.from_iterable(itertools.repeat(x,1) if isinstance(x,str) else x for x in items))) udf(lambda l: [子列表中的子列表项])

但还没有运气,下一步是化妆KV对并减少,我在这里停留了一段时间,任何火花专家可以帮助逻辑吗?感谢您的帮助!

【问题讨论】:

【参考方案1】:

你可以在 udf 中使用 reduce 和 counter 来实现。我试过了,希望对你有帮助。

>>> from functools import reduce
>>> from collections import Counter
>>> from pyspark.sql.types import *
>>> from pyspark.sql import functions as F
>>> rdd = sc.parallelize([('20170901',['TX','TX','CA','TX']), ('20170902', ['TX','CA','CA']), ('20170902',['TX']) ])
>>> df = spark.createDataFrame(rdd, ["datatime", "actionlist"])
>>> df = df.groupBy("datatime").agg(F.collect_list("actionlist").alias("actionlist"))
>>> def someudf(row):
        value = reduce(lambda x,y:x+y,row)
        return Counter(value).most_common()

>>> schema = ArrayType(StructType([
    StructField("char", StringType(), False),
    StructField("count", IntegerType(), False)]))

>>> udf1 = F.udf(someudf,schema)
>>> df.select('datatime',udf1(df.actionlist)).show(2,False)
+--------+-------------------+
|datatime|someudf(actionlist)|
+--------+-------------------+
|20170902|[[TX,2], [CA,2]]   |
|20170901|[[TX,3], [CA,1]]   |
+--------+-------------------+

【讨论】:

这个解决方案是完美的!你真的很好!下一步是从一个数据框列中获取 rdd 以处理实际需求。感谢您的帮助! 很高兴它有帮助! :)【参考方案2】:

你可以简单地使用 combineByKey():

from collections import Counter
count = rdd.combineByKey(lambda v: Counter(v),
                                 lambda c,v: c + Counter(v),
                                 lambda c1,c2: c1 + c2)
print count #[('20170901', Counter('TX': 3, 'CA': 1)), ('20170902', Counter('CA': 2, 'TX': 2))]

【讨论】:

以上是关于列上collect_list之后的PySpark reduceByKey聚合的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中使用 collect_list 时 Java 内存不足

Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序

pyspark:groupby 和聚合 avg 和 first 在多个列上

通过 Spark SQL 实现 `collect_list`

pyspark 在列上应用函数

在 PySpark 中的多个列上应用 MinMaxScaler