storm RollingTopWords 实时top-N计算任务窗口设计

Posted sanmutongzi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm RollingTopWords 实时top-N计算任务窗口设计相关的知识,希望对你有一定的参考价值。

转发请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/6381037.html    

    流式计算中我们经常会遇到需要将数据根据时间窗口进行批量统计的场景,窗口性质一般由两个参数规定:1 Window length: 可以用时间或者数量来定义窗口大小;2 Sliding interval: 窗口滑动的间隔 。通过这两个参数一般把window分成滚动窗口和滑动窗口。

Sliding Window(滑动窗口)

Tuples are grouped in windows and window slides every sliding interval. A tuple can belong to more than one window.

For example a time duration based sliding window with length 10 secs and sliding interval of 5 seconds.

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time

|<------- w1 -------->|
        |------------ w2 ------->|

Tumbling Window(滚动窗口)

Tuples are grouped in a single window based on time or count. Any tuple belongs to only one of the windows.

For example a time duration based tumbling window with length 5 secs.

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
   w1         w2            w3

     

     storm直到1.0.0版本后才官方加入了IWindowedBolt接口用来实现窗口计算,在此之前storm-starter里有一个稍微复杂点的RollingTopWords滑动窗口计算top N实现的demo。topology主要组件的流程设置如下:

(1)TestWordSpout负责产生单词源数据并通过fieldsGrouping发送到下游bolt

(2)RollingCountBolt负责统计Window length范围内的所有单词计数并每Sliding interval时间发送一次汇总信息到下游。

  (3)  IntermediateRankingsBolt,这是个中间bolt,主要是为了预先计算部分word的top-N排行榜出来,减少最终节点的排序工作。

(4)TotalRankingsBolt 最终top-N排序并输出计算结果。

1     String spoutId = "wordGenerator";
2     String counterId = "counter";
3     String intermediateRankerId = "intermediateRanker";
4     String totalRankerId = "finalRanker";
5     builder.setSpout(spoutId, new TestWordSpout(), 5);
6     builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
7     builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
8         "obj"));
9     builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);

 

 RollingCountBolt初始化参数正好就是上面提到的windowLengthInSeconds和emitFrequencyInSeconds,new RollingCountBolt(300, 60)表示每分钟输出一下最近五分钟内的数据统计。

counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,this.emitFrequencyInSeconds));

RollingCountBolt内部存放了一个SlidingWindowCounter的结构,SlidingWindowCounter内部存储了SlotBasedCounter,SlotBasedCounter才是具体实现了怎样进行

窗口计算,滑动的窗口不停对应到一个环形的slot列表中。SlidingWindowCounter在窗口滑动的时候采取了如下动作:

1   public Map<T, Long> getCountsThenAdvanceWindow() {
2     Map<T, Long> counts = objCounter.getCounts();
3     objCounter.wipeZeros();
4     objCounter.wipeSlot(tailSlot);
5     advanceHead();
6     return counts;
7   }

 

1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map 

2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间 
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口 
advanceHead的实现, 如何在数组实现循环的滑动窗口

  private void advanceHead() {
    headSlot = tailSlot;
    tailSlot = slotAfter(tailSlot);
  }

 

SlotBasedCounter主要用于按照窗口对应的slot进行incrementCount,getCounts和computeTotalCount,用于数据新增统计,全窗口数据提取和对应元素窗口内全部slot数据求和。

  public void incrementCount(T obj, int slot) {
    long[] counts = objToCounts.get(obj);
    if (counts == null) {
      counts = new long[this.numSlots];
      objToCounts.put(obj, counts);
    }
    counts[slot]++;
  }


  public Map<T, Long> getCounts() {
    Map<T, Long> result = new HashMap<T, Long>();
    for (T obj : objToCounts.keySet()) {
      result.put(obj, computeTotalCount(obj));
    }
    return result;
  }

  private long computeTotalCount(T obj) {
    long[] curr = objToCounts.get(obj);
    long total = 0;
    for (long l : curr) {
      total += l;
    }
    return total;
  }

 

如上所述, RollingCountBolt在没有窗口接口的情况下通过代码结构巧妙的实现了一个滑动窗口(理论上滚动窗口也一样可以实现),感觉还是很巧妙地。

 

参考资料:

Storm starter - RollingTopWords

Implementing Real-Time Trending Topics With a Distributed Rolling Count Algorithm in Storm

 

 

 

以上是关于storm RollingTopWords 实时top-N计算任务窗口设计的主要内容,如果未能解决你的问题,请参考以下文章

Storm学习:Storm简介

Storm实时处理架构

storm视频教程_Storm实现“天猫双十一”大屏实时展示项目

Storm 第一章 核心组件及编程模型

PL2121-基于Storm构建实时热力分布项目实战

大数据学习之Storm实时计算概述及安装部署33