Count distinct sets between two columns, while using agg function Pyspark Spark Session

Posted: 2021-10-30 22:56:03


Question:

I want to get the number of unique connections between locations, so a->b and b->a should count as one. The dataframe contains a timestamp plus the start and end location names. The result should show the unique connections between stations for each day of the year.

import findspark
findspark.init('/home/[user_name]/spark-3.1.2-bin-hadoop3.2')
import pyspark
from pyspark.sql.functions import date_format, countDistinct, collect_list, struct, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cluster1').getOrCreate()

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, TimestampType
from pyspark.sql.functions import to_timestamp
data2 = [
    ('2017-12-29 16:57:39.6540','2017-12-29 16:57:39.6540',"A","B"),
    ("2017-12-29 16:57:39.6540","2017-12-29 17:57:39.6540","B","A"),
    ("2017-12-29 16:57:39.6540","2017-12-29 19:57:39.6540","B","A"),
    ("2017-12-30 16:57:39.6540","2017-12-30 16:57:39.6540","C","A"),
    ("2017-12-30 16:57:39.6540","2017-12-30 17:57:39.6540","B","F"),
    ("2017-12-31 16:57:39.6540","2017-12-31 16:57:39.6540","C","A"),
    ("2017-12-31 16:57:39.6540","2017-12-31 17:57:39.6540","A","C"),
    ("2017-12-31 16:57:39.6540","2017-12-31 17:57:39.6540","B","C"),
    ("2017-12-31 16:57:39.6540","2017-12-31 17:57:39.6540","A","B"),
  ]

schema = StructType([ \
    StructField("start",StringType(),True), \
    StructField("end",StringType(),True), \
    StructField("start_loc",StringType(),True), \
    StructField("end_loc", StringType(), True)
  ])
 
df2 = spark.createDataFrame(data=data2,schema=schema)
df2 = df2.withColumn("start_timestamp",to_timestamp("start"))
df2 = df2.withColumn("end_timestamp",to_timestamp("end"))
df2 = df2.drop("start", "end")
df2.printSchema()
df2.show(truncate=False)

df2_agg = df2.withColumn("date", date_format('start_timestamp', 'D'))\
.groupBy('date', 'start_loc','end_loc').agg(
    collect_list(struct(col('start_loc'), col('end_loc'))).alias("n_routes_sets"),
)
df2_agg.show()

The result looks like this:

but the result should look like this:

date n_routes
365 3
364 2
363 1

The following line is wrong:

collect_list(struct(col('start_loc'), col('end_loc'))).alias("n_routes_sets"),


Answer 1:

Modify your lines as below, re-ordering a,b and b,a to a,b and vice versa:

from pyspark.sql.functions import date_format, countDistinct, collect_set, struct, col, when, size

...
...
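# Normalize each pair: sl2 gets the alphabetically smaller location, el2 the larger one,
# so that (A,B) and (B,A) end up as the same (sl2, el2) combination.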
df2 = df2.withColumn("sl2", when(df2['end_loc'] < df2['start_loc'],  df2['end_loc']).otherwise(df2['start_loc']) )
df2 = df2.withColumn("el2", when(df2['end_loc'] > df2['start_loc'],  df2['end_loc']).otherwise(df2['start_loc']) )
df2 = df2.drop("start_loc", "end_loc")

df2.printSchema()
df2.show(truncate=False)

df2_agg = df2.withColumn("date", date_format('start_timestamp', 'D'))\
.groupBy('date').agg(collect_set(struct(col('sl2'), col('el2'))).alias("n_routes_sets"),
) 

df2_agg.select("date", size("n_routes_sets")).show()

Returns:

+----+-------------------+
|date|size(n_routes_sets)|
+----+-------------------+
| 363|                  1|
| 364|                  2|
| 365|                  3|
+----+-------------------+
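
A possible shortcut (a sketch on my part, not taken from the original answer): since countDistinct is already imported, the distinct normalized pairs can be counted directly, instead of collecting a set and taking its size afterwards:

from pyspark.sql.functions import countDistinct, date_format

# Count the distinct normalized (sl2, el2) pairs per day directly.
df2_alt = df2.withColumn("date", date_format('start_timestamp', 'D'))\
    .groupBy('date')\
    .agg(countDistinct('sl2', 'el2').alias("n_routes"))
df2_alt.show()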

Comments:

That's right, I may have missed some imports because it is an MRE, but this is not the answer to my question.
I tried it in your original Databricks notebook and it failed; when I added them, it worked.
Added the output.
Modified the answer.
For this particular MRE this is a very good answer, but on my own task I cannot do this kind of data preprocessing before the agg. If there were a way to omit the preprocessing and include filtering the data within the agg, it would be a better answer.
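
A sketch of what the last comment asks for (my own assumption, not part of the accepted answer): the pair normalization can be folded into the aggregation itself with least/greatest, so the start_loc/end_loc columns from the question need no preprocessing:

from pyspark.sql.functions import countDistinct, date_format, greatest, least

# Order each (start_loc, end_loc) pair alphabetically on the fly, so that
# A->B and B->A collapse into the same (least, greatest) combination,
# then count the distinct combinations per day of the year.
df2_agg = df2.withColumn("date", date_format('start_timestamp', 'D'))\
    .groupBy('date')\
    .agg(countDistinct(least('start_loc', 'end_loc'),
                       greatest('start_loc', 'end_loc')).alias("n_routes"))
df2_agg.show()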
