迭代器在spark中的应用
Posted BAT笔试面试
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了迭代器在spark中的应用相关的知识,希望对你有一定的参考价值。
什么是迭代器?
迭代器统一抽象了数据顺序访问的方式,而无需关心底层存放数据的容器是什么(如:数组、链表、树、图)
作用一:统一取数规范
迭代器定义了一个取数的规范,具体实现由各个容器实现,这样用户可以方便的统一取数逻辑。
迭代器最基本的两个方法:
next(): 获取下一个元素
hasNext(): 是否还有元素可取
作用二:桥接数据流
就像生活中的三通或插线板,迭代器可以桥接两个数据流,使数据源源不断的消耗,尤其在大数据场景下非常有用,下面以spark mapPartitions为例演示迭代器的正确操作。
# encoding utf-8
from pyspark import SparkConf, SparkContext
def bad_case(it):
#将数据全都缓存在内存中,容易把内存撑爆
result = []
for item in it:
result.append([x * item for x in range(10000)])
return result
def good_case(it):
#只进行数据流桥接,数据源源不断被消耗
class It:
def __iter__(self):
return self
def __next__(self):
try:
item = next(it)
return [x * item for x in range(10000)]
except StopIteration:
raise StopIteration
return It()
conf = SparkConf().setAppName('iterator-test').setMaster('local[4]')
sc = SparkContext()
rdd = sc.parallelize([x for x in range(10000)])
#bad case 内存会彪的很高, 数据量如果很多,内存直接撑爆
result = rdd.mapPartitions(bad_case).flatMap(lambda x:x).filter(lambda x:x%3==0).sum()
print(result)
#good case 内存始终维持在一个低水平
result = rdd.mapPartitions(good_case).flatMap(lambda x:x).filter(lambda x:x%3==0).sum()
print(result)
sc.stop()
推荐阅读
加小编微信(备注:大数据)
拉你入“大数据学习交流群”
以上是关于迭代器在spark中的应用的主要内容,如果未能解决你的问题,请参考以下文章
spark scala word2vec 和多层分类感知器在情感分析中的实际应用