Spark 创建 numpy 数组 RDD 的最快方法

Posted

技术标签:

【中文标题】Spark 创建 numpy 数组 RDD 的最快方法【英文标题】:Spark fastest way for creating RDD of numpy arrays 【发布时间】:2015-11-19 12:45:01 【问题描述】:

我的 Spark 应用程序正在使用 RDD 的 numpy 数组。 目前,我正在从 AWS S3 读取我的数据,它表示为 一个简单的文本文件,其中每一行都是一个向量,每个元素用空格分隔,例如:

1 2 3
5.1 3.6 2.1
3 0.24 1.333

我正在使用 numpy 的函数 loadtxt() 以便从中创建一个 numpy 数组。 但是,这种方法似乎很慢,而且我的应用程序花费了太多时间(我认为)将我的数据集转换为 numpy 数组。

你能建议我一个更好的方法吗?例如,我应该将我的数据集保存为二进制文件吗? 我应该以其他方式创建 RDD 吗?

我如何创建 RDD 的一些代码:

data = sc.textFile("s3_url", initial_num_of_partitions).mapPartitions(readData)

读取数据函数:

 def readPointBatch(iterator):
     return [(np.loadtxt(iterator,dtype=np.float64)]

【问题讨论】:

速度变慢的一个可能原因是您的文件未拆分。将您的输入剪切到 s3 上的多个文件中,并使用通配符在您的 url 中匹配它们。 @PaulK. 你是什么意思不分裂?如果我的文件是 file1.txt file2.txt 等等,那么 url 将是 s3n://bucket//file*.txt ? 你真的没有足够的使用来继续这里。是什么让您认为numpy 在这里真的是个问题?如果您读取文件并仅在空格上拆分 data = sc.textFile("s3_url").map(str.split) 会明显更快吗? @zero323 numpy 是个问题,因为它的 loadtxt 函数很慢。 【参考方案1】:

使用numpy.fromstring 简单地映射如下:

import numpy as np.

path = ...
initial_num_of_partitions = ...

data = (sc.textFile(path, initial_num_of_partitions)
   .map(lambda s: np.fromstring(s, dtype=np.float64, sep=" ")))

但忽略了您的方法并没有什么特别错误的地方。据我所知,使用基本配置,读取数据大约慢两倍,比创建虚拟 numpy 数组稍慢。

所以看起来问题出在其他地方。这可能是集群配置错误、从 S3 获取数据的成本,甚至是不切实际的期望。

【讨论】:

众所周知numpy的loadtxt很慢。与此相比,pandas 库中的 read_csv() 函数非常快。不幸的是,它不能接受 python 生成器作为输入,比如 np.loadtxt()。另外,我必须使用 mapPartitions 因为我查看每个分区的整个点,而不是逐点查看。问题是我只能通过生成器访问 txt 文件。 与运行任务的成本相比,它可以忽略不计,就像我上面提到的那样,至少在基本基准测试中,它最多比简单地初始化数组慢 30%。关于分区 - SparkContext.textFile 不会创建有意义的分区,因此任何仅查看分区的逻辑都是设计错误的。 如果我可以使用 pandas read_csv(),它会快 60%。 SparkContext.textFile 没有创建有意义的分区是什么意思?我只需要它在分区之间传播文本文件。 您为每个工作人员加载了多少数据?当前和预期的加载时间是多少? 我的意思是分区的内容会根据配置、来源和其他因素而有所不同。附带说明一下,可以为每个分区创建 pandas 数据帧。【参考方案2】:

在使用 Spark 时,您不应使用 numpy。 Spark 有自己的数据处理方法,可确保您有时非常大的文件不会立即加载到内存中,从而超出内存限制。你应该用 Spark 像这样加载你的文件:

data = sc.textFile("s3_url", initial_num_of_partitions) \
    .map(lambda row: map(lambda x: float(x), row.split(' ')))

现在这将根据您的示例输出一个像这样的RDD

>>> print(data.collect())
[[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]

@edit关于文件格式和numpy用法的一些建议:

文本文件与 CSV、TSV、Parquet 或任何您喜欢的文件一样好。根据关于二进制文件加载的 Spark 文档,二进制文件不是首选:

binaryFiles路径minPartitions=None

注意:实验性

从 HDFS、本地文件系统(在所有节点上可用)或任何 Hadoop 支持的文件系统 URI 作为字节数组读取二进制文件的目录。每个文件被读取为单个记录并以键值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。

注意:小文件是首选,大文件也可以,但可能会导致性能不佳。

至于numpy 的用法,如果我是你,我肯定会尝试用原生 Spark 替换任何外部包,例如 pyspark.mlib.random 用于随机化:http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.random

【讨论】:

我的 spark 算法正在使用 numpy 矩阵乘法和随机函数。用你的方式来表示向量对我来说会很不方便。另外,我问我是否应该将我的文件保存为文本文件,或者是否应该使用其他格式以获得更好的性能。谢谢 文本文件与 CSV、TSV、Parquet 或任何您喜欢的文件一样好。二进制文件不是首选,根据关于二进制文件加载的 Spark 文档:Note: Small files are preferred, large file is also allowable, but may cause bad performance. 至于 numpy 的用法,如果我是你,我肯定会尝试用原生 Spark 替换任何外部包,例如 pyspark.mlib.random 用于随机化:spark.apache.org/docs/latest/api/python/… 我的文件不是关键值。我知道使用本机 Spark 会更好,但目前除了使用 numpy 之外,我没有看到其他选择。那么保存代表向量的GB大小的文件的最佳方法是什么?你觉得txt文件会好吗? 嗨,Nhor,对我没用。我创建了一个简单的 npy 文件: matrix = np.array([[1.0, 2.0, 3.0], [5.1, 3.6, 2.1], [3.0, 0.24, 1.333]]) np.save("/tmp/测试”,矩阵),然后上传到 S3。现在,我正在尝试使用您的代码 sn-p 读取它并得到以下错误:UnicodeEncodeError: 'decimal' codec can't encode character u'\ufffd' in position 0: invalid decimal Unicode string。知道我做错了什么吗?【参考方案3】:

在这种情况下,最好的办法是使用 pandas library for io。 请参考这个问题:pandas read_csv() and python iterator as input . 在那里您将看到如何替换 np.loadtxt() 函数,以便创建一个 numpy 数组的 RDD。

【讨论】:

以上是关于Spark 创建 numpy 数组 RDD 的最快方法的主要内容,如果未能解决你的问题,请参考以下文章

spark总结5 RDD

Pyspark - 尝试迭代 numpy 数组时出错

spark

spark的rdd详解1

将 numpy 数组的 rdd 转换为 pyspark 数据帧

如何修改 Spark 数据框中的 numpy 数组?