Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)
Posted
技术标签:
【中文标题】Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)【英文标题】:Pyspark: Aggregate data by de checking if value exist or not (not count or sum)Pyspark:通过检查值是否存在来聚合数据(不是计数或总和) 【发布时间】:2018-09-07 06:07:28 【问题描述】:我有一个这样的数据集,
test = spark.createDataFrame([
(0, 1, 5, "2018-06-03", "Region A"),
(1, 1, 2, "2018-06-04", "Region B"),
(2, 2, 1, "2018-06-03", "Region B"),
(4, 1, 1, "2018-06-05", "Region C"),
(5, 3, 2, "2018-06-03", "Region D"),
(6, 1, 2, "2018-06-03", "Region A"),
(7, 4, 4, "2018-06-03", "Region A"),
(8, 4, 4, "2018-06-03", "Region B"),
(9, 5, 4, "2018-06-03", "Region A"),
(10, 5, 4, "2018-06-03", "Region B"),
])\
.toDF("orderid", "customerid", "price", "transactiondate", "location")
test.show()
我可以像这样汇总每个地区每个客户的订单:
temp_result = test.groupBy("customerid").pivot("location").agg(count("orderid")).na.fill(0)
temp_result.show()
现在,不是sum
或count
,我想简单地通过确定值是否存在(即0 或1)来聚合数据,类似这样
我可以通过
得到上述结果for field in temp_result.schema.fields:
if str(field.name) not in ['customerid', "overall_count", "overall_amount"]:
name = str(field.name)
temp_result = temp_result.withColumn(name, \
when(col(name) >= 1, 1).otherwise(0))
但是有没有更简单的获取方式?
【问题讨论】:
【参考方案1】:您基本上已经完成了 - 只需稍作调整即可获得您想要的结果。在您的聚合中,添加计数比较并将布尔值转换为整数(如果需要):
temp_result = test.groupBy("customerid")\
.pivot("location")\
.agg((count("orderid")>0).cast("integer"))\
.na.fill(0)
temp_result.show()
结果成:
+----------+--------+--------+--------+--------+
|customerid|Region A|Region B|Region C|Region D|
+----------+--------+--------+--------+--------+
| 5| 1| 1| 0| 0|
| 1| 1| 1| 1| 0|
| 3| 0| 0| 0| 1|
| 2| 0| 1| 0| 0|
| 4| 1| 1| 0| 0|
+----------+--------+--------+--------+--------+
如果您遇到火花错误,您可以改用此解决方案,通过额外的步骤进行计数比较:
temp_result = test.groupBy("customerId", "location")\
.agg(count("orderid").alias("count"))\
.withColumn("count", (col("count")>0).cast("integer"))\
.groupby("customerId")\
.pivot("location")\
.agg(sum("count")).na.fill(0)
temp_result.show()
【讨论】:
0, 1 很重要,因为稍后我需要将其转换为矩阵并执行乘法。u"Aggregate expression required for pivot, found 'cast((count(orderid#469L) > cast(0 as bigint)) as int)';"
,这就是我得到的,我错过了什么吗?
@cqcn 你有什么spark版本?
它在数据块上,2.3.1
@cqcn1991 奇怪,我的版本相同,所以这应该不是问题。我在回答中提供了一个不同的解决方案,它应该对你有用,但不幸的是它更复杂。以上是关于Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)的主要内容,如果未能解决你的问题,请参考以下文章
与使用 Pyspark 的另一个表相比,检查数据框中是不是存在重复项 [重复]
如何检查 Pyspark Dataframe 中是不是存在列表交集
如何快速检查 PySpark Dataframe 中是不是存在行?