殊途同归:如何用Spark来实现已有的MapReduce程序

Posted 小象

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了殊途同归:如何用Spark来实现已有的MapReduce程序相关的知识,希望对你有一定的参考价值。

MapReduce从出现以来,已经成为Apache Hadoop计算范式的扛鼎之作。它对于符合其设计的各项工作堪称完美:大规模日志处理,ETL批处理操作等。

随着Hadoop使用范围的不断扩大,人们已经清楚知道MapReduce不是所有计算的最佳框架。Hadoop 2将资源管理器YARN作为自己的顶级组件,为其他计算引擎的接入提供了可能性。如Impala等非MapReduce架构的引入,使平台具备了支持交互式SQL的能力。

今天,Apache Spark是另一种这样的替代,并且被称为是超越MapReduce的通用计算范例。也许您会好奇:MapReduce一直以来已经这么有用了,怎么能突然被取代?毕竟,还有很多ETL这样的工作需要在Hadoop上进行,即使该平台目前也已经拥有其他实时功能。

值得庆幸的是,在Spark上重新实现MapReduce一样的计算是完全可能的。它们可以被更简单的维护,而且在某些情况下更快速,这要归功于Spark优化了刷写数据到磁盘的过程。Spark重新实现MapReduce编程范式不过是回归本源。Spark模仿了Scala的函数式编程风格和API。而MapReduce的想法来自于函数式编程语言LISP。

尽管Spark的主要抽象是RDD(弹性分布式数据集),实现了Map,reduce等操作,但这些都不是Hadoop的Mapper或Reducer API的直接模拟。这些转变也往往成为开发者从Mapper和Reducer类平行迁移到Spark的绊脚石。

与Scala或Spark中经典函数语言实现的map和reduce函数相比,原有Hadoop提供的Mapper和Reducer API 更灵活也更复杂。这些区别对于习惯了MapReduce的开发者而言也许并不明显,下列行为是针对Hadoop的实现而不是MapReduce的抽象概念:
· Mapper和Reducer总是使用键值对作为输入输出。
· 每个Reducer按照Key对Value进行reduce。
· 每个Mapper和Reducer对于每组输入可能产生0个,1个或多个键值对。
· Mapper和Reducer可能产生任意的keys和values,而不局限于输入的子集和变换。
Mapper和Reducer对象的生命周期可能横跨多个map和reduce操作。它们支持setup和cleanup方法,在批量记录处理开始之前和结束之后被调用。

本文将简要展示怎样在Spark中重现以上过程,您将发现不需要逐字翻译Mapper和Reducer!


作为元组的键值对

假定我们需要计算大文本中每一行的长度,并且报告每个长度的行数。在HadoopMapReduce中,我们首先使用一个Mapper,生成为以行的长度作为key1作为value的键值对。

public class LineLengthMapper extends

Mapper<LongWritable, Text, IntWritable, IntWritable> {

@Override

protected void map(LongWritable lineNumber, Text line, Context context)

throws IOException, InterruptedException {

context.write(new IntWritable(line.getLength()), new IntWritable(1));

}

}


值得注意的是MappersReducers只对键值对进行操作。所以由TextInputFormat提供输入给LineLengthMapper,实际上也是以文本中位置为key(很少这么用,但是总是需要有东西作为Key),文本行为值的键值对。


与之对应的Spark实现:


lines.map(line => (line.length, 1))


Spark中,输入只是String构成的RDD,而不是key-value键值对。Spark中对key-value键值对的表示是一个Scala的元组,用(AB)这样的语法来创建。上面的map操作的结果是(IntInt)元组的RDD。当一个RDD包含很多元组,它获得了多个方法,如reduceByKey,这对再现MapReduce行为将是至关重要的


Reduce

reduce()reduceBykey()

统计行的长度的键值对,需要在Reducer中对每种长度作为key,计算其行数的总和作为value

public class LineLengthReducer extends

Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

@Override

protected void reduce(IntWritable length, Iterable<IntWritable> counts,

Context context) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable count : counts) {

sum += count.get();

}

context.write(length, new IntWritable(sum));

}

}


Spark中与上述MapperReducer对应的实现只要一行代码:

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)


SparkRDD API有个reduce方法,但是它会将所有key-value键值对reduce为单个value。这并不是Hadoop MapReduce的行为Spark中与之对应的是ReduceByKey


另外,ReducerReduce方法接收多值流,并产生01或多个结果。而reduceByKey,它接受的是一个将两个值转化为一个值的函数,在这里,就是把两个数字映射到它们的和的简单加法函数。此关联函数可以被调用者用来reduce多个值到一个值。与Reducer方法相比,他是一个根据KeyReduce Value的更简单而更精确的API

Mapper

map() flatMap()

现在,考虑一个统计以大写字母开头的单词的个数的算法。对于每行输入文本,Mapper可能产生0个,1个或多个键值对。

public class CountUppercaseMapper extends

Mapper<LongWritable, Text, Text, IntWritable> {

@Override

protected void map(LongWritable lineNumber, Text line, Context context)

throws IOException, InterruptedException {

for (String word : line.toString().split(" ")) {

if (Character.isUpperCase(word.charAt(0))) {

context.write(new Text(word), new IntWritable(1));

}

}

}

}


Spark对应的写法:

lines.flatMap(

_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))

)

简单的Spark map函数不适用于这种场景,因为map对于每个输入只能产生单个输出,但这个例子中一行需要产生多个输出。所以,和MapperAPI支持的相比,Sparkmap函数语义更简单,应用范围更窄。


Spark的解决方案是首先将每行映射为一组输出值,这组值可能为空值或多值。随后会通过flatMap函数被扁平化。数组中的词会被过滤并被转化为函数中的元组。这个例子中,真正模仿Mapper行为的是flatMap,而不是map

groupByKey()

写一个次数的reducer简单的,在Spark中,reduceByKey可以被用来每个单词数。比如出于某种原因要求出文件中每个单词都要大写字母和其数量,在MapReduce中,如下:

public class CountUppercaseReducer extends

Reducer<Text, IntWritable, Text, IntWritable> {

@Override

protected void reduce(Text word, Iterable<IntWritable> counts, Context context)

throws IOException, InterruptedException {

int sum = 0;

for (IntWritable count : counts) {

sum += count.get();

}

context

.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));

}

}


但是redeceByKey不能单独在Spark中工作,因为他保留了原来的key。为了在Spark中模拟,我们需要一些更像Reducer API的操作。我们知道Reducerreduce方法接受一个key和一组值,然后完成一组转换。groupByKey和一个连续的map操作能够达到这样的目标:

groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }

groupByKey只是将某一个key的所有值收集在一起,并且不提供reduce功能。以此为基础,任何转换都可以作用在key和一系列值上。此处,将key转变为大写字母,将values直接求和。


setup()cleanup()


MapReduce中,MapperReducer可以声明一个setup方法,在处理输入之前执行,来进行分配数据库连接等昂贵资源,同时可以用cleanup函数可以释放资源。

public class SetupCleanupMapper extends

Mapper<LongWritable, Text, Text, IntWritable> {

private Connection dbConnection;

@Override

protected void setup(Context context) {

dbConnection = ...;

}

...

@Override

protected void cleanup(Context context) {

dbConnection.close();

}

}


Spark中的mapflatMap方法每次只能在一个input上操作,而且没有提供在转换大批值前后执行代码的方法,看起来,似乎可以直接将setupcleanup代码放在Sparkmap函数调用之前和之后:

val dbConnection = ...

lines.map(... dbConnection.createStatement(...) ...)

dbConnection.close() // Wrong!


然而这种方法却不可行,原因在于:

·将对象dbConnection放在map函数的闭包中,这需要他是可序列化的(比如,通过java.io.Serializable实现)。而数据库连接这种对象一般不能被序列化。

· map是一种转换,而不是操作,并且拖延执行。连接对象不能被及时关闭。

· 即便如此,它也只能关闭driver上的连接,而不是释放被序列化拷贝版本分配的资源连接。


事实上,mapflatMap都不是SparkMapper的最接近的对应函数,SparkMapper的最接近的对应函数是十分重要的mapPartitions()方法,这个方法能够不仅完成单值对单值的映射,也能完成一组值对另一组值的映射,很像一个批映射(bulkmap)方法。这意味着mapPartitions()方法能够在开始时从本地分配资源,并在批映射结束时释放资源。


添加setup方法是简单的,添加cleanup会更困难,这是由于检测转换完成仍然是困难的。例如,这样是能工作的:


lines.mapPartitions { valueIterator =>

val dbConnection = ... // OK

val transformedIterator = valueIterator.map(... dbConnection ...)

dbConnection.close() // Still wrong! May not have evaluated iterator

transformedIterator

}


一个完整的范式应该看起来类似于:

lines.mapPartitions { valueIterator =>

if (valueIterator.isEmpty) {

Iterator[...]()

} else {

val dbConnection = ...

valueIterator.map { item =>

val transformedItem = ...

if (!valueIterator.hasNext) {

dbConnection.close()

}

transformedItem

}

}

}


虽然后者代码翻译注定不如前者优雅,但它确实能够完成工作。


flatMapPartitions方法并不存在,然而,可以通过调用mapPartitions,后面跟一个flatMap(a= > a)的调用达到同样效果。


带有setupcleanupReducer对应只需仿照上述代码使用groupByKey后面跟一个mapPartition函数。


别急,等一下,还有更多

MapReduce的开发者会指出,还有更多的还没有被提及的API

· MapReduce支持一种特殊类型的Reducer,也称为Combiner,可以从Mapper中减少洗牌(shuffled)数据大小。

· 它还支持同通过Partitioner实现的自定义分区,和通过分组Comparator实现的自定义分组。

· Context对象授予Counter API的访问权限以及它的累积统计。

· Reducer在其生命周期内一直能看到已排序好的key

· MapReduce有自己的Writable序列化方案。

· MapperReducer可以一次发射多组输出。

· MapReduce有几十个调优参数。


有很多方法可以在Spark中实现这些方案,使用类似AccumulatorAPI,类似groupBy和在不同的这些方法中加入partitioner参数的方法,JavaKryo序列化,缓存和更多。由于篇幅限制,在这篇文章中就不再累赘介绍了。


需要指出的是,MapReduce的概念仍然有用。只不过现在有了一个更强大的实现,并利用函数式语言,更好地匹配其功能性。理解Spark RDD API和原来的MapperReducerAPI之间的差异,可以帮助开发者更好地理解所有这些函数的工作原理,以及理解如何利用Spark发挥其优势。


以上是关于殊途同归:如何用Spark来实现已有的MapReduce程序的主要内容,如果未能解决你的问题,请参考以下文章

如何用java实现把excel表中的数据导入到mysql数据库已有的表中

如何用FL studio和已有的midi文件做音乐

如何用已有的证书文件和私钥文件生成keystore-CSDN论坛

如何用Word发布WordPress博客

如何用Java实现条件编译

如何用mapreduce分布式实现kmeans算法