Mongo Spark Connector中的分区器

Posted softlin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mongo Spark Connector中的分区器相关的知识,希望对你有一定的参考价值。

??MongoSpark为入口类,调用MongoSpark.load,该方法返回一个MongoRDD类对象,Mongo Spark Connector框架本质上就是一个大号的自定义RDD,加了些自定义配置、适配几种分区器规则、Sql的数据封装等等,个人认为相对核心的也就是分区器的规则实现;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。

当前实现的分区器(Partitioner):

??MongoPaginateByCountPartitioner 基于总数的分页分区器
??MongoPaginateBySizePartitioner 基于大小的分页分区器
??MongoSamplePartitioner 基于采样的分区器
??MongoShardedPartitioner 基于分片的分区器
??MongoSinglePartitioner 单分区分区器
??MongoSplitVectorPartitioner 基于分割向量的分区器

??这里根据源码简单介绍MongoSinglePartitioner与MongoSamplePartitioner分区器,这或许就是用得最多的两种分区器,他的默认分区器(DefaultMongoPartitioner)就是MongoSamplePartitioner分区器;
该分区默认的PartitionKey为_id、默认PartitionSizeMB为64MB、默认每个分区采样为10;

MongoSamplePartitioner

??该类的核心也是唯一的方法为:partitions方法,下面为该方法的执行流程与核心逻辑;
??1、检查执行buildInfo指令检查Mongo版本用于判断是否支持随机采样聚合运算,版本大于3.2。 hasSampleAggregateOperator方法。Mongo3.2版本中才新增了数据采样功能。
??Mongodb中的语法为:

db.cName.aggregate([
  {$sample:{ size: 10 } }
])

??上示例N等于10,如果N大于collection中总数据的5%,那么$sample将会执行collection扫描、sort,然后选择top N条文档;如果N小于5%,对于wiredTiger而言则会遍历collection并使用“伪随机”的方式选取N条文档,对于MMAPv1引擎则在_id索引上随机选取N条文档。
??2、执行collStats,用于获取集合的存储信息,如行数、大小、存储大小等等信息;
??matchQuery: 过滤条件
??partitionerOptions: ReadConfig传进去的分析器选项
??partitionKey: 分区key,默认为_id
??partitionSizeInBytes: 分区大小,默认64MB
??samplesPerPartition: 每个分区默认采样数量,默认10
??count: 集合总条数
??avgObjSizeInBytes: 对象平均字节数
??numDocumentsPerPartition: 每个分区文档数, ??partitionSizeInBytes / avgObjSizeInBytes:分区大小/对象平均大小
??numberOfSamples: 采样数量,samplesPerPartition * count / numDocumentsPerPartition,每个分区采样数*集合总数/每个分区文档数

技术图片

??如每个分区文档数大于集合总文档数,则将直接创建单分区,不采取采样数据方式创建分区,因为此时数据量太少单个分区已经可以容得下无需多个分区;

一、创建单分区

??在MongoSinglePartitioner类中通过PartitionerHelper.createPartitions执行相关逻辑;
??_id作为partitionKey,

二、通过采样数据创建分区

技术图片

??指定采样条件、采样数据量、PartitionKey、排序条件等,获取采样数据;
集合拆分:

def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count – 1

右侧边界:

val rightHandBoundaries = samples.zipWithIndex.collect {
case (field, i) if collectSplit(i) => field.get(partitionKey)
}

??获取右侧边界,使用采样数据数组索引对每个分区采样数求余等于0对采样数据进行过滤取右侧边界(如匹配条件不为空则再取最后一条数据);
??如采样得到62条数据,并且没有存在匹配条件,根据上述的采样数据过滤条件最后取得7条数据,分别为数据数组索引为0、索引为10、20、30、40、50、60的7条数据,数据的值为PartitionKey默认就是集合中_id字段的值;

技术图片

创建分区(Partitions)

技术图片

??获取得到PartitionKey、rightHandBoundaries后就可以调用PartitionerHelper.createPartitions创建Partition;下面为创建Partition的具体逻辑;
??使用PartitionKey创建查询边界,每个分区具有不同的查询边界,有最大、最小边界; 此处创建分区Partition并在每个分区中指定了查询边界;
??上面获取得到了7条数据,此处将创建8个分区;下面给出了简单数据用于说明该分区边界条件的基本逻辑与实现;

??1、创建Min、1、3、5、7、9、11、13、Max的序列
??2、创建1、3、5、7、9、11、13、Max序列
??3、使用zip将两个序列拉链式的合并:合并后的数据为:
??4、Min,1、1,3、3,5、5,7、7,9、9,11、11,13、13,Max

??Partition的边界条件将使用上面的边界条件,8条数据八个Partition一个对应;
??0 Partition的边界条件为:小于1
??1 Partition的边界条件为:大于等于1 小于 3
??2 Partition的边界条件为:大于等于3 小于 5
??3 Partition的边界条件为:大于等于5 小于 7
??4 Partition的边界条件为:大于等于7小于 9
??5 Partition的边界条件为:大于等于9 小于 11
??6 Partition的边界条件为:大于等于11 小于 13
??7 Partition的边界条件为:大于等于13
??上面的8个Partition为8个MongoPartition对象,每个对象的index、查询边界与上面所说的一一对应;
??在MongoRDD类的compute方法中可以看到根据对应的分区与上面创建分区时所建立的边界条件用于计算(从Mongo中获取对应数据);

技术图片

MongoSinglePartitioner

??创建单分区分区器时,直接调用PartitionerHelper.createPartitions方法创建分区,该类并无其他逻辑,并且固定的PartitionKey为_id,右侧边界条件为空集合,然后创建id为0的MongoPartition对象,并无查询边界;





































以上是关于Mongo Spark Connector中的分区器的主要内容,如果未能解决你的问题,请参考以下文章

mongo spark 大型集合的推断模式

我可以使用副本集名称通过 mongo-connector 进行连接吗

Spark在MongoDB上的读写操作

Apache Zeppelin 中的 org/bson/conversions/Bson 错误

遇到的问题---spark---spark OutOfMemoryError: Java heap space

遇到的问题---spark---spark OutOfMemoryError: Java heap space