最佳MapReduce算法,用于计算每个重叠间隔的数量
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了最佳MapReduce算法,用于计算每个重叠间隔的数量相关的知识,希望对你有一定的参考价值。
[a, b]
格式中有数十亿个区间,所有这些区间都将数字空间分成多个单个区域。我打算在这件作品中输出所有单件,重叠间隔的数量。
例如:有3个区间,即:[1,7],[2,3],[6,8]。它应该输出结果如下:
[-∞, 1]: 0
[1, 2]: 1
[2, 3]: 2
[3, 6]: 1
[6, 7]: 2
[7, 8]: 1
[8, +∞]: 0
如果对于单个机器(不是MapReduce中的分布式解决方案),我知道解决方案可以将区间实例分解为start_n
,end_n
,按数字排序并从左向右迭代并使用计数器计算金额当前的片断和输出。但我不确定如何将此算法拆分为分布式方式。
有什么建议?谢谢。
在mapreduce中,最简单的方法是将对中的每个数字写入reducer。排序洗牌阶段负责排序数量和减速器将负责修复。
例如对于输入对[1,7]
,Mapper输出将是:
key: NullWritable Value: 1
key: NullWritable Value: 7
key: NullWritable Value: 1_7
使用相同的模式,所有映射器的输出形式将是:
key: NullWritable Value: 1
key: NullWritable Value: 7
key: NullWritable Value: 1_7
key: NullWritable Value: 2
key: NullWritable Value: 3
key: NullWritable Value: 2_3
key: NullWritable Value: 6
key: NullWritable Value: 8
key: NullWritable Value: 6_8
sort-shuffle步骤将输出聚合为
Key: NullWritable ListOfValue: [1,1_7,2,2_3,3,6,6_8,7,8]
Reducer遍历值列表(这将是一个有序列表)和
- 将对值分隔为单独的列表
[1_7, 2_3, 6_8]
。您可以在文本中检查_
的出现以找出该对。 - 重新配对空格值,如下所示。
[-infinity, 1]
[1, 2]
[2, 3]
[3, 6]
[6, 7]
[7, 8]
[8, +infinity]
- 重新配对时,只需检查上面列表的边界即可找到计数。您可以使用“_”拆分对,并通过
parse
函数转换为数字。
例如-infinity(比如一个非常大的负长-9999999)超出所有对范围,因此减速器输出将是
key:
“[ - infinity,1]”(Text
Type)value: 0 (
IntWritable`类型)
同样对于对[1,2]
,1>=1 and 2<=7
所以减速器输出
key:
“[1,2]”(Text
类型)value: 1 (
IntWritable`类型)
对于[6,7]
,6>=1 and 7<=7
和6>=6 and 7<=8
这样的减速器输出
key:
“[1,2]”(Text
类型)value: 2 (
IntWritable`类型)
等等...
注意:NullWritable
是Java hadoop API
,代表null
。您可以使用任何常数数据(比如NullWritable
类型Hadoop Text
)而不是Writable
。这里的要点是确保所有映射器输出都应该由于相同的映射器键而降落到单个reducer。
下面是一个有效的Spark代码(至少在你的例子中它给出了正确的结果:
由于2种笛卡儿产品,代码效率不高。
间隔比较的条件可能需要一些注意:)
请随意改进代码,并在此处发布您的改进答案。
import org.apache.spark.{SparkConf, SparkContext}
object Main {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
val sc = new SparkContext(conf)
case class Interval(start : Double, end : Double)
def main(args: Array[String]): Unit = {
sc.setLogLevel("ERROR")
val input = List(Interval(1, 7), Interval(2, 3), Interval(6, 8))
val infinities = List(Double.NegativeInfinity, Double.PositiveInfinity)
val inputRdd = sc.parallelize(input)
val infinitiesRdd = sc.parallelize(infinities)
// Get unique flat boundary values e.g.: Interval(1, 7) will give 2 boundary values: [1, 7]
val boundaries = inputRdd.flatMap(v => List(v.start, v.end)).distinct()
// Additionally we will need negative and positive infinities
val all_boundaries = boundaries.union(infinitiesRdd)
// Calculate all intervals
val intervals = all_boundaries
// For each interval start get all possible interval ends
.cartesian(all_boundaries) // [(1, 2), (1, 3), (1, 6), (2, 1), ...]
// Filter out invalid intervals (where begin is either less or equal to the end) e.g.: from previous comment (2, 1) is invalid interval
.filter(v => v._1 < v._2) // [(1, 2), (1, 3), (1, 6), (2, 3), ...]
// Find lesser interval end e.g.: in previous comment (1, 2) -> 2 is smallest value for the same start (1)
.reduceByKey((a, b) => Math.min(a, b)) // [(1, 2) (2, 3), ...]
// Uncommend this to print intermediate result
// intervals.sortBy(_._1).collect().foreach(println)
// Get counts of overlapping intervals
val countsPerInterval = intervals
// for each small interval get all possible intput intervals e.g.:
.cartesian(inputRdd) // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
// Filter out intervals that do not overlap
.filter{ case (smallInterval, inputInterval) => inputInterval.start <= smallInterval._1 && inputInterval.end >= smallInterval._2} // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
// Since we're not interested in intervals, but only in count of intervals -> change interval to 1 for reduction
.mapValues(_ => 1) //[((1, 2), 1), ((1, 2), 1), ...]
// Calculate a sum per interval
.reduceByKey(_ + _) // [((1, 2), 2), ...]
// print result
countsPerInterval.sortBy(_._1).collect().foreach(println)
}
}
以上是关于最佳MapReduce算法,用于计算每个重叠间隔的数量的主要内容,如果未能解决你的问题,请参考以下文章