迭代器在spark中的应用

Posted BAT笔试面试

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了迭代器在spark中的应用相关的知识,希望对你有一定的参考价值。

什么是迭代器?

迭代器统一抽象了数据顺序访问的方式,而无需关心底层存放数据的容器是什么(如:数组、链表、树、图)



作用一:统一取数规范

迭代器定义了一个取数的规范,具体实现由各个容器实现,这样用户可以方便的统一取数逻辑。


迭代器最基本的两个方法:

next(): 获取下一个元素hasNext(): 是否还有元素可取



作用二:桥接数据流

就像生活中的三通或插线板,迭代器可以桥接两个数据流,使数据源源不断的消耗,尤其在大数据场景下非常有用,下面以spark mapPartitions为例演示迭代器的正确操作。

# encoding utf-8from pyspark import SparkConf, SparkContext

def bad_case(it): #将数据全都缓存在内存中,容易把内存撑爆 result = [] for item in it: result.append([x * item for x in range(10000)]) return result

def good_case(it): #只进行数据流桥接,数据源源不断被消耗 class It: def __iter__(self): return self
def __next__(self): try: item = next(it) return [x * item for x in range(10000)] except StopIteration: raise StopIteration
return It()

conf = SparkConf().setAppName('iterator-test').setMaster('local[4]')sc = SparkContext()rdd = sc.parallelize([x for x in range(10000)])
#bad case 内存会彪的很高, 数据量如果很多,内存直接撑爆result = rdd.mapPartitions(bad_case).flatMap(lambda x:x).filter(lambda x:x%3==0).sum()print(result)
#good case 内存始终维持在一个低水平result = rdd.mapPartitions(good_case).flatMap(lambda x:x).filter(lambda x:x%3==0).sum()print(result)
sc.stop()



推荐阅读



加小编微信(备注:大数据)

拉你入“大数据学习交流群”



以上是关于迭代器在spark中的应用的主要内容,如果未能解决你的问题,请参考以下文章

spark scala word2vec 和多层分类感知器在情感分析中的实际应用

Lambda 作为 Python 中的迭代器在第一次迭代时返回函数对象

使用片段着色器在特定位置绘制完美的水平线

使用迭代器在向量中插入下一个值以映射

c ++ unordered_map迭代器在单个对象上

类中的装饰器在Pycharm中抛出警告