我对 Spark 中并行操作的理解是不是正确?

Posted

技术标签:

【中文标题】我对 Spark 中并行操作的理解是不是正确?【英文标题】:Is my understanding of parallel operations in Spark correct?我对 Spark 中并行操作的理解是否正确? 【发布时间】:2015-09-28 04:54:53 【问题描述】:

我是 Spark 的新手,正在尝试用 Python 理解 Spark 的概念。在使用 Python 为 Spark 开发应用程序时,我对以并行方式处理数据的方式有些困惑。

1。每个人都说我不需要担心在处理我封装在 RDD 变量中的数据时会涉及到哪个节点和多少个节点。因此,根据我的最佳理解,我相信 Spark 集群会对下面的代码做些什么:

a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
c = b.collect()

可以描述为以下步骤:

(1) 变量a 将保存为包含预期txt 文件内容的RDD 变量(2) 不同的RDD 块a将广播到集群中的不同节点,并对不同节点中的每个块进行过滤方法(3) 调用收集操作时,结果将从不同的节点并保存为局部变量,c

我的描述对吗?如果不是,具体的程序是什么?如果我是对的,那么并行化方法有什么意义?下面的代码是否与上面列出的相同?

a = sc.textFile(filename).collect()
b = sc.parallelize(a).filter(lambda x: len(x)>0 and x.split("\t").count("9999-12-31"))
c = b.collect()

2。对于下面的代码,SQL查询语法是否会通过将定义的表分成多个分区来并行处理?

a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
parts = b.map(lambda x: x.split("\t"))
records = parts.map(Row(r0 = str(x[0]), r1 = x[1], r2 = x[2]))
rTable = sqlContext.createDataFrame(records)
rTable.registerTempTable("rTable")
result = sqlContext.sql("select substr(r0,1,2), case when r1=1 then r1*100 else r1*10 end, r2 from rTable").collect()

【问题讨论】:

该代码对于初学者来说似乎相当不正确。 你能给我更多的提示吗?谢谢 您是否尝试过至少运行您的代码? 这些代码是从我的测试程序中粘贴的。无论如何,我只是再次测试了它们,它们都在工作。顺便说一下,这里我没有给出 sqlContext 的定义,它是通过语法 sqlContext=SQLContext(sc) 定义的。 据我所知,sc.textFile 方法会自动并行化您的数据,因此在这种情况下无需调用parallelize。当您想要并行化尚未并行化的事物时,您可以使用 parallelize 方法,例如普通的 List 或 Set。 【参考方案1】:

您的第一步描述是正确的。但第二步和第三步还有更多内容。

第二步:

根据 Spark 文档:

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

textFile 方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中的块默认为 64MB),但您也可以通过传递更大的值来请求更多的分区。

如果您将文件放入 HDFS 并将其路径作为 textFile 参数传递,则基于 HDFS 块创建 RDD a 的分区。所以在这种情况下,腭化的数量取决于 HDFS 块的数量。此外,数据已经分区并通过 HDFS 移动到集群机器。

如果您使用本地文件系统上的路径(适用于所有节点)并且未指定 minPartitions,则选择默认并行度(取决于集群中的核心数量)。在这种情况下,您必须将文件复制到每个工作人员身上或将其放入每个工作人员都可以使用的共享存储中。

在每种情况下,Spark 都避免广播任何数据,而是尝试使用每台机器中的现有块。所以你的第二步并不完全正确。

第三步

根据 Spark 文档:

collect(): 数组[T] 返回一个包含此 RDD 中所有元素的数组

在此步骤中,您的 RDD b 被洗牌/收集到您的驱动程序/节点中。

【讨论】:

感谢您的帮助!您知道 sqlContext.sql() 默认情况下是否并行进行?再次感谢!

以上是关于我对 Spark 中并行操作的理解是不是正确?的主要内容,如果未能解决你的问题,请参考以下文章

Spark 中的并行集合

Spark 是不是并行执行 UnionAll?

在 spark 上使用集群和在本地使用并行操作有啥区别?

Spark——RDD算子

golang语言并发与并行——goroutine和channel的详细理解

有没有办法在分区的 spark 数据集上并行运行操作?