如果 RDD 变大,Spark 将如何反应?

Posted

技术标签:

【中文标题】如果 RDD 变大,Spark 将如何反应?【英文标题】:How will Spark react if an RDD gets bigger? 【发布时间】:2019-03-11 03:15:08 【问题描述】:

我们有代码在 Apache Spark 中运行。在对代码进行详细检查后,我确定我们的一个映射器正在修改 RDD 中的对象,而不是为输出制作对象的副本。也就是说,我们有一个 dicts 的 RDD,map 函数正在向字典中添加内容,而不是返回新字典。

RDD 应该是不可变的。我们的正在变异。

我们也有内存错误。

问题:如果 RDD 的大小突然增加,Spark 会不会感到困惑?

【问题讨论】:

你能显示代码吗? 添加/删除/修改元素是Spark RDD上很常见的操作,很容易改变RDD的大小,找不到Spark应该混淆的原因。 @allthenutsandbolts,代码非常复杂。我可以尝试提出一个最小的例子,但我们的最小例子不会导致 spark 崩溃。 @JiayiLiao,RDD 是不可变的。您可以使用地图将元素添加到 RDD。但是地图不应该修改它操作的RDD。例如,在 foreach() 文档中,它说“在 foreach() 之外修改累加器以外的变量可能会导致未定义的行为。” @vy32 你的意思是你已经开发了一个可变的RDD类,可以直接修改而不需要创建新的? 【参考方案1】:

虽然它可能不会崩溃,但它可能会导致一些未指定的行为。比如这个sn-p

val rdd = sc.parallelize(
    val m = new mutable.HashMap[Int, Int]
    m.put(1, 2)
    m
 :: Nil)
rdd.cache() // comment out to change behaviour!
rdd.map(m => 
    m.put(2, 3)
    m
).collect().foreach(println) // "Map(2 -> 3, 1 -> 2)"
rdd.collect().foreach(println) // Either "Map(1 -> 2)" or "Map(2 -> 3, 1 -> 2)" depending if caching is used

行为会根据 RDD 是否被缓存而改变。在 Spark API 中有一堆函数可以改变数据,文档中明确指出了这一点,例如 https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/PairRDDFunctions.html#aggregateByKey-U-scala.Function2-scala.Function2-scala.reflect.ClassTag-

考虑使用 RDD[(K, V)] 的映射条目而不是映射,即 RDD[Map[K, V]]。这将允许使用flatMapmapPartitions 以标准方式添加新条目。如果需要,最终可以通过分组等方式生成地图表示。

【讨论】:

【参考方案2】:

好的,我开发了一些代码来测试如果 RDD 中引用的对象被映射器突变会发生什么,我很高兴地报告如果您使用 Python 编程,这是不可能的。

这是我的测试程序:

from pyspark.sql import SparkSession

import time

COUNT = 5
def funnydir(i):
    """Return a directory for i"""
    return "i":i,
            "gen":0 

def funnymap(d):
    """Take a directory and perform a funnymap"""
    d['gen'] = d.get('gen',0) + 1
    d['id' ] = id(d)
    return d

if __name__=="__main__":
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    dfroot = sc.parallelize(range(COUNT)).map(funnydir)
    dfroot.persist()
    df1 = dfroot.map(funnymap)
    df2 = df1.map(funnymap)
    df3 = df2.map(funnymap)
    df4 = df3.map(funnymap)



    print("===========================================")
    print("*** df1:",df1.collect())
    print("*** df2:",df2.collect())
    print("*** df3:",df3.collect())
    print("*** df4:",df4.collect())
    print("===========================================")

    ef1 = dfroot.map(funnymap)
    ef2 = ef1.map(funnymap)
    ef3 = ef2.map(funnymap)
    ef4 = ef3.map(funnymap)
    print("*** ef1:",ef1.collect())
    print("*** ef2:",ef2.collect())
    print("*** ef3:",ef3.collect())
    print("*** ef4:",ef4.collect())

如果您运行此程序,您会看到字典 d 的 ID 在每个数据帧中都不同。显然,当对象从映射器传递到映射器时,Spark 正在序列化反序列化对象。所以每个都有自己的版本。

如果这不是真的,那么第一次调用 funnymap 以创建 df1 也会更改 dfroot 数据帧中的代数,因此 ef4 将具有与 df4 不同的代数。

【讨论】:

以上是关于如果 RDD 变大,Spark 将如何反应?的主要内容,如果未能解决你的问题,请参考以下文章

Spark:如何将具有多列的 RDD 转换为数据框

SPARK如何在内存中读取数据和管理

如果存储在键中的值匹配,如何在 Spark 中合并两个 RDD

如何将地图转换为 Spark 的 RDD

Spark 2.0:如何将元组的 RDD 转换为 DF [重复]

如何将 spark 数据帧转换为 RDD 并获取词袋