Druid DeterminePartitionsJob Source Code Analysis
Posted by 巧克力黒
Function
Before a hadoop_index task imports data into Druid and forms segments, a preliminary job runs statistics over the raw data to produce the concrete configuration of how the segment data will be distributed. When tuningConfig.partitionsSpec.type is "single_dim" (or "dimension" in older versions), this preliminary statistics job goes through the DeterminePartitionsJob flow.
The main flow of DeterminePartitionsJob consists of three parts:
1. The determine_partitions_groupby task. Depending on the assumeGrouped setting, the job decides whether to launch determine_partitions_groupby; assumeGrouped indicates whether the input data is already grouped. This group-by job runs a MapReduce task that counts how many rows remain in the raw data after aggregating by (timestamp, dimensions). The result corresponds to the final number of rows that will end up in the Druid segments.
It mainly involves the DeterminePartitionsGroupByMapper and DeterminePartitionsGroupByReducer classes.
2. The determine_partitions_dimselection task. Using either the output of determine_partitions_groupby or the raw data, it selects the dimension to partition on and, based on that dimension's concrete values, splits the data into multiple value ranges.
It mainly involves DeterminePartitionsDimSelectionPostGroupByMapper, DeterminePartitionsDimSelectionAssumeGroupedMapper, and DeterminePartitionsDimSelectionReducer. Most of the important logic lives in DeterminePartitionsDimSelectionReducer, which after a series of steps decides which dimension to use as the partition dimension and splits the data into multiple ranges by that dimension's values.
3. Load the partition information from the previous step and build a List<HadoopyShardSpec>. Downstream tasks use this List<HadoopyShardSpec> to generate the concrete job (IndexGeneratorJob) that imports the data into Druid segments.
Logical processing flow diagram:
Step 1: the determine_partitions_groupby task
Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear in the final segment.
The MapReduce job of the determine_partitions_groupby phase aggregates the raw data by the interval's bucketed start time together with all of its dimension values, which is effectively a GROUP BY over (timestamp, dimensions).
This MR phase is not always present in a hadoop_index task: the determine_partitions_groupby job is only launched when assumeGrouped is false.
Mapper logic:
The mapper formats the current record as <interval start timestamp, current inputRow> and emits it as the mapper output key:
protected void innerMap(
    InputRow inputRow,
    Context context
) throws IOException, InterruptedException
{
  // Build the group key: (bucketed timestamp, all dimension values) of the current row.
  final List<Object> groupKey = Rows.toGroupKey(
      rollupGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
      inputRow
  );
  // Emit the serialized group key as the mapper output key; the value carries no information.
  context.write(
      new BytesWritable(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey)),
      NullWritable.get()
  );
  context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1);
}
Reducer logic:
protected void reduce(
    BytesWritable key,
    Iterable<NullWritable> values,
    Context context
) throws IOException, InterruptedException
{
  // Each distinct group key is written out exactly once, which de-duplicates the input.
  context.write(key, NullWritable.get());
}
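Taken together, the mapper and reducer act like a distinct count over (bucketed timestamp, dimension values). The standalone sketch below illustrates that effect; Row, bucketStart, and countGroupedRows are hypothetical stand-ins for this article, not Druid classes, and hourly bucketing is only an assumption in place of the configured rollup granularity.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GroupByEffectSketch
{
  // Hypothetical stand-in for an input row: an event timestamp plus its dimension values.
  public static class Row
  {
    final long timestampMillis;
    final Map<String, String> dims;

    Row(long timestampMillis, Map<String, String> dims)
    {
      this.timestampMillis = timestampMillis;
      this.dims = dims;
    }
  }

  // Hypothetical bucketing: truncate the timestamp to the start of its hour
  // (Druid uses the configured rollup granularity instead).
  static long bucketStart(long timestampMillis)
  {
    return timestampMillis - (timestampMillis % 3_600_000L);
  }

  // The mapper emits (bucketed timestamp, dims) as the key and the reducer keeps each distinct
  // key once, so the job's output size equals the number of distinct group keys.
  static int countGroupedRows(List<Row> rows)
  {
    final Set<List<Object>> groupKeys = new HashSet<>();
    for (Row row : rows) {
      final List<Object> key = new ArrayList<>();
      key.add(bucketStart(row.timestampMillis));
      key.add(row.dims);
      groupKeys.add(key);
    }
    return groupKeys.size();
  }
}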
Input and output of the determine_partitions_groupby phase:
Step 2: the determine_partitions_dimselection task
The "Read grouped data and determine appropriate partitions" phase.
If assumeGrouped is false, the determine_partitions_dimselection MapReduce job takes the output of step 1 as its input and reads it with DeterminePartitionsDimSelectionPostGroupByMapper.
If assumeGrouped is true, the raw data is assumed to already be grouped, so the raw data itself is used as the input and read with DeterminePartitionsDimSelectionAssumeGroupedMapper.
Whichever mapper is used, the goal is the same: compute DimValueCount records, which record the row count for each (dimension, dimension value) pair. The structure is stored as:
private DimValueCount(String dim, String value, long numRows)
{
  this.dim = dim;
  this.value = value;
  this.numRows = numRows;
}
The core of the mapper logic is the call helper.emitDimValueCounts(context, DateTimes.utc(inputRow.getTimestampFromEpoch()), dims). For every row of a group key, emitDimValueCounts() writes new DimValueCount("", "", 1) to count the total number of rows, and then also counts rows per dimension for that group key. The counting logic is annotated in the code:
void emitDimValueCounts(
    TaskInputOutputContext<?, ?, BytesWritable, Text> context,
    DateTime timestamp,
    Map<String, Iterable<String>> dims
) throws IOException, InterruptedException
{
  final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);
  if (!maybeInterval.isPresent()) {
    throw new ISE("No bucket found for timestamp: %s", timestamp);
  }

  final Interval interval = maybeInterval.get();
  final int intervalIndex = intervalIndexes.get(interval.getStartMillis());

  final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
  buf.putInt(intervalIndex);
  buf.putLong(interval.getStartMillis());
  final byte[] groupKey = buf.array();

  // Emit row-counter value. (counts the total number of rows)
  write(context, groupKey, new DimValueCount("", "", 1));

  for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
    final String dim = dimAndValues.getKey();

    if (partitionDimension == null || partitionDimension.equals(dim)) {
      final Iterable<String> dimValues = dimAndValues.getValue();

      if (Iterables.size(dimValues) == 1) {
        // Emit this value. (counts rows per dimension value)
        write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
      } else {
        // This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
        write(context, groupKey, new DimValueCount(dim, "", -1));
      }
    }
  }
}
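As a concrete illustration of the emit logic above, the sketch below (not Druid code) prints what would be emitted for a single grouped row whose dims are {host: [a.example.com], country: [US, CA]}: one row-counter record, one record for the single-valued host dimension, and a poison record for the multi-valued country dimension.

import java.util.List;
import java.util.Map;

public class EmitDimValueCountsSketch
{
  public static void main(String[] args)
  {
    final Map<String, List<String>> dims = Map.of(
        "host", List.of("a.example.com"),
        "country", List.of("US", "CA") // multi-valued: unsuitable for single_dim partitioning
    );

    // Row-counter record: one per grouped row, used to compute the interval's total row count.
    System.out.println("DimValueCount(dim=\"\", value=\"\", numRows=1)");

    for (Map.Entry<String, List<String>> e : dims.entrySet()) {
      if (e.getValue().size() == 1) {
        // Single-valued dimension: count one row for this (dim, value) pair.
        System.out.printf("DimValueCount(dim=\"%s\", value=\"%s\", numRows=1)%n",
                          e.getKey(), e.getValue().get(0));
      } else {
        // Multi-valued dimension: poison it with a negative count so it is never chosen.
        System.out.printf("DimValueCount(dim=\"%s\", value=\"\", numRows=-1)%n", e.getKey());
      }
    }
  }
}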
Let's look at the data logic of the determine_partitions_dimselection phase from the perspective of DeterminePartitionsDimSelectionReducer's input and output.
DeterminePartitionsDimSelectionReducer logic:
After the mapper stage of the determine_partitions_dimselection job finishes, DeterminePartitionsDimSelectionReducer receives all of the DimValueCount records.
1. Split the data range by rule
Collect the candidate partitionings for every dimension. Using the mapper's per-dimension counts together with the targetRowsPerSegment and maxRowsPerSegment parameters, the reducer assembles, for each dimension, its candidate start/end boundary values.
In the code (around line 642), the while (iterator.hasNext()) loop walks a dimension's values in order and, based on the accumulated row counts, carves the dimension into ranges [null, end), [start, end), ..., [start, null):
First, the initial range has start = null and an end value.
Then, whenever currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize(), the current range is closed and the next [start, end) range begins.
Finally, the last range has a start value and end = null.
This produces candidate shard specs like the following:
Adding possible shard with 9 rows and 1 unique values: SingleDimensionShardSpec{dimension='country', start='US', end='null', partitionNum=1, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='null', end='c.example.com', partitionNum=0, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='c.example.com', end='e.example.com', partitionNum=1, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='e.example.com', end='g.example.com', partitionNum=2, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='g.example.com', end='i.example.com', partitionNum=3, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='i.example.com', end='null', partitionNum=4, numCorePartitions=-1}
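A minimal sketch of this range-splitting idea, assuming we already have a sorted map of (dimension value, row count) for one dimension and a hypothetical targetPartitionSize; the real reducer also tracks unique value counts and has additional handling (for example around the final partition) that this sketch omits.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSplitSketch
{
  // One candidate range over the partition dimension; null start/end mean unbounded.
  public static class Range
  {
    final String start;
    final String end;
    final long rows;

    Range(String start, String end, long rows)
    {
      this.start = start;
      this.end = end;
      this.rows = rows;
    }

    @Override
    public String toString()
    {
      return "[" + start + ", " + end + ") rows=" + rows;
    }
  }

  // Greedy split: walk the dimension values in sorted order and close the current range
  // once adding the next value would exceed targetPartitionSize.
  static List<Range> split(Map<String, Long> sortedValueCounts, long targetPartitionSize)
  {
    final List<Range> ranges = new ArrayList<>();
    String currentStart = null; // the first range is unbounded on the left
    long currentRows = 0;

    for (Map.Entry<String, Long> e : sortedValueCounts.entrySet()) {
      if (currentRows > 0 && currentRows + e.getValue() > targetPartitionSize) {
        ranges.add(new Range(currentStart, e.getKey(), currentRows));
        currentStart = e.getKey();
        currentRows = 0;
      }
      currentRows += e.getValue();
    }
    // The last range is unbounded on the right.
    ranges.add(new Range(currentStart, null, currentRows));
    return ranges;
  }

  public static void main(String[] args)
  {
    final Map<String, Long> hostCounts = new LinkedHashMap<>();
    hostCounts.put("a.example.com", 1L);
    hostCounts.put("b.example.com", 1L);
    hostCounts.put("c.example.com", 1L);
    hostCounts.put("d.example.com", 1L);
    hostCounts.put("e.example.com", 1L);
    // Prints [null, c.example.com), [c.example.com, e.example.com), [e.example.com, null).
    split(hostCounts, 2).forEach(System.out::println);
  }
}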
2. The "Choose best dimension" phase.
Select the best dimension. Based on the candidate boundaries computed for each dimension in the previous step, decide which dimension to use as the partition dimension:
for (final DimPartitions dimPartitions : dimPartitionss.values()) {
  if (dimPartitions.getRows() != totalRows) {
    log.info(
        "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
        dimPartitions.dim,
        dimPartitions.getRows(),
        totalRows
    );
    continue;
  }

  // Make sure none of these shards are oversized
  boolean oversized = false;
  final SingleDimensionPartitionsSpec partitionsSpec =
      (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
  for (final DimPartition partition : dimPartitions.partitions) {
    if (partition.rows > partitionsSpec.getMaxRowsPerSegment()) {
      log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
      oversized = true;
    }
  }

  if (oversized) {
    continue;
  }

  final int cardinality = dimPartitions.getCardinality();
  final long distance = dimPartitions.getDistanceSquaredFromTarget(config.getTargetPartitionSize());

  if (cardinality > maxCardinality) {
    maxCardinality = cardinality;
    maxCardinalityPartitions = dimPartitions;
  }

  if (distance < minDistance) {
    minDistance = distance;
    minDistancePartitions = dimPartitions;
  }
}
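The loop above filters out any dimension that is missing from some rows or would produce an oversized shard, then tracks two candidates: the dimension with the highest cardinality and the dimension whose shard sizes are closest to the target. As an illustration of the "distance squared from target" idea, a plausible helper might look like the following (a hypothetical sketch, not the actual Druid implementation of getDistanceSquaredFromTarget):

public class DistanceSketch
{
  // Hypothetical: sum of squared differences between each shard's row count and the target.
  // A smaller value means the partitioning is closer to the desired rows-per-segment.
  static long distanceSquaredFromTarget(long[] shardRowCounts, long targetRowsPerSegment)
  {
    long distance = 0;
    for (long rows : shardRowCounts) {
      final long diff = rows - targetRowsPerSegment;
      distance += diff * diff;
    }
    return distance;
  }
}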
Finally, with assumeGrouped=false, targetRowsPerSegment=null, maxRowsPerSegment=2, and interval=2014-10-20T00:00:00Z/P1D, the reducer produces the following output:
[
{"type":"single","dimension":"host","start":null,"end":"c.example.com","partitionNum":0,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"c.example.com","end":"e.example.com","partitionNum":1,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"e.example.com","end":"g.example.com","partitionNum":2,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"g.example.com","end":"i.example.com","partitionNum":3,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"i.example.com","end":null,"partitionNum":4,"numCorePartitions":-1}
]
Step 3: load the partitions and build List<HadoopyShardSpec>
Load partitions determined by the previous job.
Just like in DetermineHashedPartitionsJob, this phase exists here as well: it converts the output of the previous steps into a List<HadoopyShardSpec>.
That List<HadoopyShardSpec> then guides IndexGeneratorJob as it imports the Hadoop data into Druid and forms segments.
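A rough sketch of this wrapping step, under the assumption that the per-interval shard specs from step 2 have already been deserialized; ShardSpecStub and HadoopyShardSpecStub are stand-ins for Druid's ShardSpec and HadoopyShardSpec, and the real job reads the reducer output files and stores the result in the indexer config rather than returning a map.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LoadPartitionsSketch
{
  // Stand-in for a single-dimension shard spec produced by step 2.
  public static class ShardSpecStub
  {
    final String dimension;
    final String start;
    final String end;

    ShardSpecStub(String dimension, String start, String end)
    {
      this.dimension = dimension;
      this.start = start;
      this.end = end;
    }
  }

  // Stand-in for HadoopyShardSpec: the actual shard spec plus a global shard number.
  public static class HadoopyShardSpecStub
  {
    final ShardSpecStub actualSpec;
    final int shardNum;

    HadoopyShardSpecStub(ShardSpecStub actualSpec, int shardNum)
    {
      this.actualSpec = actualSpec;
      this.shardNum = shardNum;
    }
  }

  // Wrap each interval's shard specs, assigning an increasing shard number across all intervals.
  static Map<Long, List<HadoopyShardSpecStub>> toHadoopyShardSpecs(
      Map<Long, List<ShardSpecStub>> specsPerIntervalStart
  )
  {
    final Map<Long, List<HadoopyShardSpecStub>> result = new TreeMap<>();
    int shardCount = 0;
    for (Map.Entry<Long, List<ShardSpecStub>> entry : specsPerIntervalStart.entrySet()) {
      final List<HadoopyShardSpecStub> wrapped = new ArrayList<>();
      for (ShardSpecStub spec : entry.getValue()) {
        wrapped.add(new HadoopyShardSpecStub(spec, shardCount++));
      }
      result.put(entry.getKey(), wrapped);
    }
    return result;
  }
}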
Data processing logic diagram
Open questions:
1. Why is there a determine_partitions_dimselection phase at all? In the single_dim case, can't the partition dimension simply be specified?
2. What exactly are the logic and rules for choosing the best dimension?
It seems to prefer the dimension with the larger data cardinality.
3. How exactly should the detailed logic of the "Choose best dimension" phase be explained?
For the tuningConfig.partitionsSpec.type="hashed" logic, see: https://blog.51cto.com/u_10120275/3530686