Spark变量/文件共享方式

Posted BAT笔试面试

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark变量/文件共享方式相关的知识,希望对你有一定的参考价值。

分布式计算场景下不可避免的会遇到变量/文件在多个计算节点之间的共享。


共享的作用:


(1)减少网络传输:通过共享,同一个节点只需拷贝传输一份数据

(2)减少内存消耗:通过共享,同一个节点上的多个task使用同一份数据,无需拷贝多个副本


spark中包括以下几种共享方式:


1、普通变量共享


以下代码中,“share_variable” 变量会拷贝到每个计算节点上的每个task中,即:当一个Executor中同时运行了多个task时,这个变量会存在多个副本,task之间不进行共享,当数据量较大时,内存消耗较大。

# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName('share-test').setMaster('local[4]')sc = SparkContext()
# driver中执行的代码share_variable = 0.5
# executor中执行的代码def handle(x): # 通过普通共享方式,获取driver中的变量 return x * share_variable
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()


2、广播变量(Broadcast)

广播变量在每个节点 只拷贝一份,在Executor启动task之前进行反序列化,同一个Executor的所有task共享同一个广播对象。广播变量在Executor端是 只读的, 适用于对大数据集进行广播。

# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName('broadcast-test').setMaster('local[4]')sc = SparkContext()
# driver中执行的代码:定义广播变量bc_variable = sc.broadcast({'item': 'food', 'price': 5})

# executor中执行的代码def handle(x): # 获取广播变量的值 share_variable = bc_variable.value # 通过普通共享方式,获取driver中的变量 return x * share_variable['price']

sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()


3、累加器(Accumulator)


累加器是可以在多个task之间进行add操作的一种共享变量,在Executor中只允许修改,不能读取,通常用于指标的统计,可以在Spark的UI中显示。

Spark变量/文件共享方式

(图片来自https://www.jianshu.com/p/9a32123af0a6)

# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName('broadcast-test').setMaster('local[4]')sc = SparkContext()
# driver中执行的代码:定义累加器acc=sc.accumulator(0)

# executor中执行的代码def handle(x): # 累加器进行add操作 acc.add(1) return x
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()


4、文件共享


Spark文件共享方式与广播变量共享类似,每个节点只会拷贝一份数据,供该节点上所有的Executor的所有任务共同使用。
# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName('broadcast-test').setMaster('local[4]')sc = SparkContext()
# driver中执行的代码:共享文件sc.addFile("hdfs://path/to/file.txt" )

# executor中执行的代码def handle(x):
# 读取文件 share_file = SparkFiles.get('file.txt') with open(share_file,'r') as f: print(f.read())
return x
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()


5、从hdfs拉取文件

这种方式不推荐用于文件共享,因为每个task需要主动的从分布式文件系统拉取文件,即每个task存在一个副本,网络带宽消耗和内存消耗都较大。但是它有自己的应用场景,当数据按任务切分成不同文件存放在hdfs上时,每个task只需要拉取属于自己的文件。
# encoding utf-8import subprocess
from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName('broadcast-test').setMaster('local[4]')sc = SparkContext()
# executor中执行的代码def handle(x): # 从hdfs拉取数据 subprocess.call("hdfs dfs -get /path/to/file.txt ", shell=True) with open('/path/to/file.txt', 'r') as f: print(f.read())
return x
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()

推荐阅读



加小编微信(备注:大数据)

拉你入“大数据学习交流群”


以上是关于Spark变量/文件共享方式的主要内容,如果未能解决你的问题,请参考以下文章

Spark 共享变量底层实现

sparksql缓存表能做广播变量吗

2. Spark原理-RDD及共享变量

每日一题简述Spark中共享变量的基本原理与用途?

Spark闭包与序列化

08共享变量(Broadcast Variable和Accumulator)