Spark:内存繁重的连接操作的最佳实践
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark:内存繁重的连接操作的最佳实践相关的知识,希望对你有一定的参考价值。
我有一个火花程序,涉及大型Hive表的连接操作(数百万行,数百列)。在这些连接期间使用的内存非常高。我想了解在YARN上的Spark中处理这种情况的最佳方法,即作业将成功完成而不会出现内存错误。该集群由7名工人组成,每个工作人员有110 GB的RAM和16个核心。请考虑以下scala代码:
object Model1Prep {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Modello1_Spark")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
import hc.implicits._
hc.sql("SET hive.exec.compress.output=true")
hc.sql("SET parquet.compression=SNAPPY")
hc.sql("SET spark.sql.parquet.compression.codec=snappy")
// loading tables on dataframes
var tableA = hc.read.table("TA")
var tableB = hc.read.table("TB")
var tableC = hc.read.table("TC")
var tableD = hc.read.table("TD")
// registering tables
tableA.registerTempTable("TA")
tableB.registerTempTable("TB")
tableC.registerTempTable("TC")
tableD.registerTempTable("TD")
var join1 = hc.sql("""
SELECT
[many fields]
FROM TA a
JOIN TB b ON a.field = b.field
LEFT JOIN TC c ON a.field = c.field
WHERE [conditions]
""")
var join2 = hc.sql("""
SELECT
[many fields]
FROM TA a
LEFT JOIN TD d ON a.field = d.field
WHERE [conditions]
""")
// [other operations]
sc.close()
}
}
考虑到连接操作在内存上非常昂贵,我最好的选择是什么?我知道数据帧可以在内存和磁盘上保留,可能使用序列化在内存中更紧凑,代价是反序列化的处理时间更慢(更多关于here和here)。从上面的代码中,表TA在两个连接中都使用,因此保留它是有意义的:
//[...]
// persisting
tableA.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
// registering tables
tableA.registerTempTable("TA")
tableB.registerTempTable("TB")
tableC.registerTempTable("TC")
tableD.registerTempTable("TD")
//[...]
我是否也应该以同样的方式坚持其他表格?或者还有其他事情可以使这些代码顺利运行并完成吗?
答案
如果你知道你正在加入哪个字段并且它始终是相同的字段,那么,as this SO answer suggests,对连接的表使用相同的分区器。
以上是关于Spark:内存繁重的连接操作的最佳实践的主要内容,如果未能解决你的问题,请参考以下文章