Spark 广播变量,只有一个任务使用它

Posted

技术标签:

【中文标题】Spark 广播变量,只有一个任务使用它【英文标题】:Spark Broadcast Variable with only one task using it 【发布时间】:2021-08-22 14:59:09 【问题描述】:

据我了解,Spark 的工作方式如下:

对于标准变量,驱动程序将它们与 lambda(或更好的闭包)一起发送给使用它们的每个任务的执行程序。 对于广播变量,驱动程序仅将它们发送给执行程序一次,即第一次使用它们。

当我们知道广播变量只使用一次时,使用广播变量而不是标准变量有什么好处,所以即使在标准变量的情况下也只有一次传输?

示例(Java):

public class SparkDriver 
    public static void main(String[] args) 
        String inputPath = args[0];
        String outputPath = args[1];

        Map<String,String> dictionary = new HashMap<>();
        dictionary.put("J", "Java");
        dictionary.put("S", "Spark");

        SparkConf conf = new SparkConf()
                .setAppName("Try BV")
                .setMaster("local");

        try (JavaSparkContext context = new JavaSparkContext(conf)) 
            final Broadcast<Map<String,String>> dictionaryBroadcast = context.broadcast(dictionary);

            context.textFile(inputPath)
                   .map(line ->                 // just one transformation using BV
                       Map<String,String> d = dictionaryBroadcast.value();
                       String[] words = line.split(" ");
                       StringBuffer sb = new StringBuffer();
                       for (String w : words)
                           sb.append(d.get(w)).append(" ");
                       return sb.toString();
                   )
                   .saveAsTextFile(outputPath);  // just one action!
        
    

【问题讨论】:

【参考方案1】:

使用广播变量有几个优点,即使您只使用一次:

    您避免了一些序列化问题。当您序列化使用属于外部类的字段的匿名内部类时,这涉及序列化其封闭类。即使 spark 和其他框架编写了解决方法来部分缓解这个问题,尽管有时ClosureCleaner 并不能解决问题。您可以通过执行一些技巧来避免NotSerializableExceptions,例如:将类成员变量复制到闭包中,将匿名内部类转换为类并仅将所需字段放入构造函数等。 如果您使用BroadcastVariable,您甚至不会考虑这一点,该方法本身将仅序列化所需的变量。我建议阅读not-serializable-exception 问题和第一个答案以更好地加深概念。

    大多数时候,闭包的序列化性能比专门的序列化方法差。正如 spark 的官方文档所说:data-serialization

    Kryo 比 Java 序列化更快、更紧凑(通常高达 10 倍)。

    从official spark repo 搜索Spark 类我看到闭包是通过变量SparkEnv.get.closureSerializer 序列化的。该变量的唯一赋值是出现在SparkEnv 类的第306 行的变量,它使用标准且低效的JavaSerializer。 在这种情况下,如果你序列化一个大对象,你会因为网络带宽而损失一些性能。这也可以解释为什么 the official doc 声称要切换到 BroadcastVariable 来处理大于 20 KiB 的任务。

    每台机器只有一个副本,如果同一台物理机器上有多个执行器有优势。

     > Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks
    

    分布算法可能效率更高。使用BroadcastVariable 的不变性属性不难想到按照 p2p 算法而不是集中式算法进行分发。例如,想象一下,每当您完成第一个执行程序将BroadcastVariable 发送到第二个执行程序时,从驱动程序开始,但同时初始执行程序将数据发送到第三个执行程序,依此类推。图片由bitTorrent Wikipedia page提供:

我没有从 spark 深化实现,但是,正如 Broadcast 变量的文档所说:

Spark 还尝试使用高效的广播算法分发广播变量以降低通信成本。

使用BroadcastVariable 的不变性属性,可以设计出比普通的集中式算法更有效的算法。

长话短说:使用闭包或广播变量不是一回事。如果您要发送的对象很大,请使用广播变量。

【讨论】:

在 spark 的上下文中居于首位 一如既往地取决于上下文。有时甚至无法测量差异。但是为了精确可以证明差异,也有助于理解公共 api 背后的代码【参考方案2】:

请参考这篇优秀的文章:https://www.mikulskibartosz.name/broadcast-variables-and-broadcast-joins-in-apache-spark/我可以重写它,但它很好地达到了目的并回答了你的问题。

总结:

广播变量是 Apache Spark 的一项功能,可让我们发送 一个变量的只读副本到 Spark 中的每个工作节点 集群。

广播变量只有在我们想要的时候才有用:

    在 Spark 作业的多个阶段重复使用相同的变量 通过向所有工作节点(而非所有执行程序)广播的小表加速连接。

【讨论】:

以上是关于Spark 广播变量,只有一个任务使用它的主要内容,如果未能解决你的问题,请参考以下文章

Spark(20)——广播变量和累加器

Spark广播变量实现原理及基础编程

Spark入门3(累加器和广播变量)

Spark共享变量

Spark学习之路 Spark的广播变量和累加器

Spark学习之路 Spark的广播变量和累加器[转]