从 Pyspark Dataframe 中提取 numpy 数组

Posted

技术标签:

【中文标题】从 Pyspark Dataframe 中提取 numpy 数组【英文标题】:extracting numpy array from Pyspark Dataframe 【发布时间】:2017-06-26 05:58:57 【问题描述】:

我有一个数据框 gi_man_df,其中 group 可以是 n

+------------------+-----------------+--------+--------------+
|           group  |           number|rand_int|   rand_double|
+------------------+-----------------+--------+--------------+
|          'GI_MAN'|                7|       3|         124.2|
|          'GI_MAN'|                7|      10|        121.15|
|          'GI_MAN'|                7|      11|         129.0|
|          'GI_MAN'|                7|      12|         125.0|
|          'GI_MAN'|                7|      13|         125.0|
|          'GI_MAN'|                7|      21|         127.0|
|          'GI_MAN'|                7|      22|         126.0|
+------------------+-----------------+--------+--------------+

我期待一个 numpy nd_array 即 gi_man_array:

[[[124.2],[121.15],[129.0],[125.0],[125.0],[127.0],[126.0]]]

应用数据透视后的 rand_double 值在哪里。

我尝试了以下 2 种方法:FIRST:我将 gi_man_df 旋转如下:

gi_man_pivot = gi_man_df.groupBy("number").pivot('rand_int').sum("rand_double")

我得到的输出是:

Row(number=7, group=u'GI_MAN', 3=124.2, 10=121.15, 11=129.0, 12=125.0, 13=125.0, 21=127.0, 23=126.0)

但这里的问题是要获得所需的输出,我无法将其转换为矩阵然后再次转换为 numpy 数组。

第二次: 我使用以下方法在数据框本身中创建了向量:

assembler = VectorAssembler(inputCols=["rand_double"],outputCol="rand_double_vector")

gi_man_vector = assembler.transform(gi_man_df)
gi_man_vector.show(7)

我得到以下输出:

+----------------+-----------------+--------+--------------+--------------+
|           group|           number|rand_int|   rand_double| rand_dbl_Vect|
+----------------+-----------------+--------+--------------+--------------+
|          GI_MAN|                7|       3|         124.2|       [124.2]|
|          GI_MAN|                7|      10|        121.15|      [121.15]|
|          GI_MAN|                7|      11|         129.0|       [129.0]|
|          GI_MAN|                7|      12|         125.0|       [125.0]|
|          GI_MAN|                7|      13|         125.0|       [125.0]|
|          GI_MAN|                7|      21|         127.0|       [127.0]|
|          GI_MAN|                7|      22|         126.0|       [126.0]|
+----------------+-----------------+--------+--------------+--------------+

但这里的问题是我无法在 rand_dbl_Vect 上旋转它。

所以我的问题是: 1. 这两种方法中的任何一种都是实现所需输出的正确方法,如果是,那么我该如何进一步获得所需的结果? 2. 我还有什么其他方法可以让代码最优且性能良好?

【问题讨论】:

我不在我的 spark 控制台,但你可以使用 .toArray() 方法吗? df.select('rand_dbl').toArray()。您的号码或 rand_int 都没有表明 groupby 有任何组可以工作以需要 groupby。 但是组可以是 n 种类型,例如 GI_MAN、LI_MAN 并且其他列的相应值会相应变化,我尝试使用 pivot 进行分组,它工作正常,你说的时候能详细说明一下吗“groupby 有任何需要使用 groupby 的组”,我不太明白 示例中你的数字向量全是 7。只有一组。那么为什么需要 groupby 呢? 我的回答对你有用吗?如果是,请批准。 群组可以有 n 种类型,所以我需要一个群组 【参考方案1】:

这个

import numpy as np
np.array(gi_man_df.select('rand_double').collect())

产生

array([[ 124.2 ],
       [ 121.15],
       .........])

【讨论】:

我不能使用 collect,因为当前的数据大小是 20TB,而且每个月都会增加约 5TB。因此,使用 collect 将不是一个可行的选择,因为它需要驱动程序上的大量内存。 还有其他方法可以做到这一点吗? @UdayShankarSingh 如果不在驱动程序的内存中,您想将这个 numpy 数组保存在哪里?我问是因为我正在做类似的事情,我能想到的唯一解决方案是批量处理数据帧,这些数据帧小到足以将结果数组保存在内存中。 @seth127 我也有同样的问题,你找到了一个优雅的解决方案吗?我很乐意避免重新发明一个常见问题的解决方案。 您找到解决方案了吗?数据帧可以拆分成块并异步处理吗?

以上是关于从 Pyspark Dataframe 中提取 numpy 数组的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中检索每组DataFrame中的前n个

从 Row 创建 DataFrame 会导致“推断架构问题”

如何比较pyspark中两个不同数据帧中的两列

在 zeppelin 中使用从 %pyspark 到 %python 的 Dataframe

PySpark,DataFrame 的顶部

pyspark建立RDD以及读取文件成dataframe