Spark 累加器实验

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 累加器实验相关的知识,希望对你有一定的参考价值。

以下代码用 Pyspark + IPython 完成


统计日志空行的数量:

读取日志,创建RDD:

myrdd = sc.textFile("access.log")


  1. 不使用累加器:

In [68]: s = 0
In [69]: def f(x):
    ...:     global s
    ...:     if len(x) == 0:
    ...:         s += 1
    ...:
In [70]: myrdd.foreach(f)
In [71]: print (s)

得出结果为:

0

原因是python 的变量,即使是全局变量不能应用在各个计算进程(线程)中同步数据,所以需要分布式计算框架的变量来同步数据,Spark 中采用累加器来解决:


  1. 使用累加器

In [64]: s = sc.accumulator(0)
In [65]: def f(x):
    ...:     global s
    ...:     if len(x) == 0:
    ...:         s += 1
    ...:
In [66]: myrdd.foreach(f)
In [67]: print (s)

得出正确结果:

14



本文出自 “恩墨大数据学院-孟硕” 博客,请务必保留此出处http://enmobda.blog.51cto.com/11983635/1940586

以上是关于Spark 累加器实验的主要内容,如果未能解决你的问题,请参考以下文章

spark源码跟踪累加器Accumulators

Spark 累加器与广播变量

Spark 系列—— 累加器与广播变量

Spark 系列—— 累加器与广播变量

Spark 累加器使用

入门大数据---Spark累加器与广播变量