pyspark: count number of occurrences of distinct elements in lists
Posted: 2020-04-12 12:25:19
Question: I have the following data:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
        'flat': ['A;A;B', 'D;P;E;P;P', 'H;X', 'P;Q;G', 'S;T;U', 'G;C;G']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .getOrCreate()
# use Arrow for the pandas -> Spark conversion
# (renamed spark.sql.execution.arrow.pyspark.enabled in Spark 3.0)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")
df = spark.createDataFrame(data)
# split the ';'-separated string into an array column (';' needs no regex escaping)
new_frame = df.withColumn("list", F.split("flat", ";"))
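I want to add a new column containing the number of occurrences of each distinct element (sorted in ascending order), and another column containing the maximum: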
+-------------------+-----------+---------------------+-----------+----+
| date| flat | list |occurrences|max |
+-------------------+-----------+---------------------+-----------+----+
|2014-01-01 00:00:00|A;A;B |['A','A','B'] |[1,2] |2 |
|2014-01-02 00:00:00|D;P;E;P;P |['D','P','E','P','P']|[1,1,3] |3 |
|2014-01-03 00:00:00|H;X |['H','X'] |[1,1] |1 |
|2014-01-04 00:00:00|P;Q;G |['P','Q','G'] |[1,1,1] |1 |
|2014-01-05 00:00:00|S;T;U |['S','T','U'] |[1,1,1] |1 |
|2014-01-06 00:00:00|G;C;G |['G','C','G'] |[1,2] |2 |
+-------------------+-----------+---------------------+-----------+----+
Many thanks!
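To pin down exactly what the two new columns should contain, here is a plain-Python reference for one row at a time. It is a sketch for checking results locally, not Spark code, and the names `FLAT` and `occurrences_and_max` are hypothetical. It reproduces the occurrences and max columns of the desired table in the question.

```python
from collections import Counter

# The 'flat' strings from the question, one per date (2014-01-01 .. 2014-01-06).
FLAT = ['A;A;B', 'D;P;E;P;P', 'H;X', 'P;Q;G', 'S;T;U', 'G;C;G']


def occurrences_and_max(flat):
    """Split on ';', count each distinct element, sort the counts in
    ascending order, and return (sorted counts, largest count)."""
    counts = sorted(Counter(flat.split(';')).values())
    return counts, max(counts)


for flat in FLAT:
    counts, biggest = occurrences_and_max(flat)
    print(flat, counts, biggest)
```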
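Comments on the question:
Does the order of the occurrences column matter to you?
Answer 1: For Spark 2.4+, this can be done without multiple groupBys and aggregations, which are expensive shuffle operations on big data. You can do it in one expression using the higher-order functions transform and aggregate. This should be the canonical solution for Spark 2.4.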
from pyspark.sql import functions as F

df = spark.createDataFrame(data)
(df.withColumn("list", F.split("flat", ";"))
   # for each distinct element x, fold over the whole list counting elements equal to x,
   # then sort the resulting counts in ascending order
   .withColumn("occurances", F.expr("""array_sort(transform(array_distinct(list),
       x -> aggregate(list, 0, (acc, t) -> acc + IF(t = x, 1, 0))))"""))
   .withColumn("max", F.array_max("occurances"))
   .show())
+-------------------+---------+---------------+----------+---+
| date| flat| list|occurances|max|
+-------------------+---------+---------------+----------+---+
|2014-01-01 00:00:00| A;A;B| [A, A, B]| [1, 2]| 2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| [1, 1, 3]| 3|
|2014-01-03 00:00:00| H;X| [H, X]| [1, 1]| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| [1, 1, 1]| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| [1, 1, 1]| 1|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| [1, 2]| 2|
+-------------------+---------+---------------+----------+---+
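To see what the single Spark SQL expression computes for one row, here is a plain-Python model of it. It illustrates the semantics only (Spark evaluates the expression natively, not like this), and the helper names are hypothetical: `transform` maps over the distinct elements, and for each element `x`, `aggregate` folds over the whole list with an accumulator that starts at 0 and adds 1 whenever `t = x`.

```python
from functools import reduce


def array_distinct(xs):
    """Like Spark's array_distinct: drop duplicates, keep first-seen order."""
    return list(dict.fromkeys(xs))


def occurances(lst):
    """Model of
    array_sort(transform(array_distinct(list),
               x -> aggregate(list, 0, (acc, t) -> acc + IF(t = x, 1, 0))))
    evaluated on one row's list (no nulls assumed)."""
    counts = [
        # aggregate(list, 0, merge): a left fold over the list starting at 0
        reduce(lambda acc, t: acc + (1 if t == x else 0), lst, 0)
        for x in array_distinct(lst)  # transform over the distinct elements
    ]
    return sorted(counts)  # array_sort: ascending order


row = ['D', 'P', 'E', 'P', 'P']
print(occurances(row), max(occurances(row)))
```

Because every distinct element triggers a full pass over the list, the work per row grows with (list length) × (number of distinct elements). That is cheap for short lists like these, and since everything happens within a single row, no shuffle is needed.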
Answer 2: You can do this with a couple of groupBy statements.
First, you have a DataFrame like this:
+-------------------+---------+---------------+
| date| flat| list|
+-------------------+---------+---------------+
|2014-01-01 00:00:00| A;A;B| [A, A, B]|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|
|2014-01-03 00:00:00| H;X| [H, X]|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]|
|2014-01-05 00:00:00| S;T;U| [S, T, U]|
|2014-01-06 00:00:00| G;C;G| [G, C, G]|
+-------------------+---------+---------------+
Explode the list column with F.explode, like this:
new_frame_exp = new_frame.withColumn("exp", F.explode('list'))
Your DataFrame will then look like this:
+-------------------+---------+---------------+---+
| date| flat| list|exp|
+-------------------+---------+---------------+---+
|2014-01-01 00:00:00| A;A;B| [A, A, B]| A|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| A|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| B|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| D|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| E|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P|
|2014-01-03 00:00:00| H;X| [H, X]| H|
|2014-01-03 00:00:00| H;X| [H, X]| X|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| P|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| Q|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| G|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| S|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| T|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| U|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| G|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| C|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| G|
+-------------------+---------+---------------+---+
On this DataFrame, do a groupBy like this:
new_frame_exp_agg = new_frame_exp.groupBy('date', 'flat', 'list', 'exp').count()
You will then have a DataFrame like this:
+-------------------+---------+---------------+---+-----+
| date| flat| list|exp|count|
+-------------------+---------+---------------+---+-----+
|2014-01-03 00:00:00| H;X| [H, X]| H| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| G| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| U| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| T| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| P| 1|
|2014-01-03 00:00:00| H;X| [H, X]| X| 1|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| G| 2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| E| 1|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| C| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| S| 1|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| B| 1|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| D| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| Q| 1|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| A| 2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P| 3|
+-------------------+---------+---------------+---+-----+
On this DataFrame, apply one more level of aggregation to collect the counts into a list and find the maximum, like this:
res = new_frame_exp_agg.groupBy('date', 'flat', 'list').agg(
F.collect_list('count').alias('occurances'),
F.max('count').alias('max'))
res.orderBy('date').show()
+-------------------+---------+---------------+----------+---+
| date| flat| list|occurances|max|
+-------------------+---------+---------------+----------+---+
|2014-01-01 00:00:00| A;A;B| [A, A, B]| [2, 1]| 2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| [1, 1, 3]| 3|
|2014-01-03 00:00:00| H;X| [H, X]| [1, 1]| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| [1, 1, 1]| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| [1, 1, 1]| 1|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| [1, 2]| 2|
+-------------------+---------+---------------+----------+---+
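The three steps of this answer can be modelled in plain Python to show why the first row comes out as [2, 1] rather than sorted: collect_list simply gathers the counts in whatever order the grouped rows arrive, and Spark does not guarantee that order. This is a sketch using only two of the dates, with hypothetical names (`rows`, `exploded`, `pair_counts`, `collected`).

```python
from collections import Counter, defaultdict

rows = {
    '2014-01-01': ['A', 'A', 'B'],
    '2014-01-06': ['G', 'C', 'G'],
}

# Step 1, F.explode: one (date, element) pair per list element.
exploded = [(date, e) for date, lst in rows.items() for e in lst]

# Step 2, groupBy(date, exp).count(): count each (date, element) pair.
pair_counts = Counter(exploded)

# Step 3, groupBy(date).agg(collect_list('count'), max('count')).
collected = defaultdict(list)
for (date, _elem), n in pair_counts.items():
    # Python keeps insertion order here; in Spark the order after the shuffle is arbitrary
    collected[date].append(n)
result = {date: (counts, max(counts)) for date, counts in collected.items()}
print(result)
```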
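Note that collect_list does not guarantee any particular order. If you want the occurances column sorted, you can apply F.array_sort to it if you are on Spark 2.4+; otherwise you will have to write a UDF for that.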
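For Spark versions before 2.4, where F.array_sort is not available, a minimal sketch of the UDF body might look like this. `sort_counts` and `sort_udf` are hypothetical names. Wrapping the function with F.udf and an ArrayType(LongType()) return type (count() produces bigint values) is an assumption about how you would wire it up, and it is shown only as comments because it needs a running Spark session.

```python
def sort_counts(counts):
    """Return the collected counts sorted in ascending order; pass None
    through, since a UDF can receive a null array."""
    if counts is None:
        return None
    return sorted(counts)


# Hypothetical wiring, requires PySpark and the `res` DataFrame from this answer:
#   from pyspark.sql import functions as F
#   from pyspark.sql.types import ArrayType, LongType
#   sort_udf = F.udf(sort_counts, ArrayType(LongType()))
#   res = res.withColumn('occurances', sort_udf('occurances'))

print(sort_counts([2, 1]))
```

A Python UDF is slower than a built-in function such as array_sort, because each row has to be serialized between the JVM and a Python worker. Prefer the built-in whenever your Spark version has it.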