sparksql缓存表能做广播变量吗

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sparksql缓存表能做广播变量吗相关的知识,希望对你有一定的参考价值。

共享变量
通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。

广播变量
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

通过在一个变量v上调用SparkContext.broadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。举例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。

累加器
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持)
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者"+="方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
下面的代码展示了如何把一个数组中的所有元素累加到累加器上:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

尽管上面的例子使用了内置支持的累加器类型Int,但是开发人员也可以通过继承AccumulatorParam类来创建它们自己的累加器类型。AccumulatorParam接口有两个方法:
zero方法为你的类型提供一个0值。
addInPlace方法将两个值相加。
假设我们有一个代表数学vector的Vector类。我们可以向下面这样实现:

object VectorAccumulatorParam extends AccumulatorParam[Vector]
def zero(initialValue: Vector): Vector =
Vector.zeros(initialValue.size)

def addInPlace(v1: Vector, v2: Vector): Vector =
v1 += v2



// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在Scala里,Spark提供更通用的累加接口来累加数据,尽管结果的类型和累加的数据类型可能不一致(例如,通过收集在一起的元素来创建一个列表)。同时,SparkContext..accumulableCollection方法来累加通用的Scala的集合类型。

累加器仅仅在动作操作内部被更新,Spark保证每个任务在累加器上的更新操作只被执行一次,也就是说,重启任务也不会更新。在转换操作中,用户必须意识到每个任务对累加器的更新操作可能被不只一次执行,如果重新执行了任务和作业的阶段。
累加器并没有改变Spark的惰性求值模型。如果它们被RDD上的操作更新,它们的值只有当RDD因为动作操作被计算时才被更新。因此,当执行一个惰性的转换操作,比如map时,不能保证对累加器值的更新被实际执行了。下面的代码片段演示了此特性:

val accum = sc.accumulator(0)
data.map x => accum += x; f(x)
//在这里,accum的值仍然是0,因为没有动作操作引起map被实际的计算.
参考技术A 自己操作下,不就知道了

我们可以在 github 操作中缓存纱线全局变量吗

【中文标题】我们可以在 github 操作中缓存纱线全局变量吗【英文标题】:Can we cache yarn globals in github actions 【发布时间】:2020-09-10 19:34:17 【问题描述】:

我有一些全局包,例如无服务器框架、ESLint 等。我已经为 yarn 实现了 GitHub Actions 缓存。下面是我的代码。

- uses: actions/cache@v1
  id: yarn-cache # use this to check for `cache-hit` (`steps.yarn-cache.outputs.cache-hit != 'true'`)
  with:
    path: $ steps.yarn-cache-dir-path.outputs.dir 
    key: $ runner.os -yarn-$ hashFiles('**/yarn.lock') 
    restore-keys: |
      $ runner.os -yarn-

- name: Adding serverless globally
  run: yarn global add serverless

- name: Yarn Install
  if: steps.yarn-cache.outputs.cache-hit != 'true'              
  run: |
    echo "cache hit failed"
    yarn install
  env:
    CI: false

但是我的全局包没有被缓存。有没有办法缓存 Yarn 全局变量?

【问题讨论】:

为什么在“Yarn Install”步骤中将CI 环境变量设置为false @Edric 这是一个错误。我刚刚复制了我的 yarn 构建块并为 Yarn Install 重命名。 您是否关注tutorial 启用了纱线包的缓存? @smac89 是的,我做到了。纱线缓存工作正常。问题在于纱线全局缓存。并且在将一些纱线包安装为全局之后,github操作也无法调用它。所以我目前正在使用 NPM 进行全局安装。但是缓存仍然不起作用。 【参考方案1】:

我正在粘贴解决方案的构建文件,

name: global-test
on:
    push:
        branches:
            - dev
    pull_request:
        branches:
            - dev
jobs:
    aws-deployment:
        runs-on: ubuntu-latest
        steps:
            - name: CHECKOUT ACTION
              uses: actions/checkout@v2

            - name: NODE SETUP ACTION
              uses: actions/setup-node@v1
              with:
                  node-version: '12.x'

            - name: Get yarn cache directory path
              id: yarn-cache-dir-path
              run: |
                  echo "::set-output name=dir::$(yarn cache dir)"

            - name: Set yarn global bin path
              run: |
                  yarn config set prefix $(yarn cache dir)

            - name: Add yarn bin path to system path
              run: |
                  echo $(yarn global bin) >> $GITHUB_PATH

            - name: Set yarn global installation path
              run: |
                  yarn config set global-folder $(yarn cache dir)

            - name: CACHE ACTION
              uses: actions/cache@v2
              env:
                  cache-version: v1
              id: yarn-cache
              with:
                  path: |
                      $ steps.yarn-cache-dir-path.outputs.dir 
                      **/node_modules
                  key: $ runner.os -yarn-$ env.cache-version -$ hashFiles('**/yarn.lock') 
                  restore-keys: |
                      $ runner.os -yarn-$ env.cache-version -
                      $ runner.os -yarn-
                      $ runner.os -

            - name: Installing dependencies
              if: steps.yarn-cache.outputs.cache-hit != 'true'
              run: |
                  echo "YARN CACHE CHANGED"
                  yarn install

            - name: Adding serverless globally
              if: steps.yarn-cache.outputs.cache-hit != 'true'
              run: |
                  echo "NO CACHE HIT"
                  yarn global add serverless

我为这些步骤命名,以便他们可以理解。

2020-12-06 更新了答案

【讨论】:

这会抛出:The 'add-path' command is disabled. Please upgrade to using Environment Files or opt into unsecure command execution by setting the 'ACTIONS_ALLOW_UNSECURE_COMMANDS' environment variable to 'true'. For more information see: github.blog/changelog/2020-10-01-github-actions-deprecating-set-env-and-add-path-commands

以上是关于sparksql缓存表能做广播变量吗的主要内容,如果未能解决你的问题,请参考以下文章

定制Spark SQL: 一种轻量级实现方案

Flink的累加器和广播变量广播流分布式缓存

Spark SQL - 在 Spark Streams 上部署 SQL 查询的选项

在 pyspark 地图逻辑中使用 sparksql 不起作用

oracle 的存储过程中 动态的创建一张表 然后插入一个变量到这个表中,表能动态的创建但是变量不能插入进去

在 map 函数中调用 SPARK SQL