PySpark DataFrame的逐行聚合
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark DataFrame的逐行聚合相关的知识,希望对你有一定的参考价值。
我有一个Pyspark DataFrame,我想使用一个逐行操作的函数进行聚合。
我有4列,对于A列中的每个唯一值,我必须在B,C,D列中进行逐行聚合
我正在使用这种方法:
- 在A中获取唯一值
A_uniques = df.select('A').distinct()
def func(x): y = df.filter(df.A==x) y = np.array(y.toPandas()) for i in y.shape[0]: y[i,1] = y[i-1,0] y[i,0] = (y[i,0]+y[i,2])/y[i,3] agg = sum(y[:,1]) return agg
A_uniques.rdd.map(lambda x: (x['A'], func(x['A'])))
我收到此错误:
PicklingError:无法序列化对象:Py4JError:调用o64.getnewargs时发生错误。跟踪:py4j.Py4JException:方法getnewargs([])在py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)的py4j.Gateway上不存在。在py4j.commands.CallCommand.exe执行(CallCommand.java:79)py4j.GatewayConnection.run(GatewayConnection.java:214)的py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)上调用(Gateway.java:272) )在java.lang.Thread.run(Thread.java:748)
有没有在RDD中保存numpy数组的解决方案?或者我可以用其他方式完成整个操作吗?
在Pyspark中,使用groupBy()(在我的情况下,我用2个cols分组)函数来获取GroupedDataFrame并管道agg()函数。见下面的例子......
sqlContext.sql("select * from retail_db.orders").groupBy("order_status", "order_date").agg({"order_customer_id": "sum", "order_id": "count"}).show()
+---------------+--------------------+----------------------+---------------+
| order_status| order_date|sum(order_customer_id)|count(order_id)|
+---------------+--------------------+----------------------+---------------+
|PENDING_PAYMENT|2013-07-28 00:00:...| 237876| 37|
| COMPLETE|2013-08-22 00:00:...| 415843| 64|
|PENDING_PAYMENT|2013-10-20 00:00:...| 168223| 28|
|SUSPECTED_FRAUD|2013-11-22 00:00:...| 36354| 6|
|PENDING_PAYMENT|2013-12-19 00:00:...| 131972| 22|
|PENDING_PAYMENT|2014-03-12 00:00:...| 352832| 52|
| ON_HOLD|2014-03-28 00:00:...| 74970| 13|
|SUSPECTED_FRAUD|2014-04-14 00:00:...| 18145| 2|
| PENDING|2014-04-21 00:00:...| 174419| 26|
| CLOSED|2014-06-04 00:00:...| 66677| 10|
|PENDING_PAYMENT|2014-06-26 00:00:...| 249542| 45|
|PENDING_PAYMENT|2013-08-17 00:00:...| 405980| 56|
| CLOSED|2013-09-10 00:00:...| 164670| 23|
|SUSPECTED_FRAUD|2013-09-19 00:00:...| 26613| 4|
| PENDING|2013-09-26 00:00:...| 176547| 28|
| COMPLETE|2013-10-20 00:00:...| 314462| 54|
| CANCELED|2013-10-31 00:00:...| 36881| 6|
| PROCESSING|2013-11-09 00:00:...| 149164| 23|
| PAYMENT_REVIEW|2013-11-29 00:00:...| 17368| 3|
|SUSPECTED_FRAUD|2013-12-11 00:00:...| 45085| 7|
+---------------+--------------------+----------------------+---------------+
only showing top 20 rows
您还可以对GroupedDataFrame使用grouped_Series_Owner = x_gb["Owner"].apply(list)
.apply()函数,in this example我将聚合数据转换为列表并使用它们。
以上是关于PySpark DataFrame的逐行聚合的主要内容,如果未能解决你的问题,请参考以下文章
具有聚合唯一值的pyspark dataframe groupby [重复]
pyspark - 使用 RDD 进行聚合比 DataFrame 快得多
在 PySpark Dataframe 中结合旋转和分组聚合
C 语言文件操作 ( 配置文件读写 | 读取配置文件 | 函数接口形参 | 读取配置文件的逐行遍历操作 | 读取一行文本 | 查找字符 | 删除字符串前后空格 )