如果 RDD 变大,Spark 将如何反应?
Posted
技术标签:
【中文标题】如果 RDD 变大,Spark 将如何反应?【英文标题】:How will Spark react if an RDD gets bigger? 【发布时间】:2019-03-11 03:15:08 【问题描述】:我们有代码在 Apache Spark 中运行。在对代码进行详细检查后,我确定我们的一个映射器正在修改 RDD 中的对象,而不是为输出制作对象的副本。也就是说,我们有一个 dicts 的 RDD,map 函数正在向字典中添加内容,而不是返回新字典。
RDD 应该是不可变的。我们的正在变异。
我们也有内存错误。
问题:如果 RDD 的大小突然增加,Spark 会不会感到困惑?
【问题讨论】:
你能显示代码吗? 添加/删除/修改元素是Spark RDD上很常见的操作,很容易改变RDD的大小,找不到Spark应该混淆的原因。 @allthenutsandbolts,代码非常复杂。我可以尝试提出一个最小的例子,但我们的最小例子不会导致 spark 崩溃。 @JiayiLiao,RDD 是不可变的。您可以使用地图将元素添加到 RDD。但是地图不应该修改它操作的RDD。例如,在 foreach() 文档中,它说“在 foreach() 之外修改累加器以外的变量可能会导致未定义的行为。” @vy32 你的意思是你已经开发了一个可变的RDD类,可以直接修改而不需要创建新的? 【参考方案1】:虽然它可能不会崩溃,但它可能会导致一些未指定的行为。比如这个sn-p
val rdd = sc.parallelize(
val m = new mutable.HashMap[Int, Int]
m.put(1, 2)
m
:: Nil)
rdd.cache() // comment out to change behaviour!
rdd.map(m =>
m.put(2, 3)
m
).collect().foreach(println) // "Map(2 -> 3, 1 -> 2)"
rdd.collect().foreach(println) // Either "Map(1 -> 2)" or "Map(2 -> 3, 1 -> 2)" depending if caching is used
行为会根据 RDD 是否被缓存而改变。在 Spark API 中有一堆函数可以改变数据,文档中明确指出了这一点,例如 https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/PairRDDFunctions.html#aggregateByKey-U-scala.Function2-scala.Function2-scala.reflect.ClassTag-
考虑使用 RDD[(K, V)]
的映射条目而不是映射,即 RDD[Map[K, V]]。这将允许使用flatMap
或mapPartitions
以标准方式添加新条目。如果需要,最终可以通过分组等方式生成地图表示。
【讨论】:
【参考方案2】:好的,我开发了一些代码来测试如果 RDD 中引用的对象被映射器突变会发生什么,我很高兴地报告如果您使用 Python 编程,这是不可能的。
这是我的测试程序:
from pyspark.sql import SparkSession
import time
COUNT = 5
def funnydir(i):
"""Return a directory for i"""
return "i":i,
"gen":0
def funnymap(d):
"""Take a directory and perform a funnymap"""
d['gen'] = d.get('gen',0) + 1
d['id' ] = id(d)
return d
if __name__=="__main__":
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
dfroot = sc.parallelize(range(COUNT)).map(funnydir)
dfroot.persist()
df1 = dfroot.map(funnymap)
df2 = df1.map(funnymap)
df3 = df2.map(funnymap)
df4 = df3.map(funnymap)
print("===========================================")
print("*** df1:",df1.collect())
print("*** df2:",df2.collect())
print("*** df3:",df3.collect())
print("*** df4:",df4.collect())
print("===========================================")
ef1 = dfroot.map(funnymap)
ef2 = ef1.map(funnymap)
ef3 = ef2.map(funnymap)
ef4 = ef3.map(funnymap)
print("*** ef1:",ef1.collect())
print("*** ef2:",ef2.collect())
print("*** ef3:",ef3.collect())
print("*** ef4:",ef4.collect())
如果您运行此程序,您会看到字典 d
的 ID 在每个数据帧中都不同。显然,当对象从映射器传递到映射器时,Spark 正在序列化反序列化对象。所以每个都有自己的版本。
如果这不是真的,那么第一次调用 funnymap
以创建 df1 也会更改 dfroot
数据帧中的代数,因此 ef4 将具有与 df4 不同的代数。
【讨论】:
以上是关于如果 RDD 变大,Spark 将如何反应?的主要内容,如果未能解决你的问题,请参考以下文章
如果存储在键中的值匹配,如何在 Spark 中合并两个 RDD