PySpark DataFrame的逐行聚合

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark DataFrame的逐行聚合相关的知识,希望对你有一定的参考价值。

我有一个Pyspark DataFrame,我想使用一个逐行操作的函数进行聚合。

我有4列,对于A列中的每个唯一值,我必须在B,C,D列中进行逐行聚合

我正在使用这种方法:

  1. 在A中获取唯一值 A_uniques = df.select('A').distinct()
  2. def func(x): y = df.filter(df.A==x) y = np.array(y.toPandas()) for i in y.shape[0]: y[i,1] = y[i-1,0] y[i,0] = (y[i,0]+y[i,2])/y[i,3] agg = sum(y[:,1]) return agg
  3. A_uniques.rdd.map(lambda x: (x['A'], func(x['A'])))

我收到此错误:

PicklingError:无法序列化对象:Py4JError:调用o64.getnewargs时发生错误。跟踪:py4j.Py4JException:方法getnewargs([])在py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)的py4j.Gateway上不存在。在py4j.commands.CallCommand.exe执行(CallCommand.java:79)py4j.GatewayConnection.run(GatewayConnection.java:214)的py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)上调用(Gateway.java:272) )在java.lang.Thread.run(Thread.java:748)

有没有在RDD中保存numpy数组的解决方案?或者我可以用其他方式完成整个操作吗?

答案

在Pyspark中,使用groupBy()(在我的情况下,我用2个cols分组)函数来获取GroupedDataFrame并管道agg()函数。见下面的例子......

sqlContext.sql("select * from retail_db.orders").groupBy("order_status", "order_date").agg({"order_customer_id": "sum", "order_id": "count"}).show()

+---------------+--------------------+----------------------+---------------+
|   order_status|          order_date|sum(order_customer_id)|count(order_id)|
+---------------+--------------------+----------------------+---------------+
|PENDING_PAYMENT|2013-07-28 00:00:...|                237876|             37|
|       COMPLETE|2013-08-22 00:00:...|                415843|             64|
|PENDING_PAYMENT|2013-10-20 00:00:...|                168223|             28|
|SUSPECTED_FRAUD|2013-11-22 00:00:...|                 36354|              6|
|PENDING_PAYMENT|2013-12-19 00:00:...|                131972|             22|
|PENDING_PAYMENT|2014-03-12 00:00:...|                352832|             52|
|        ON_HOLD|2014-03-28 00:00:...|                 74970|             13|
|SUSPECTED_FRAUD|2014-04-14 00:00:...|                 18145|              2|
|        PENDING|2014-04-21 00:00:...|                174419|             26|
|         CLOSED|2014-06-04 00:00:...|                 66677|             10|
|PENDING_PAYMENT|2014-06-26 00:00:...|                249542|             45|
|PENDING_PAYMENT|2013-08-17 00:00:...|                405980|             56|
|         CLOSED|2013-09-10 00:00:...|                164670|             23|
|SUSPECTED_FRAUD|2013-09-19 00:00:...|                 26613|              4|
|        PENDING|2013-09-26 00:00:...|                176547|             28|
|       COMPLETE|2013-10-20 00:00:...|                314462|             54|
|       CANCELED|2013-10-31 00:00:...|                 36881|              6|
|     PROCESSING|2013-11-09 00:00:...|                149164|             23|
| PAYMENT_REVIEW|2013-11-29 00:00:...|                 17368|              3|
|SUSPECTED_FRAUD|2013-12-11 00:00:...|                 45085|              7|
+---------------+--------------------+----------------------+---------------+
only showing top 20 rows

您还可以对GroupedDataFrame使用grouped_Series_Owner = x_gb["Owner"].apply(list) .apply()函数,in this example我将聚合数据转换为列表并使用它们。

以上是关于PySpark DataFrame的逐行聚合的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中对 DataFrame 进行逐行操作

具有聚合唯一值的pyspark dataframe groupby [重复]

pyspark - 使用 RDD 进行聚合比 DataFrame 快得多

pySpark Dataframe 上聚合的多个标准

在 PySpark Dataframe 中结合旋转和分组聚合

C 语言文件操作 ( 配置文件读写 | 读取配置文件 | 函数接口形参 | 读取配置文件的逐行遍历操作 | 读取一行文本 | 查找字符 | 删除字符串前后空格 )