Introduction and Usage of Storm Stream Groupings
Posted by mrrightzhao
I. Overview of stream groupings
Storm streams can be partitioned in several ways: shuffle grouping, fields grouping, all grouping, global grouping, none grouping, direct grouping, local-or-shuffle grouping, and custom grouping.
II. The groupings in detail
1. Shuffle grouping: tuples emitted by the spout (or an upstream bolt) are distributed randomly across the bolt's tasks, in such a way that each task receives a roughly equal share of the stream. There is no pattern to which task gets which tuple.
2. Fields grouping: the stream is partitioned by the specified fields. For example, when grouping by a "user-id" field, tuples with the same "user-id" always go to the same task, while tuples with different "user-id"s may go to different tasks.
3. All grouping: the stream is replicated to every task of the bolt. Use this grouping with care.
4. Global grouping: the entire stream goes to a single one of the bolt's tasks, specifically the task with the lowest task id. This is typically used when, after data has passed through a series of bolts, a final bolt must consolidate the results produced by all upstream tasks.
5. Direct grouping: a special kind of grouping that works as the mirror image of global grouping: instead of the framework fixing the target task, the producer of a tuple decides which task of the consumer receives it.
6. Local or shuffle grouping: if the target bolt has one or more tasks running in the same worker process (JVM) as the producing task, tuples are shuffled among just those in-process tasks; otherwise this behaves like an ordinary shuffle grouping. When producer and consumer tasks live in different workers, tuples have to be serialized and sent over the network, but when they share a JVM that cost is pointless, so this grouping routes locally whenever possible to cut network traffic and improve throughput.
7. None grouping: you don't care how the stream is grouped. Currently a none grouping is equivalent to a shuffle grouping; eventually, however, Storm will push bolts with none groupings to execute in the same thread as the bolt or spout they subscribe to, when possible.
8. Custom grouping: you implement the routing logic yourself. In most cases a custom grouping is unnecessary; an example is shown in section III.
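The routing rules above can be sketched in plain Java. This is not Storm's actual grouper code (that lives inside Storm's executor internals); the task ids and the hash function below are assumptions chosen only to illustrate the semantics of shuffle and fields grouping:

```java
import java.util.List;
import java.util.Objects;
import java.util.Random;

// Illustrative only: a simplified model of how a grouping picks a
// target task id from the downstream bolt's tasks.
public class GroupingSketch {
    private static final Random RANDOM = new Random();

    // Shuffle grouping: any task may be chosen; over many tuples the
    // distribution is roughly uniform across the downstream tasks.
    static int shuffleChoose(List<Integer> targetTasks) {
        return targetTasks.get(RANDOM.nextInt(targetTasks.size()));
    }

    // Fields grouping: hash the grouping fields modulo the task count,
    // so tuples with the same field values always map to the same task.
    static int fieldsChoose(List<Integer> targetTasks, Object... groupingFields) {
        int bucket = Math.floorMod(Objects.hash(groupingFields), targetTasks.size());
        return targetTasks.get(bucket);
    }

    public static void main(String[] args) {
        List<Integer> tasks = List.of(3, 4, 5); // hypothetical task ids
        // The same "user-id" always routes to the same task:
        System.out.println(fieldsChoose(tasks, "user-1") == fieldsChoose(tasks, "user-1")); // true
        // Shuffle picks some valid task:
        System.out.println(tasks.contains(shuffleChoose(tasks))); // true
    }
}
```

The point of the fields sketch is the invariant, not the hash function: deterministic per-key routing is what lets a fields-grouped bolt keep per-key state (counters, sessions) without coordination between tasks.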
III. Implementation examples
1. Shuffle grouping
// ===================================== Topology =====================================
// (org.apache.storm.* and slf4j imports are omitted throughout, as in the original post)
public class ShuffleGroupingTopology {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingTopology.class);

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("ShuffleGroupingSpout", new ShuffleGroupingSpout(), 1);
        builder.setBolt("ShuffleGroupingBolt", new ShuffleGroupingBolt(), 2)
               .shuffleGrouping("ShuffleGroupingSpout");
        builder.setBolt("ShuffleGroupingBolt2", new ShuffleGroupingBolt2(), 2)
               .shuffleGrouping("ShuffleGroupingBolt");

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("ShuffleGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "ShuffleGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ===================================== Spout =====================================
public class ShuffleGroupingSpout extends BaseRichSpout {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingSpout.class);
    private SpoutOutputCollector collector;
    private TopologyContext context;
    private AtomicInteger ai;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.ai = new AtomicInteger();
        log.warn("ShuffleGroupingSpout ------> open: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        int i = this.ai.getAndIncrement();
        if (i < 10) {
            log.warn("ShuffleGroupingSpout ------> nextTuple: hashcode={} ----> taskId={} ----> value={}",
                    this.hashCode(), context.getThisTaskId(), i);
            collector.emit(new Values(i));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }

    @Override
    public void close() {
        log.warn("ShuffleGroupingSpout ------> close: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }
}

// ===================================== Bolt =====================================
public class ShuffleGroupingBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("ShuffleGroupingBolt ------> prepare: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("i");
        collector.emit(new Values(i * 10));
        log.warn("ShuffleGroupingBolt ------> execute: hashcode={} ----> taskId={} ----> value={}",
                this.hashCode(), context.getThisTaskId(), i);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result"));
    }

    @Override
    public void cleanup() {
        log.warn("ShuffleGroupingBolt ------> cleanup: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }
}

// ===================================== Bolt2 =====================================
public class ShuffleGroupingBolt2 extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(ShuffleGroupingBolt2.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("ShuffleGroupingBolt2 ------> prepare: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("result");
        log.warn("ShuffleGroupingBolt2 ------> execute: hashcode={} ----> taskId={} ----> result={}",
                this.hashCode(), context.getThisTaskId(), i);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }

    @Override
    public void cleanup() {
        log.warn("ShuffleGroupingBolt2 ------> cleanup: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }
}
2. Fields grouping
// ===================================== Topology =====================================
public class FieldsShuffleTopology {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleTopology.class);

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("FieldsShuffleSpout", new FieldsShuffleSpout(), 1);
        builder.setBolt("FieldsShuffleUpperBolt", new FieldsShuffleUpperBolt(), 2)
               .shuffleGrouping("FieldsShuffleSpout");
        builder.setBolt("FieldsShuffleFinalBolt", new FieldsShuffleFinalBolt(), 2)
               .fieldsGrouping("FieldsShuffleUpperBolt", new Fields("upperName"));

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("FieldsShuffleTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "FieldsShuffleTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ===================================== Spout =====================================
public class FieldsShuffleSpout extends BaseRichSpout {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleSpout.class);
    private SpoutOutputCollector collector;
    private TopologyContext context;
    private List<String> list;
    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.index = 0;
        this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word");
        log.warn("FieldsShuffleSpout open: hashcode={} taskId={}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        if (index < list.size()) {
            String s = list.get(index++);
            log.warn("FieldsShuffleSpout nextTuple: hashcode={} taskId={} value={}",
                    this.hashCode(), context.getThisTaskId(), s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }

    @Override
    public void close() {
        log.warn("FieldsShuffleSpout close: hashcode={} taskId={}", this.hashCode(), context.getThisTaskId());
    }
}

// ===================================== Bolt =====================================
public class FieldsShuffleUpperBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleUpperBolt.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("FieldsShuffleUpperBolt ------> prepare: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("i");
        collector.emit(new Values(s.toUpperCase()));
        log.warn("FieldsShuffleUpperBolt ------> execute: hashcode={} ----> taskId={} ----> string={}",
                this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upperName"));
    }

    @Override
    public void cleanup() {
        log.warn("FieldsShuffleUpperBolt ------> cleanup: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }
}

// ===================================== Bolt2 =====================================
public class FieldsShuffleFinalBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(FieldsShuffleFinalBolt.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("FieldsShuffleFinalBolt ------> prepare: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("upperName");
        log.warn("FieldsShuffleFinalBolt ------> execute: hashcode={} ----> taskId={} ----> string={}",
                this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }

    @Override
    public void cleanup() {
        log.warn("FieldsShuffleFinalBolt ------> cleanup: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }
}
3. All grouping
The tuples are replicated to every task of all subscribing bolts. Use this grouping with care.
// ===================================== Topology =====================================
public class AllGroupTopology {

    private static final Logger log = LoggerFactory.getLogger(AllGroupTopology.class);

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("AllGrouppingSpout", new AllGrouppingSpout(), 1);
        builder.setBolt("AllBolt1", new AllBolt1(), 1).shuffleGrouping("AllGrouppingSpout");
        builder.setBolt("AllBolt2", new AllBolt2(), 1).shuffleGrouping("AllGrouppingSpout");

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("AllGroupTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "AllGroupTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ===================================== AllGrouppingSpout =====================================
public class AllGrouppingSpout extends BaseRichSpout {

    private static final Logger log = LoggerFactory.getLogger(AllGrouppingSpout.class);
    private SpoutOutputCollector collector;
    private TopologyContext context;
    private List<String> list;
    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.index = 0;
        this.list = Arrays.asList("Hello", "Hello", "Hello", "Hello", "Hello", "Hello", "Word", "Word");
        log.warn("AllGrouppingSpout open: hashcode={} taskId={}", this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void nextTuple() {
        if (index < list.size()) {
            String s = list.get(index++);
            log.warn("AllGrouppingSpout nextTuple: hashcode={} taskId={} value={}",
                    this.hashCode(), context.getThisTaskId(), s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name"));
    }
}

// ===================================== AllBolt1 =====================================
public class AllBolt1 extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(AllBolt1.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("AllBolt1 ------> prepare: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // terminal bolt: nothing subscribes to it and it declares no output
        // fields, so it only logs what it receives
        String s = input.getStringByField("name");
        log.warn("AllBolt1 execute: hashcode={} ----> taskId={} ----> string={}",
                this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}

// ===================================== AllBolt2 =====================================
public class AllBolt2 extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(AllBolt2.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn("AllBolt2 ------> prepare: hashcode={} ----> taskId={}",
                this.hashCode(), context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getStringByField("name");
        log.warn("AllBolt2 execute: hashcode={} ----> taskId={} ----> string={}",
                this.hashCode(), context.getThisTaskId(), s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}
// The logs show that both bolts received all 8 tuples from the spout. That is
// expected: AllBolt1 and AllBolt2 each subscribe to the spout separately, and
// every subscribing component receives the full stream; a grouping only decides
// how tuples are distributed among the tasks within one bolt. To observe all
// grouping itself, subscribe a single bolt with parallelism greater than 1 via
// allGrouping(...) and watch every task of that bolt receive every tuple.
4. Global grouping
When data has passed through a series of bolts, the last bolt often needs to consolidate the results produced by all upstream tasks; global grouping does this by sending the entire stream to a single task. To be precise, the stream is assigned to the task with the lowest task id.
Goal of the example: shuffle the numbers across the doubling bolts, then consolidate everything in the final print bolt.
// ===================================== Topology =====================================
public class GlobalGroupingTopology {

    private final static Logger log = LoggerFactory.getLogger(GlobalGroupingTopology.class);

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(), 1);
        builder.setBolt("NumberDoubleBolt", new NumberDoubleBolt(), 2)
               .shuffleGrouping("NumberGenerateSpout");
        builder.setBolt("NumberPrintBolt", new NumberPrintBolt(), 2)
               .globalGrouping("NumberDoubleBolt");

        Config config = new Config();
        config.setNumWorkers(4);
        try {
            StormSubmitter.submitTopology("GlobalGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "GlobalGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ===================================== Spout =====================================
public class NumberGenerateSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private AtomicInteger counter;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.counter = new AtomicInteger(0);
    }

    @Override
    public void nextTuple() {
        while (counter.get() < 10) {
            collector.emit(new Values(counter.getAndIncrement()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i"));
    }
}

// ===================================== Bolt =====================================
public class NumberDoubleBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer value = input.getIntegerByField("i");
        collector.emit(new Values(value * 2, 10));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("i", "constant"));
    }
}

// ===================================== Bolt =====================================
public class NumberPrintBolt extends BaseBasicBolt {

    private final static Logger logger = LoggerFactory.getLogger(NumberPrintBolt.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        logger.warn("============== prepare taskID={}", context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer i = input.getIntegerByField("i");
        Integer constant = input.getIntegerByField("constant");
        logger.warn("taskID={}, instanceID={}, i={}, constant={}",
                context.getThisTaskId(), this, i, constant);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}
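The selection rule this example relies on can be sketched outside Storm as well. Again, this is an illustration of the semantics rather than Storm's real implementation, and the task ids are made up:

```java
import java.util.Collections;
import java.util.List;

// Illustrative only: global grouping routes the entire stream to a
// single task, the one with the lowest task id among the downstream
// bolt's tasks.
public class GlobalGroupingSketch {

    static int globalChoose(List<Integer> targetTasks) {
        return Collections.min(targetTasks); // always the lowest task id
    }

    public static void main(String[] args) {
        List<Integer> tasks = List.of(7, 4, 9); // hypothetical task ids
        for (int tuple = 0; tuple < 3; tuple++) {
            // Every tuple lands on task 4, regardless of its content.
            System.out.println(globalChoose(tasks)); // prints 4
        }
    }
}
```

This also explains the NumberPrintBolt logs: even though the bolt is declared with parallelism 2, only one of its two tasks ever prints anything.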
5. Direct grouping
A special grouping that works as the mirror image of global grouping: the producer of a tuple decides which task of the consumer receives it.
// ===================================== Topology =====================================
public class DirectGroupingTopology {

    private static final Logger log = LoggerFactory.getLogger(DirectGroupingTopology.class);

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("NumberGenerateSpout", new NumberGenerateSpout(), 1);
        builder.setBolt("NumberDoubleBolt", new NumberDoubleBolt(), 2)
               .directGrouping("NumberGenerateSpout");

        Config config = new Config();
        config.setNumWorkers(3);
        try {
            StormSubmitter.submitTopology("DirectGroupingTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "DirectGroupingTopology");
            log.warn("==================================================");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ===================================== Spout =====================================
public class NumberGenerateSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private AtomicInteger counter;
    private int destTaskID;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        counter = new AtomicInteger(0);
        // pick the downstream task with the highest task id as the fixed target
        List<Integer> tasks = context.getComponentTasks("NumberDoubleBolt");
        destTaskID = tasks.stream().mapToInt(Integer::intValue).max().getAsInt();
    }

    @Override
    public void nextTuple() {
        while (counter.get() < 10) {
            collector.emitDirect(destTaskID, new Values(counter.getAndIncrement()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(true, new Fields("i")); // "true" declares a direct stream
    }
}

// ===================================== Bolt =====================================
public class NumberDoubleBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(NumberDoubleBolt.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        log.warn(" =================== taskId={}", context.getThisTaskId());
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer value = input.getIntegerByField("i");
        log.warn("taskID={}, instanceID={}, value={}", context.getThisTaskId(), this, value);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}
6. Local or shuffle grouping
// ===================================== Topology =====================================
public class LocalLogTopology {

    private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LogSpout", new LogSpout(), 1);
        builder.setBolt("LogParseBolt", new LogParseBolt(), 1).localOrShuffleGrouping("LogSpout");
        builder.setBolt("LogPrintBolt", new LogPrintBolt(), 2).localOrShuffleGrouping("LogParseBolt");

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalLogTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "LocalLogTopology");
            log.warn("==================================================");
            TimeUnit.SECONDS.sleep(120);
            cluster.killTopology("LocalLogTopology");
            cluster.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// ===================================== Spout =====================================
public class LogSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private List<String> list;
    private int index;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.index = 0;
        this.list = Arrays.asList("JAVA,COLLECTION", "JAVA,IO", "JAVA,THREAD", "JAVA,LAMBDA",
                "BIG_DATA,STORM", "BIG_DATA,KAFKA", "BIG_DATA,HADDOP", "BIG_DATA,FLUME",
                "BIG_DATA,KAFKA", "C,c");
    }

    @Override
    public void nextTuple() {
        while (index < list.size()) {
            collector.emit(new Values(list.get(index++)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("entity"));
    }
}

// ===================================== Bolt =====================================
public class LogParseBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String entity = input.getStringByField("entity");
        List<String> list = Splitter.on(",").splitToList(entity); // Guava Splitter
        collector.emit(new Values(list.get(0), list.get(1)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("category", "item"));
    }
}

// ===================================== Bolt =====================================
public class LogPrintBolt extends BaseBasicBolt {

    private static final Logger LOG = LoggerFactory.getLogger(LogPrintBolt.class);
    private TopologyContext context;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.context = context;
        LOG.info("==========================================");
        LOG.info("prepare taskID={}", context.getThisTaskId());
        LOG.info("==========================================");
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String category = input.getStringByField("category");
        String item = input.getStringByField("item");
        LOG.info("==========================================");
        LOG.info("execute: category={}, item={}, taskID={}", category, item, context.getThisTaskId());
        LOG.info("==========================================");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to do
    }
}
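The decision local-or-shuffle makes can be sketched like this. The notion of "tasks in this worker" is simplified to an explicit list here; in Storm it is derived from the worker's task assignment, so treat this as an illustration of the preference rule only:

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

// Illustrative only: localOrShuffleGrouping prefers downstream tasks
// running in the same worker process (JVM) to avoid serialization and
// network transfer, and falls back to a plain shuffle across all tasks
// only when no local target exists.
public class LocalOrShuffleSketch {
    private static final Random RANDOM = new Random();

    static int choose(List<Integer> targetTasks, List<Integer> tasksInThisWorker) {
        List<Integer> local = targetTasks.stream()
                .filter(tasksInThisWorker::contains)
                .collect(Collectors.toList());
        // shuffle among local tasks if any exist, otherwise among all tasks
        List<Integer> candidates = local.isEmpty() ? targetTasks : local;
        return candidates.get(RANDOM.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<Integer> targets = List.of(2, 3, 4); // hypothetical task ids
        // Task 3 runs in the same JVM, so it is always chosen:
        System.out.println(choose(targets, List.of(1, 3))); // prints 3
        // No local target, so this falls back to an ordinary shuffle:
        System.out.println(targets.contains(choose(targets, List.of(9)))); // true
    }
}
```

This preference is exactly why the grouping saves network traffic: a local pick stays an in-process handoff, while a remote pick costs serialization plus a network round trip.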
7. None grouping
You don't care how the stream is grouped. Currently a none grouping is equivalent to a shuffle grouping; eventually, though, Storm will push bolts with none groupings to execute in the same thread as the bolt or spout they subscribe to, when possible.
8. Custom grouping
// ===================================== Topology =====================================
public class LocalLogTopology {

    private static final Logger log = LoggerFactory.getLogger(LocalLogTopology.class);

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("LogSpout", new LogSpout(), 1);
        builder.setBolt("LogParseBolt", new LogParseBolt(), 1).localOrShuffleGrouping("LogSpout");
        builder.setBolt("LogPrintBolt", new LogPrintBolt(), 2)
               .customGrouping("LogParseBolt", new HighTaskIDGrouping());

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(4);
        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalLogTopology", config, builder.createTopology());
            log.warn("==================================================");
            log.warn("the topology {} is submitted.", "LocalLogTopology");
            log.warn("==================================================");
            TimeUnit.SECONDS.sleep(120);
            cluster.killTopology("LocalLogTopology");
            cluster.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// LogSpout, LogParseBolt, and LogPrintBolt are identical to the classes in the
// local-or-shuffle example in section 6 and are not repeated here.

// ===================================== custom grouping =====================================
public class HighTaskIDGrouping implements CustomStreamGrouping {

    private int taskID;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        // targetTasks holds the task ids of all downstream tasks
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks);                   // ascending order
        this.taskID = tasks.get(tasks.size() - 1); // keep the highest task id
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // route every tuple to the task with the highest id
        return Arrays.asList(taskID);
    }
}