Spark 广播变量,只有一个任务使用它
Posted
技术标签:
【中文标题】Spark 广播变量,只有一个任务使用它【英文标题】:Spark Broadcast Variable with only one task using it 【发布时间】:2021-08-22 14:59:09 【问题描述】:据我了解,Spark 的工作方式如下:
对于标准变量,驱动程序将它们与 lambda(或更好的闭包)一起发送给使用它们的每个任务的执行程序。 对于广播变量,驱动程序仅将它们发送给执行程序一次,即第一次使用它们。当我们知道广播变量只使用一次时,使用广播变量而不是标准变量有什么好处,所以即使在标准变量的情况下也只有一次传输?
示例(Java):
public class SparkDriver
public static void main(String[] args)
String inputPath = args[0];
String outputPath = args[1];
Map<String,String> dictionary = new HashMap<>();
dictionary.put("J", "Java");
dictionary.put("S", "Spark");
SparkConf conf = new SparkConf()
.setAppName("Try BV")
.setMaster("local");
try (JavaSparkContext context = new JavaSparkContext(conf))
final Broadcast<Map<String,String>> dictionaryBroadcast = context.broadcast(dictionary);
context.textFile(inputPath)
.map(line -> // just one transformation using BV
Map<String,String> d = dictionaryBroadcast.value();
String[] words = line.split(" ");
StringBuffer sb = new StringBuffer();
for (String w : words)
sb.append(d.get(w)).append(" ");
return sb.toString();
)
.saveAsTextFile(outputPath); // just one action!
【问题讨论】:
【参考方案1】:使用广播变量有几个优点,即使您只使用一次:
您避免了一些序列化问题。当您序列化使用属于外部类的字段的匿名内部类时,这涉及序列化其封闭类。即使 spark 和其他框架编写了解决方法来部分缓解这个问题,尽管有时ClosureCleaner 并不能解决问题。您可以通过执行一些技巧来避免NotSerializableExceptions
,例如:将类成员变量复制到闭包中,将匿名内部类转换为类并仅将所需字段放入构造函数等。
如果您使用BroadcastVariable
,您甚至不会考虑这一点,该方法本身将仅序列化所需的变量。我建议阅读not-serializable-exception 问题和第一个答案以更好地加深概念。
大多数时候,闭包的序列化性能比专门的序列化方法差。正如 spark 的官方文档所说:data-serialization
Kryo 比 Java 序列化更快、更紧凑(通常高达 10 倍)。
从official spark repo 搜索Spark 类我看到闭包是通过变量SparkEnv.get.closureSerializer
序列化的。该变量的唯一赋值是出现在SparkEnv 类的第306 行的变量,它使用标准且低效的JavaSerializer
。
在这种情况下,如果你序列化一个大对象,你会因为网络带宽而损失一些性能。这也可以解释为什么 the official doc 声称要切换到 BroadcastVariable
来处理大于 20 KiB 的任务。
每台机器只有一个副本,如果同一台物理机器上有多个执行器有优势。
> Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks
分布算法可能效率更高。使用BroadcastVariable
的不变性属性不难想到按照 p2p 算法而不是集中式算法进行分发。例如,想象一下,每当您完成第一个执行程序将BroadcastVariable
发送到第二个执行程序时,从驱动程序开始,但同时初始执行程序将数据发送到第三个执行程序,依此类推。图片由bitTorrent Wikipedia page提供:
我没有从 spark 深化实现,但是,正如 Broadcast 变量的文档所说:
Spark 还尝试使用高效的广播算法分发广播变量以降低通信成本。
使用BroadcastVariable
的不变性属性,可以设计出比普通的集中式算法更有效的算法。
长话短说:使用闭包或广播变量不是一回事。如果您要发送的对象很大,请使用广播变量。
【讨论】:
在 spark 的上下文中居于首位 一如既往地取决于上下文。有时甚至无法测量差异。但是为了精确可以证明差异,也有助于理解公共 api 背后的代码【参考方案2】:请参考这篇优秀的文章:https://www.mikulskibartosz.name/broadcast-variables-and-broadcast-joins-in-apache-spark/我可以重写它,但它很好地达到了目的并回答了你的问题。
总结:
广播变量是 Apache Spark 的一项功能,可让我们发送 一个变量的只读副本到 Spark 中的每个工作节点 集群。
广播变量只有在我们想要的时候才有用:
在 Spark 作业的多个阶段重复使用相同的变量 通过向所有工作节点(而非所有执行程序)广播的小表加速连接。
【讨论】:
以上是关于Spark 广播变量,只有一个任务使用它的主要内容,如果未能解决你的问题,请参考以下文章