Introduction and Usage of Storm Stream Groupings

Posted by mrrightzhao



I. Overview of stream groupings

 Stream groupings come in the following kinds: shuffle grouping, fields grouping, all grouping, global grouping, none grouping, direct grouping, local-or-shuffle grouping, and custom grouping.

II. The grouping types

  1. Shuffle grouping: tuples are distributed randomly from the spout (or upstream bolt) across the bolt's tasks, in such a way that each task receives an approximately equal share of the tuples.

  2. Fields grouping: the stream is partitioned by the specified fields. For example, if the stream is grouped by a "user-id" field, tuples with the same "user-id" always go to the same task, while tuples with different "user-id"s may go to different tasks.

  3. All grouping: the tuple is replicated to all tasks of the bolt. Use this grouping with care.

  4. Global grouping: when a series of bolts transforms the data and a final bolt must consolidate the upstream results, global grouping sends the entire stream to a single task; to be precise, to the task with the lowest task ID.

  5. Direct grouping: a special kind of grouping in which the producer of a tuple decides which task of the consumer receives it (the reverse of global grouping, where the receiving task is fixed by the framework).

  6. Local or shuffle grouping: suppose the upstream component is a spout or bolt and the downstream component is a bolt. With a plain shuffle or fields grouping, an upstream task may send tuples to a downstream task in a different worker process; since workers are separate JVM processes, that traffic goes over the network. When the upstream and downstream tasks happen to live in the same JVM, the network hop is unnecessary, so local-or-shuffle grouping prefers in-process downstream tasks and falls back to an ordinary shuffle otherwise. This reduces network overhead and improves throughput.

  7. None grouping: you do not care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings; eventually, however, Storm will execute bolts with none groupings on the same thread as the bolt or spout they subscribe to (when possible).

  8. Custom grouping: you supply your own routing logic; in most cases this is not needed.

III. Implementations of each grouping

1. Shuffle grouping

=========================================  Topology  ===============================================
public class ShuffleGroupingTopology {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingTopology.class);

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("ShuffleGroupingSpout", new ShuffleGroupingSpout(), 1);
        builder.setBolt("ShuffleGroupingBolt", new ShuffleGroupingBolt(), 2).shuffleGrouping("ShuffleGroupingSpout");
        builder.setBolt("ShuffleGroupingBolt2", new ShuffleGroupingBolt2(), 2).shuffleGrouping("ShuffleGroupingBolt");

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("ShuffleGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "ShuffleGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
=====================================   Spout  =========================================
public class ShuffleGroupingSpout extends BaseRichSpout {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingSpout.class);

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private AtomicInteger ai;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.ai = new AtomicInteger();
        log.warn("ShuffleGroupingSpout ------> open:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        int i = this.ai.getAndIncrement();
        if (i < 10) {
            log.warn("ShuffleGroupingSpout ------> nextTuple:hashcode:{} ---->taskId:{} ----->value:{}", this.hashCode(), context.getThisTaskId(), i);
            collector.emit(new Values(i));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }

    @Override
    public void close() {
        log.warn("ShuffleGroupingSpout ------> close:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}

======================================  Bolt  ============================================
public class ShuffleGroupingBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("ShuffleGroupingBolt ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("i");
        collector.emit(new Values(i * 10));
        log.warn("ShuffleGroupingBolt ------> execute:hashcode:{} ---->taskId:{} ---->value:{}", this.hashCode(), context.getThisTaskId(), i);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }

    @Override
    public void cleanup() {
        log.warn("ShuffleGroupingBolt ------> cleanup:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}

======================================  Bolt2  ============================================
public class ShuffleGroupingBolt2 extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt2.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("ShuffleGroupingBolt2 ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("result");
        log.warn("ShuffleGroupingBolt2 ------> execute:hashcode:{} ---->taskId:{} ---->result:{}", this.hashCode(), context.getThisTaskId(), i);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }

    @Override
    public void cleanup() {
        log.warn("ShuffleGroupingBolt2 ------> cleanup:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}

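Independent of Storm, the even spread that shuffle grouping promises can be illustrated with a small round-robin sketch. `ShuffleGroupingSketch` and `nextTask` are hypothetical names for illustration only; Storm's real implementation is internal and not shown here.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ShuffleGroupingSketch {

    private final List<Integer> tasks;
    private final AtomicInteger cursor = new AtomicInteger();

    ShuffleGroupingSketch(List<Integer> tasks) {
        this.tasks = tasks;
    }

    // Round-robin: consecutive tuples go to consecutive tasks,
    // so every task ends up with an (almost) equal share.
    int nextTask() {
        return tasks.get(Math.floorMod(cursor.getAndIncrement(), tasks.size()));
    }

    public static void main(String[] args) {
        // Pretend the downstream bolt has two tasks with IDs 3 and 4.
        ShuffleGroupingSketch g = new ShuffleGroupingSketch(List.of(3, 4));
        for (int i = 0; i < 4; i++) {
            System.out.println("tuple " + i + " -> task " + g.nextTask());
        }
        // tasks 3 and 4 each receive 2 of the 4 tuples
    }
}
```

Over any window of n × taskCount tuples, every task receives exactly n of them, which is the "roughly equal" distribution described above.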
2. Fields grouping

//======================================  Topology ============================================
public class FieldsShuffleTopology {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleTopology.class);

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("FieldsShuffleSpout", new FieldsShuffleSpout(), 1);
        builder.setBolt("FieldsShuffleUpperBolt", new FieldsShuffleUpperBolt(), 2).shuffleGrouping("FieldsShuffleSpout");
        builder.setBolt("FieldsShuffleFinalBolt", new FieldsShuffleFinalBolt(), 2).fieldsGrouping("FieldsShuffleUpperBolt", new Fields("upperName"));

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("FieldsShuffleTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "FieldsShuffleTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

======================================  Spout  ============================================

public class FieldsShuffleSpout extends BaseRichSpout {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleSpout.class);

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private List<String> list;
    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.index = 0;
        this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word");
        log.warn("FieldsShuffleSpout open:hashcode:{} taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        if (index < list.size()) {
            String s = list.get(index++);
            log.warn("FieldsShuffleSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(),
                    context.getThisTaskId(), s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }

    @Override
    public void close() {
        log.warn("FieldsShuffleSpout close:hashcode:{} taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}


======================================  Bolt ==========================================
public class FieldsShuffleUpperBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleUpperBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("FieldsShuffleUpperBolt ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("i");
        collector.emit(new Values(s.toUpperCase()));
        log.warn("FieldsShuffleUpperBolt ------> execute:hashcode:{} ---->taskId:{} ---->String:{}", this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upperName"));
    }

    @Override
    public void cleanup() {
        log.warn("FieldsShuffleUpperBolt ------> cleanup:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}


======================================  Bolt2 ==========================================
public class FieldsShuffleFinalBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleFinalBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("FieldsShuffleFinalBolt ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("upperName");
        log.warn("FieldsShuffleFinalBolt ------> execute:hashcode:{} ---->taskId:{} ---->String:{}", this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }

    @Override
    public void cleanup() {
        log.warn("FieldsShuffleFinalBolt ------> cleanup:hashcode:{} ---->taskId:{}", this.hashCode(), context.getThisTaskId());
    }
}

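The routing rule that fields grouping relies on, equal field values always reaching the same task, can be sketched outside Storm as a hash-modulo computation. `FieldsGroupingSketch` and `taskFor` are hypothetical names for illustration; Storm's actual hashing differs in detail, but the principle is the same.

```java
import java.util.List;
import java.util.Objects;

public class FieldsGroupingSketch {

    // Map a grouping key to one of numTasks task indexes.
    static int taskFor(Object key, int numTasks) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(Objects.hashCode(key), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 2;
        for (String userId : List.of("alice", "bob", "alice")) {
            System.out.println(userId + " -> task " + taskFor(userId, numTasks));
        }
        // "alice" maps to the same task index both times
    }
}
```

Because the task index is a pure function of the field values, every tuple with the same "user-id" deterministically lands on the same task, which is exactly what the FieldsShuffleFinalBolt example above depends on.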
3. All grouping

  The tuples are replicated to all tasks of the downstream bolt. Use this grouping with care.

public class AllGroupTopology {

    private static final Logger log = LoggerFactory.getLogger(AllGroupTopology.class);

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("AllGrouppingSpout", new AllGrouppingSpout(), 1);
        builder.setBolt("AllBolt1", new AllBolt1(), 1).shuffleGrouping("AllGrouppingSpout");
        builder.setBolt("AllBolt2", new AllBolt2(), 1).shuffleGrouping("AllGrouppingSpout");

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("AllGroupTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "AllGroupTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

======================================  AllGrouppingSpout ==========================================
public class AllGrouppingSpout extends BaseRichSpout {

    private static final Logger log = LoggerFactory.getLogger(AllGrouppingSpout.class);

    private SpoutOutputCollector collector;
    private TopologyContext context;
    private List<String> list;
    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.index = 0;
        this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word");
        log.warn("AllGrouppingSpout open:hashcode:{} taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        if (index < list.size()) {
            String s = list.get(index++);
            log.warn("AllGrouppingSpout nextTuple:hashcode:{} taskId:{} value:{}", this.hashCode(),
                    context.getThisTaskId(), s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name"));
    }
}

======================================  AllBolt1  ==========================================
public class AllBolt1 extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(AllBolt1.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("AllBolt1 ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("name");
        // terminal bolt: no output fields are declared, so nothing is emitted here
        log.warn("AllBolt1 execute:hashcode:{} ---->taskId:{} ---->String:{}", this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}

======================================  AllBolt2  ==========================================

public class AllBolt2 extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(AllBolt2.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("AllBolt2 ------> prepare:hashcode:{} ----> taskId:{}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("name");
        // terminal bolt: no output fields are declared, so nothing is emitted here
        log.warn("AllBolt2 execute:hashcode:{} ---->taskId:{} ---->String:{}", this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}

// The logs show that both bolts received all 8 tuples from the spout. This happens even with shuffleGrouping, because each bolt component subscribes to the spout's stream independently, so every subscribing component receives a full copy of the stream; a grouping only decides how those tuples are divided among the tasks within one component. To observe allGrouping itself (replication to every task of a single bolt), declare one bolt with parallelism greater than 1 and subscribe with .allGrouping("AllGrouppingSpout") instead of .shuffleGrouping(...).

 

 

4. Global grouping

  When data flows through a series of bolts and the final bolt must consolidate everything the upstream bolts produced, use global grouping: the entire stream goes to a single task of the bolt, specifically the task with the lowest task ID.

  Requirement for the example: distribute the numbers randomly across the NumberDoubleBolt tasks, then consolidate them in the final bolt.

public class GlobalGroupingTopology {

    private final static Logger log = LoggerFactory.getLogger(GlobalGroupingTopology.class);

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(), 1);
        builder.setBolt("NumberDoubleBolt", new NumberDoubleBolt(), 2).shuffleGrouping("NumberGenerateSpout");
        builder.setBolt("NumberPrintBolt", new NumberPrintBolt(), 2).globalGrouping("NumberDoubleBolt");

        Config config = new Config();
        config.setNumWorkers(4);
        try {
            StormSubmitter.submitTopology("GlobalGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "GlobalGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ==============================  spout =======================================
public class NumberGenerateSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private AtomicInteger counter;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.counter = new AtomicInteger(0);
    }

    @Override
    public void nextTuple() {
        while (counter.get() < 10) {
            collector.emit(new Values(counter.getAndIncrement()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }
}

// =========================== bolt ====================================
public class NumberDoubleBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer value = input.getIntegerByField("i");
        collector.emit(new Values(value * 2, 10));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i", "constant"));
    }
}

// ========================== bolt ====================================
public class NumberPrintBolt extends BaseBasicBolt {

    private final static Logger logger = LoggerFactory.getLogger(NumberPrintBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        logger.warn("============== prepare TaskID:{}", context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("i");
        Integer constant = input.getIntegerByField("constant");
        logger.warn("taskID:{},instanceID:{},i:{},constant:{}", context.getThisTaskId(), this, i, constant);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}

 

5. Direct grouping

  A special grouping in which the producer of a tuple decides which task of the consumer receives it, in contrast to global grouping, where the stream always converges on the lowest task ID. Streams consumed with a direct grouping must be declared as direct streams, and tuples must be emitted with emitDirect.

public class DirectGroupingTopology {

    private static final Logger log = LoggerFactory.getLogger(DirectGroupingTopology.class);

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(), 1);
        builder.setBolt("NumberDoubleBolt", new NumberDoubleBolt(), 2).directGrouping("NumberGenerateSpout");

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("DirectGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "DirectGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


// ============================= spout ===============================
public class NumberGenerateSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private AtomicInteger counter;

    private int destTaskID;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        counter = new AtomicInteger(0);
        // pick the highest task ID among the NumberDoubleBolt tasks as the target
        List<Integer> tasks = context.getComponentTasks("NumberDoubleBolt");
        destTaskID = tasks.stream().mapToInt(Integer::intValue).max().getAsInt();
    }

    @Override
    public void nextTuple() {
        while (counter.get() < 10) {
            collector.emitDirect(destTaskID, new Values(counter.getAndIncrement()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare a direct stream (first argument "true") so emitDirect is allowed
        declarer.declare(true, new Fields("i"));
    }
}
// =============================== bolt ==============================================
public class NumberDoubleBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(NumberDoubleBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn(" =================== taskId:{}", context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer value = input.getIntegerByField("i");
        log.warn("taskID:{},instanceID:{},value:{}", context.getThisTaskId(), this, value);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}

 

 

6. Local or shuffle grouping

 

public class LocalLogTopology {

    private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("LogSpout", new LogSpout(), 1);
        builder.setBolt("LogParseBolt", new LogParseBolt(), 1).localOrShuffleGrouping("LogSpout");
        // LogPrintBolt consumes the "category"/"item" fields emitted by LogParseBolt,
        // so it must subscribe to LogParseBolt (not directly to LogSpout)
        builder.setBolt("LogPrintBolt", new LogPrintBolt(), 2).localOrShuffleGrouping("LogParseBolt");

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalLogTopology", config, builder.createTopology());

            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "LocalLogTopology");
            log.warn("==================================================");

            TimeUnit.SECONDS.sleep(120);

            cluster.killTopology("LocalLogTopology");
            cluster.shutdown();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// =============================== spout ==============================
public class LogSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private List<String> list;

    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.index = 0;
        this.list = Arrays.asList("JAVA,COLLECTION", "JAVA,IO", "JAVA,THREAD", "JAVA,LAMBDA", "BIG_DATA,STORM",
                "BIG_DATA,KAFKA", "BIG_DATA,HADOOP", "BIG_DATA,FLUME", "BIG_DATA,KAFKA", "C,c");
    }

    @Override
    public void nextTuple() {
        while (index < list.size()) {
            collector.emit(new Values(list.get(index++)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("entity"));
    }
}

//============================== bolt =====================================
public class LogParseBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String entity = input.getStringByField("entity");
        // Guava's Splitter turns "CATEGORY,ITEM" into its two parts
        List<String> list = Splitter.on(",").splitToList(entity);

        collector.emit(new Values(list.get(0), list.get(1)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("category", "item"));
    }
}



//============================= bolt ===========================================
public class LogPrintBolt extends BaseBasicBolt {

    private static final Logger LOG = LoggerFactory.getLogger(LogPrintBolt.class);

    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        LOG.info("==========================================");
        LOG.info("prepare taskID:{}", context.getThisTaskId());
        LOG.info("==========================================");
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String category = input.getStringByField("category");
        String item = input.getStringByField("item");
        LOG.info("==========================================");
        LOG.info("execute:category:{},item:{},taskID:{}", category, item, context.getThisTaskId());
        LOG.info("==========================================");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}

 

 

7. None grouping

 

  You do not care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings; eventually, however, Storm will execute bolts with none groupings on the same thread as the bolt or spout they subscribe to (when possible).

 

 

8. Custom grouping

  You can implement CustomStreamGrouping to control routing yourself. The example below always sends tuples to the downstream task with the highest task ID.

public class LocalLogTopology {

    private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("LogSpout", new LogSpout(), 1);
        builder.setBolt("LogParseBolt", new LogParseBolt(), 1).localOrShuffleGrouping("LogSpout");
        builder.setBolt("LogPrintBolt", new LogPrintBolt(), 2).customGrouping("LogParseBolt", new HighTaskIDGrouping());

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalLogTopology", config, builder.createTopology());

            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "LocalLogTopology");
            log.warn("==================================================");

            TimeUnit.SECONDS.sleep(120);

            cluster.killTopology("LocalLogTopology");
            cluster.shutdown();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// =============================== spout ==============================
public class LogSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private List<String> list;

    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.index = 0;
        this.list = Arrays.asList("JAVA,COLLECTION", "JAVA,IO", "JAVA,THREAD", "JAVA,LAMBDA", "BIG_DATA,STORM",
                "BIG_DATA,KAFKA", "BIG_DATA,HADOOP", "BIG_DATA,FLUME", "BIG_DATA,KAFKA", "C,c");
    }

    @Override
    public void nextTuple() {
        while (index < list.size()) {
            collector.emit(new Values(list.get(index++)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("entity"));
    }
}

//============================== bolt =====================================
public class LogParseBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String entity = input.getStringByField("entity");
        List<String> list = Splitter.on(",").splitToList(entity);

        collector.emit(new Values(list.get(0), list.get(1)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("category", "item"));
    }
}



//============================= custom grouping ===========================================

public class HighTaskIDGrouping implements CustomStreamGrouping {

    private int taskID;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        // targetTasks: all task IDs of the downstream component
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);        // ascending order
        this.taskID = tasks.get(tasks.size() - 1);  // highest task ID
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // always route to the highest task ID
        return Arrays.asList(taskID);
    }
}

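The selection logic inside HighTaskIDGrouping can be exercised without a running cluster. The sketch below mirrors what prepare computes; `HighTaskIDGroupingDemo` and `pickHighestTask` are hypothetical helper names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HighTaskIDGroupingDemo {

    // Mirrors HighTaskIDGrouping.prepare: remember the highest downstream task ID.
    static int pickHighestTask(List<Integer> targetTasks) {
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);                 // ascending order
        return tasks.get(tasks.size() - 1);      // last element = highest ID
    }

    public static void main(String[] args) {
        // Suppose the downstream bolt has tasks 4, 7 and 5; every tuple goes to task 7.
        System.out.println(pickHighestTask(List.of(4, 7, 5)));   // prints 7
    }
}
```

Since chooseTasks always returns this one ID, every tuple of the stream converges on a single task, making this grouping the mirror image of global grouping (which converges on the lowest task ID).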
 
