Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)

Posted

技术标签:

【中文标题】Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)【英文标题】:Pyspark: Aggregate data by de checking if value exist or not (not count or sum)Pyspark:通过检查值是否存在来聚合数据(不是计数或总和) 【发布时间】:2018-09-07 06:07:28 【问题描述】:

我有一个这样的数据集,

test = spark.createDataFrame([
    (0, 1, 5, "2018-06-03", "Region A"),
    (1, 1, 2, "2018-06-04", "Region B"),
    (2, 2, 1, "2018-06-03", "Region B"),
    (4, 1, 1, "2018-06-05", "Region C"),
    (5, 3, 2, "2018-06-03", "Region D"),
    (6, 1, 2, "2018-06-03", "Region A"),
    (7, 4, 4, "2018-06-03", "Region A"),
    (8, 4, 4, "2018-06-03", "Region B"),
    (9, 5, 4, "2018-06-03", "Region A"),
    (10, 5, 4, "2018-06-03", "Region B"),
])\
  .toDF("orderid", "customerid", "price", "transactiondate", "location")
test.show()

我可以像这样汇总每个地区每个客户的订单:

temp_result = test.groupBy("customerid").pivot("location").agg(count("orderid")).na.fill(0)
temp_result.show()

现在,不是sumcount,我想简单地通过确定值是否存在(即0 或1)来聚合数据,类似这样


我可以通过

得到上述结果
for field in temp_result.schema.fields:
    if str(field.name) not in ['customerid', "overall_count", "overall_amount"]:
        name = str(field.name)
        temp_result = temp_result.withColumn(name, \
                                             when(col(name) >= 1, 1).otherwise(0))

但是有没有更简单的获取方式?

【问题讨论】:

【参考方案1】:

您基本上已经完成了 - 只需稍作调整即可获得您想要的结果。在您的聚合中,添加计数比较并将布尔值转换为整数(如果需要):

temp_result = test.groupBy("customerid")\
                  .pivot("location")\
                  .agg((count("orderid")>0).cast("integer"))\
                  .na.fill(0)

temp_result.show()

结果成:

+----------+--------+--------+--------+--------+
|customerid|Region A|Region B|Region C|Region D|
+----------+--------+--------+--------+--------+
|         5|       1|       1|       0|       0|
|         1|       1|       1|       1|       0|
|         3|       0|       0|       0|       1|
|         2|       0|       1|       0|       0|
|         4|       1|       1|       0|       0|
+----------+--------+--------+--------+--------+

如果您遇到火花错误,您可以改用此解决方案,通过额外的步骤进行计数比较:

temp_result = test.groupBy("customerId", "location")\
                  .agg(count("orderid").alias("count"))\
                  .withColumn("count", (col("count")>0).cast("integer"))\
                  .groupby("customerId")\
                  .pivot("location")\
                  .agg(sum("count")).na.fill(0)

temp_result.show()

【讨论】:

0, 1 很重要,因为稍后我需要将其转换为矩阵并执行乘法。 u"Aggregate expression required for pivot, found 'cast((count(orderid#469L) > cast(0 as bigint)) as int)';",这就是我得到的,我错过了什么吗? @cqcn 你有什么spark版本? 它在数据块上,2.3.1 @cqcn1991 奇怪,我的版本相同,所以这应该不是问题。我在回答中提供了一个不同的解决方案,它应该对你有用,但不幸的是它更复杂。

以上是关于Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:检查数据框中是不是存在列[重复]

与使用 Pyspark 的另一个表相比,检查数据框中是不是存在重复项 [重复]

如何检查 Pyspark Dataframe 中是不是存在列表交集

如何快速检查 PySpark Dataframe 中是不是存在行?

在 pyspark sparksession 中检查 Hive 中是不是存在表

pyspark将列添加到列表中已经不存在的数据框