Druid DeterminePartitionsJob Source Code Analysis

Posted by 巧克力黒


Functionality

Before a hadoop_index task imports data into Druid to form segments, a job first runs statistics over the raw data to produce the concrete configuration of the segment data distribution. When tuningConfig.partitionsSpec.type is "single_dim" (or "dimension" in older versions), this preliminary statistics job goes through the DeterminePartitionsJob flow.
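
For reference, a tuningConfig along the following lines routes a hadoop_index task into this flow (a minimal sketch based on the Druid Hadoop ingestion docs; the values are illustrative):

"tuningConfig" : {
  "type" : "hadoop",
  "partitionsSpec" : {
    "type" : "single_dim",
    "targetRowsPerSegment" : 5000000,
    "partitionDimension" : "host",
    "assumeGrouped" : false
  }
}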

DeterminePartitionsJob breaks down into three main stages:

1. The determine_partitions_groupby job. Whether it runs depends on the assumeGrouped setting, which declares that the input data is already grouped. This group-by stage runs an MR job that counts how many rows remain after the raw data is aggregated by (timestamp, dimensions); the result corresponds to the final row count of the segments imported into Druid.
Main classes: DeterminePartitionsGroupByMapper, DeterminePartitionsGroupByReducer.

2. The determine_partitions_dimselection job. Using either the output of determine_partitions_groupby or the raw data, it chooses the dimension to partition on and, based on that dimension's concrete values, splits the data into multiple ranges.
Main classes: DeterminePartitionsDimSelectionPostGroupByMapper, DeterminePartitionsDimSelectionAssumeGroupedMapper, DeterminePartitionsDimSelectionReducer. Most of the important logic lives in DeterminePartitionsDimSelectionReducer, which, after a series of steps, decides which dimension becomes the partition dimension and splits the data into ranges by that dimension's values.

3. Loading the partition information from the previous stage into a List<HadoopyShardSpec>. The downstream job that actually imports the data into Druid segments (IndexGeneratorJob) is generated from this List<HadoopyShardSpec>.
Logical processing flow diagram:

Step 1: the determine_partitions_groupby job

Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear in the final segment.
The MapReduce job of the determine_partitions_groupby stage combines each raw row's interval start time with all of its dimension values, which is effectively a GROUP BY over (timestamp, dimensions).
This MR stage is not always present in a hadoop_index task: the determine_partitions_groupby job is only launched when assumeGrouped is false.
Mapper logic:
Build a group key <bucketed timestamp, dimension values of the current inputRow> and write it as the Mapper's output key.

protected void innerMap(
    InputRow inputRow,
    Context context
) throws IOException, InterruptedException
{
  // Build the group key: the row's timestamp bucketed by the rollup granularity
  // together with all of the row's dimension values
  final List<Object> groupKey = Rows.toGroupKey(
      rollupGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
      inputRow
  );
  // Emit the serialized group key; the value carries no information
  context.write(
      new BytesWritable(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey)),
      NullWritable.get()
  );

  context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1);
}

Reducer logic:

protected void reduce(
    BytesWritable key,
    Iterable<NullWritable> values,
    Context context
) throws IOException, InterruptedException
{
  // Emit each distinct group key exactly once, i.e. deduplicate the Mapper output
  context.write(key, NullWritable.get());
}
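
As a rough standalone illustration (plain Java, not Druid code), the net effect of this Mapper/Reducer pair is to count the distinct (bucketed timestamp, dimension values) combinations, which is the row count the final segments would have after rollup:

import java.util.*;

public class GroupByEffectSketch
{
  public static void main(String[] args)
  {
    // (bucketed timestamp, dimension values) per input row; duplicates collapse
    List<List<Object>> rows = List.of(
        List.of(1413763200000L, Map.of("host", "a.example.com", "country", "US")),
        List.of(1413763200000L, Map.of("host", "a.example.com", "country", "US")),   // duplicate
        List.of(1413763200000L, Map.of("host", "b.example.com", "country", "US"))
    );

    // The Mapper emits one group key per row; the Reducer emits each distinct key once
    Set<List<Object>> distinctGroupKeys = new HashSet<>(rows);

    System.out.println("rows processed = " + rows.size());              // 3
    System.out.println("grouped rows   = " + distinctGroupKeys.size()); // 2
  }
}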

Input and output of the determine_partitions_groupby stage:

Step 2: the determine_partitions_dimselection job

This is the "Read grouped data and determine appropriate partitions" stage.
If assumeGrouped is false, the determine_partitions_dimselection MapReduce job takes the output of step 1 as its input and reads it with DeterminePartitionsDimSelectionPostGroupByMapper.
If assumeGrouped is true, the raw data is assumed to be already grouped, so the raw data itself is the input and is read with DeterminePartitionsDimSelectionAssumeGroupedMapper.
Whichever Mapper is used, the goal is the same: to compute DimValueCount records, one per dimension and dimension value, recording how many rows carry that value. The structure is stored as:

private DimValueCount(String dim, String value, long numRows)
{
  this.dim = dim;         // dimension name; "" for the per-group row counter
  this.value = value;     // dimension value; "" for the row counter or for a poisoned dimension
  this.numRows = numRows; // row count; -1 poisons a multi-valued dimension as unsuitable
}

The core of the Mapper is the call helper.emitDimValueCounts(context, DateTimes.utc(inputRow.getTimestampFromEpoch()), dims);. Inside emitDimValueCounts(), each row of a group key first emits new DimValueCount("", "", 1), used to count the total number of rows, and then emits a count for every dimension of that group key. The counting logic is annotated in the code below.

void emitDimValueCounts(
    TaskInputOutputContext<?, ?, BytesWritable, Text> context,
    DateTime timestamp,
    Map<String, Iterable<String>> dims
) throws IOException, InterruptedException
{
  final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);

  if (!maybeInterval.isPresent()) {
    throw new ISE("No bucket found for timestamp: %s", timestamp);
  }

  final Interval interval = maybeInterval.get();
  final int intervalIndex = intervalIndexes.get(interval.getStartMillis());

  final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
  buf.putInt(intervalIndex);
  buf.putLong(interval.getStartMillis());
  final byte[] groupKey = buf.array();

  // Emit row-counter value.
  write(context, groupKey, new DimValueCount("", "", 1));  // 统计总的数据条数

  for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
    final String dim = dimAndValues.getKey();

    if (partitionDimension == null || partitionDimension.equals(dim)) {
      final Iterable<String> dimValues = dimAndValues.getValue();

      if (Iterables.size(dimValues) == 1) {
        // Emit this value: count one row for this (dimension, value)
        write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
      } else {
        // This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
        write(context, groupKey, new DimValueCount(dim, "", -1));
      }
    }
  }
}
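
To make this concrete, here is a standalone sketch (simplified types and illustrative names, not Druid classes) of the DimValueCount triples the Mapper would emit for two grouped rows:

import java.util.List;
import java.util.Map;

public class EmitDimValueCountsSketch
{
  record DimValueCount(String dim, String value, long numRows) {}

  public static void main(String[] args)
  {
    // Two already-grouped rows within the same interval bucket
    List<Map<String, List<String>>> groupedRows = List.of(
        Map.of("host", List.of("a.example.com"), "country", List.of("US")),
        Map.of("host", List.of("b.example.com"), "country", List.of("US", "CA"))  // multi-valued
    );

    for (Map<String, List<String>> dims : groupedRows) {
      // Row counter: one per grouped row, keyed only by the interval group key
      System.out.println(new DimValueCount("", "", 1));

      for (Map.Entry<String, List<String>> e : dims.entrySet()) {
        if (e.getValue().size() == 1) {
          // Single-valued: count one row for this (dimension, value)
          System.out.println(new DimValueCount(e.getKey(), e.getValue().get(0), 1));
        } else {
          // Multi-valued: poison the dimension so it cannot be chosen for partitioning
          System.out.println(new DimValueCount(e.getKey(), "", -1));
        }
      }
    }
  }
}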

Looking at the data flow of the determine_partitions_dimselection stage from the perspective of DeterminePartitionsDimSelectionReducer's input and output:

DeterminePartitionsDimSelectionReducer logic:

After the Mapper stage of determine_partitions_dimselection finishes, DeterminePartitionsDimSelectionReducer receives all of the DimValueCount records.

1. Split the data into ranges by rule

Aggregate the candidates for every dimension. Using the Mapper's per-dimension statistics together with the targetRowsPerSegment and maxRowsPerSegment parameters, collect each dimension along with the range boundaries derived from its values.
Around line 642 of the source, the while (iterator.hasNext()) loop walks a dimension's row counts and computes the data ranges [null, end), [start, end), [start, end), ..., [start, null) for that dimension:

First, it fixes the first range of the dimension as start=null, end=<value>.
Then, whenever currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize() holds, it closes the current range and determines the start and end of the next intermediate range.
Finally, the last range of the dimension is start=<value>, end=null.

This produces candidate shards such as the following example log output (a standalone sketch of the splitting rule follows it):

Adding possible shard with 9 rows and 1 unique values: SingleDimensionShardSpec{dimension='country', start='US', end='null', partitionNum=1, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='null', end='c.example.com', partitionNum=0, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='c.example.com', end='e.example.com', partitionNum=1, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='e.example.com', end='g.example.com', partitionNum=2, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='g.example.com', end='i.example.com', partitionNum=3, numCorePartitions=-1}
Adding possible shard with 2 rows and 2 unique values: SingleDimensionShardSpec{dimension='host', start='i.example.com', end='null', partitionNum=4, numCorePartitions=-1}
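
The splitting rule can be illustrated with a standalone sketch (plain Java, illustrative names, not Druid code): walk the dimension's values in sorted order, accumulate row counts, and close the current [start, end) range whenever adding the next value would push it past the target size. The first range starts at null and the last one ends at null. With ten host values of one row each and a target of 2 rows, this reproduces the host boundaries shown above:

import java.util.*;

public class SplitRangesSketch
{
  public static void main(String[] args)
  {
    // Sorted (value -> rows) counts for one candidate dimension, e.g. "host"
    SortedMap<String, Long> valueCounts = new TreeMap<>(Map.of(
        "a.example.com", 1L, "b.example.com", 1L, "c.example.com", 1L, "d.example.com", 1L,
        "e.example.com", 1L, "f.example.com", 1L, "g.example.com", 1L, "h.example.com", 1L,
        "i.example.com", 1L, "j.example.com", 1L
    ));
    long targetRowsPerSegment = 2;

    List<String> ranges = new ArrayList<>();
    String start = null;   // the first range is open on the left
    long rowsInRange = 0;

    for (Map.Entry<String, Long> e : valueCounts.entrySet()) {
      if (rowsInRange > 0 && rowsInRange + e.getValue() > targetRowsPerSegment) {
        // Close the current range just before this value and start a new one here
        ranges.add("[" + start + ", " + e.getKey() + ") rows=" + rowsInRange);
        start = e.getKey();
        rowsInRange = 0;
      }
      rowsInRange += e.getValue();
    }
    ranges.add("[" + start + ", null) rows=" + rowsInRange);  // the last range is open on the right

    ranges.forEach(System.out::println);
    // [null, c.example.com) rows=2
    // [c.example.com, e.example.com) rows=2
    // [e.example.com, g.example.com) rows=2
    // [g.example.com, i.example.com) rows=2
    // [i.example.com, null) rows=2
  }
}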

2. The "Choose best dimension" stage.

Select the best dimension: based on the per-dimension ranges computed in step 1, decide which dimension to use as the partition dimension (a small sketch of the distance metric follows the code below).

for (final DimPartitions dimPartitions : dimPartitionss.values()) {
  if (dimPartitions.getRows() != totalRows) {
    log.info(
        "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
        dimPartitions.dim,
        dimPartitions.getRows(),
        totalRows
    );

    continue;
  }

  // Make sure none of these shards are oversized
  boolean oversized = false;
  final SingleDimensionPartitionsSpec partitionsSpec =
      (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
  for (final DimPartition partition : dimPartitions.partitions) {
    if (partition.rows > partitionsSpec.getMaxRowsPerSegment()) {
      log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
      oversized = true;
    }
  }

  if (oversized) {
    continue;
  }

  final int cardinality = dimPartitions.getCardinality();
  final long distance = dimPartitions.getDistanceSquaredFromTarget(config.getTargetPartitionSize());

  if (cardinality > maxCardinality) {
    maxCardinality = cardinality;
    maxCardinalityPartitions = dimPartitions;
  }

  if (distance < minDistance) {
    minDistance = distance;
    minDistancePartitions = dimPartitions;
  }
}
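
The getDistanceSquaredFromTarget comparison above presumably measures how far a dimension's shard sizes deviate from the target: the sum of squared differences between each shard's row count and the target (a hedged sketch of that idea, not Druid's exact implementation). With the example data, host's evenly sized shards score 0 while a skewed split scores much worse:

public class DistanceFromTargetSketch
{
  // Sum of squared differences between each shard's row count and the target size;
  // smaller means the shards are more evenly sized around the target.
  static long distanceSquaredFromTarget(long[] shardRows, long targetRows)
  {
    long distance = 0;
    for (long rows : shardRows) {
      long diff = rows - targetRows;
      distance += diff * diff;
    }
    return distance;
  }

  public static void main(String[] args)
  {
    long target = 2;
    long[] hostShards = {2, 2, 2, 2, 2};  // evenly split candidate
    long[] countryShards = {9, 1};        // skewed candidate

    System.out.println(distanceSquaredFromTarget(hostShards, target));    // 0
    System.out.println(distanceSquaredFromTarget(countryShards, target)); // 49 + 1 = 50
  }
}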

Finally, with assumeGrouped=false, targetRowsPerSegment=null, maxRowsPerSegment=2 and interval=2014-10-20T00:00:00Z/P1D, the Reducer produces the following output:

[
{"type":"single","dimension":"host","start":null,"end":"c.example.com","partitionNum":0,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"c.example.com","end":"e.example.com","partitionNum":1,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"e.example.com","end":"g.example.com","partitionNum":2,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"g.example.com","end":"i.example.com","partitionNum":3,"numCorePartitions":-1},
{"type":"single","dimension":"host","start":"i.example.com","end":null,"partitionNum":4,"numCorePartitions":-1}
]

Step 3: Load the partitions and build List<HadoopyShardSpec>

Load partitions determined by the previous job.
As in DetermineHashedPartitionsJob, this stage exists here as well: it converts the output of the previous steps into a List<HadoopyShardSpec>,
and that List<HadoopyShardSpec> then drives IndexGeneratorJob, the job that imports the Hadoop data into Druid and produces the segments.
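
A rough sketch of the resulting structure (simplified stand-in types, not Druid's actual classes): each interval's shard specs from the Reducer output are wrapped with a running shard number and collected into a per-interval map that the indexer config hands to IndexGeneratorJob:

import java.util.*;

public class LoadPartitionsSketch
{
  // Stand-in for SingleDimensionShardSpec
  record ShardRange(String dimension, String start, String end, int partitionNum) {}

  // Stand-in for HadoopyShardSpec: the actual spec plus a job-wide shard number
  record HadoopyShard(ShardRange actualSpec, int shardNum) {}

  public static void main(String[] args)
  {
    // Reducer output for the single interval 2014-10-20T00:00:00Z/P1D (see above)
    List<ShardRange> reducerOutput = List.of(
        new ShardRange("host", null, "c.example.com", 0),
        new ShardRange("host", "c.example.com", "e.example.com", 1),
        new ShardRange("host", "e.example.com", "g.example.com", 2),
        new ShardRange("host", "g.example.com", "i.example.com", 3),
        new ShardRange("host", "i.example.com", null, 4)
    );

    // interval start millis -> shard specs for that interval
    Map<Long, List<HadoopyShard>> shardSpecs = new TreeMap<>();
    long intervalStart = 1413763200000L;  // 2014-10-20T00:00:00Z
    int shardCount = 0;

    List<HadoopyShard> specs = new ArrayList<>();
    for (ShardRange range : reducerOutput) {
      specs.add(new HadoopyShard(range, shardCount++));
    }
    shardSpecs.put(intervalStart, specs);

    System.out.println(shardSpecs);  // this map drives segment generation in IndexGeneratorJob
  }
}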

Data processing logic diagram

Open questions:

1. Why is there a determine_partitions_dimselection stage at all, given that single_dim lets you specify the partition dimension directly?
2. What exactly are the logic and rules for choosing the best dimension?
   It prefers the dimension with the larger cardinality.
3. How is the detailed logic of the "Choose best dimension" stage explained?

The tuningConfig.partitionsSpec.type="hashed" flow is covered at: https://blog.51cto.com/u_10120275/3530686
