storm的流分组

Posted 百里登风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm的流分组相关的知识,希望对你有一定的参考价值。

 

用的是ShuffleGrouping分组方式,并行度设置为3

 

这是跑下来的结果

 

 

参考代码StormTopologyShufferGrouping.java


package yehua.storm;


import java.util.Map;


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;


/**
 * Shuffle grouping demo.
 *
 * <p>Unless there is a special requirement, use this grouping: it keeps the load
 * balanced and is the one most commonly used in day-to-day work.
 *
 * <p>The topology connects one {@link MySpout} to {@link MyBolt} (parallelism hint 3)
 * through {@code shuffleGrouping}. Started without command-line arguments it runs on a
 * {@code LocalCluster}; with any argument it is submitted through {@code StormSubmitter}.
 *
 * <p>NOTE(review): {@code @Override} is left off the callbacks, as in the original
 * listing — presumably because of an old compiler level; confirm before adding it.
 *
 * @author yehua
 */
public class StormTopologyShufferGrouping {

    /** Spout that emits an ever-increasing integer in a single output field named "num". */
    public static class MySpout extends BaseRichSpout {

        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;

        /** Value of the most recently emitted tuple; the first tuple carries 1. */
        int num = 0;

        /**
         * Remembers the configuration, context and collector passed in.
         * Only the collector is read again (by {@link #nextTuple()}).
         */
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /** Emits the next integer, logs it to stdout, then pauses via {@code Utils.sleep(1000)}. */
        public void nextTuple() {
            final int current = ++num;
            System.out.println("spout:" + current);
            collector.emit(new Values(current));
            Utils.sleep(1000);
        }

        /** Declares the single output field "num". */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            final Fields outputFields = new Fields("num");
            declarer.declare(outputFields);
        }
    }

    /** Terminal bolt that prints the id of the executing thread together with the received number. */
    public static class MyBolt extends BaseRichBolt {

        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;

        /** Remembers the configuration, context and collector passed in; none is read afterwards. */
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        /** Reads the "num" field and writes it, prefixed with the current thread id, to stderr. */
        public void execute(Tuple input) {
            final Integer received = input.getIntegerByField("num");
            final long threadId = Thread.currentThread().getId();
            System.err.println("thread:" + threadId + ",num=" + received);
        }

        /** Declares nothing: this bolt does not emit tuples. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // intentionally empty
        }
    }

    /**
     * Builds the spout -> bolt topology and submits it.
     *
     * @param args no arguments: run on a {@code LocalCluster}; any argument: submit to the cluster
     */
    public static void main(String[] args) {
        final String spoutId = MySpout.class.getSimpleName();
        final String boltId = MyBolt.class.getSimpleName();
        final String topologyName = StormTopologyShufferGrouping.class.getSimpleName();

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutId, new MySpout());
        // Three bolt executors; shuffle grouping spreads the spout's tuples across them.
        builder.setBolt(boltId, new MyBolt(), 3).shuffleGrouping(spoutId);

        final Config stormConfig = new Config();

        if (args.length == 0) {
            // Run locally.
            final LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, stormConfig, builder.createTopology());
            return;
        }

        // Run on the cluster.
        try {
            StormSubmitter.submitTopology(topologyName, stormConfig, builder.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }
    }
}

 

 

 

 

用fieldsGrouping方法

 

 

按奇偶数分组(也就是按字段分组)

 

 

从运行结果可以看出:一个线程只处理奇数,另一个线程只处理偶数。

 

 参考代码StormTopologyFieldsGrouping.java

  1 package yehua.storm;
  2 
  3 import java.util.Map;
  4 
  5 import org.apache.storm.Config;
  6 import org.apache.storm.LocalCluster;
  7 import org.apache.storm.StormSubmitter;
  8 import org.apache.storm.generated.AlreadyAliveException;
  9 import org.apache.storm.generated.AuthorizationException;
 10 import org.apache.storm.generated.InvalidTopologyException;
 11 import org.apache.storm.spout.SpoutOutputCollector;
 12 import org.apache.storm.task.OutputCollector;
 13 import org.apache.storm.task.TopologyContext;
 14 import org.apache.storm.topology.OutputFieldsDeclarer;
 15 import org.apache.storm.topology.TopologyBuilder;
 16 import org.apache.storm.topology.base.BaseRichBolt;
 17 import org.apache.storm.topology.base.BaseRichSpout;
 18 import org.apache.storm.tuple.Fields;
 19 import org.apache.storm.tuple.Tuple;
 20 import org.apache.storm.tuple.Values;
 21 import org.apache.storm.utils.Utils;
 22 
 23 /**
 24  * FieldsGrouping demo: the bolt subscribes to the spout with fieldsGrouping on the "flag" field.
 25  * Fields grouping (tuples are grouped by a field value; the spout sets flag = num % 2).
 26  * @author yehua
 27  *
 28  */
 29 
 30 public class StormTopologyFieldsGrouping {
 31     
 32     public static class MySpout extends BaseRichSpout{ // emits ("num", "flag") tuples
 33         private Map conf; // stored in open(); not read elsewhere in this class
 34         private TopologyContext context; // stored in open(); not read elsewhere in this class
 35         private SpoutOutputCollector collector; // used by nextTuple() to emit
 36         //@Override
 37         public void open(Map conf, TopologyContext context,
 38                 SpoutOutputCollector collector) { // keeps the three arguments in fields
 39             this.conf = conf;
 40             this.collector = collector;
 41             this.context = context;
 42         }
 43 
 44         int num = 0; // counter; incremented before every emit, so the first tuple carries 1
 45         //@Override
 46         public void nextTuple() { // emits one tuple per call, then sleeps
 47             num++;
 48             System.out.println("spout:"+num);
 49             this.collector.emit(new Values(num,num%2)); // second value is the parity (0 or 1), declared below as "flag"
 50             Utils.sleep(1000);
 51         }
 52 
 53         //@Override
 54         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 55             declarer.declare(new Fields("num","flag")); // order matches the Values emitted in nextTuple()
 56         }
 57         
 58     }
 59     
 60     
 61     
 62     public static class MyBolt extends BaseRichBolt{ // prints every received "num" with the current thread id
 63         
 64         private Map stormConf; // stored in prepare(); not read elsewhere in this class
 65         private TopologyContext context; // stored in prepare(); not read elsewhere in this class
 66         private OutputCollector collector; // stored in prepare(); not read elsewhere in this class
 67         //@Override
 68         public void prepare(Map stormConf, TopologyContext context,
 69                 OutputCollector collector) { // keeps the three arguments in fields
 70             this.stormConf = stormConf;
 71             this.context = context;
 72             this.collector = collector;
 73         }
 74         
 75         //@Override
 76         public void execute(Tuple input) {
 77             Integer num = input.getIntegerByField("num"); // the "flag" field is not read here
 78             System.err.println("thread:"+Thread.currentThread().getId()+",num="+num);
 79         }
 80 
 81         //@Override
 82         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 83             // nothing declared: this bolt emits no tuples
 84         }
 85         
 86     }
 87     
 88     
 89     
 90     public static void main(String[] args) { // no args: run locally; any arg: submit to the cluster
 91         TopologyBuilder topologyBuilder = new TopologyBuilder();
 92         String spout_id = MySpout.class.getSimpleName();
 93         String bolt_id = MyBolt.class.getSimpleName();
 94         
 95         topologyBuilder.setSpout(spout_id, new MySpout());
 96         // Note: fields grouping guarantees that tuples with the same grouping-field value are processed by the same thread
 97         topologyBuilder.setBolt(bolt_id, new MyBolt(),2).fieldsGrouping(spout_id, new Fields("flag"));
 98         
 99         
100         Config config = new Config();
101         String topology_name = StormTopologyFieldsGrouping.class.getSimpleName();
102         if(args.length==0){
103             // run locally
104             LocalCluster localCluster = new LocalCluster();
105             localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
106         }else{
107             // run on the cluster
108             try {
109                 StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
110             } catch (AlreadyAliveException e) {
111                 e.printStackTrace(); // submission failures are only printed, not rethrown
112             } catch (InvalidTopologyException e) {
113                 e.printStackTrace();
114             } catch (AuthorizationException e) {
115                 e.printStackTrace();
116             }
117         }
118         
119     }
120 
121 }

 这里补充一下:如果数据只有两类(flag 只有 0 和 1 两个取值),而 bolt 的并行度设置为 3

我们再看看运行结果,发现只有两个线程在干活——因为相同 flag 的数据总是进入同一个线程,两类数据最多只会用到两个线程

还有一种情况,只有一个线程的情况,还是两类数据

从运行结果可以看出,所有的活都由这一个线程干完了

 

 

 

 allGrouping方法

 

运行结果:spout每发一条数据,三个线程(bolt 的三个并行实例)都接收到了(基本没什么应用场景)

 

 参考代码StormTopologyAllGrouping.java

  1 package yehua.storm;
  2 
  3 import java.util.Map;
  4 
  5 import org.apache.storm.Config;
  6 import org.apache.storm.LocalCluster;
  7 import org.apache.storm.StormSubmitter;
  8 import org.apache.storm.generated.AlreadyAliveException;
  9 import org.apache.storm.generated.AuthorizationException;
 10 import org.apache.storm.generated.InvalidTopologyException;
 11 import org.apache.storm.spout.SpoutOutputCollector;
 12 import org.apache.storm.task.OutputCollector;
 13 import org.apache.storm.task.TopologyContext;
 14 import org.apache.storm.topology.OutputFieldsDeclarer;
 15 import org.apache.storm.topology.TopologyBuilder;
 16 import org.apache.storm.topology.base.BaseRichBolt;
 17 import org.apache.storm.topology.base.BaseRichSpout;
 18 import org.apache.storm.tuple.Fields;
 19 import org.apache.storm.tuple.Tuple;
 20 import org.apache.storm.tuple.Values;
 21 import org.apache.storm.utils.Utils;
 22 
 23 /**
 24  * AllGrouping demo: the bolt (parallelism hint 3) subscribes to the spout with allGrouping.
 25  * Broadcast grouping.
 26  * @author yehua
 27  *
 28  */
 29 
 30 public class StormTopologyAllGrouping {
 31     
 32     public static class MySpout extends BaseRichSpout{ // emits an increasing integer in field "num"
 33         private Map conf; // stored in open(); not read elsewhere in this class
 34         private TopologyContext context; // stored in open(); not read elsewhere in this class
 35         private SpoutOutputCollector collector; // used by nextTuple() to emit
 36         //@Override
 37         public void open(Map conf, TopologyContext context,
 38                 SpoutOutputCollector collector) { // keeps the three arguments in fields
 39             this.conf = conf;
 40             this.collector = collector;
 41             this.context = context;
 42         }
 43 
 44         int num = 0; // counter; incremented before every emit, so the first tuple carries 1
 45         //@Override
 46         public void nextTuple() { // emits one tuple per call, then sleeps
 47             num++;
 48             System.out.println("spout:"+num);
 49             this.collector.emit(new Values(num));
 50             Utils.sleep(1000);
 51         }
 52 
 53         //@Override
 54         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 55             declarer.declare(new Fields("num")); // single output field, read by MyBolt.execute()
 56         }
 57         
 58     }
 59     
 60     
 61     
 62     public static class MyBolt extends BaseRichBolt{ // prints every received "num" with the current thread id
 63         
 64         private Map stormConf; // stored in prepare(); not read elsewhere in this class
 65         private TopologyContext context; // stored in prepare(); not read elsewhere in this class
 66         private OutputCollector collector; // stored in prepare(); not read elsewhere in this class
 67         //@Override
 68         public void prepare(Map stormConf, TopologyContext context,
 69                 OutputCollector collector) { // keeps the three arguments in fields
 70             this.stormConf = stormConf;
 71             this.context = context;
 72             this.collector = collector;
 73         }
 74         
 75         //@Override
 76         public void execute(Tuple input) {
 77             Integer num = input.getIntegerByField("num");
 78             System.err.println("thread:"+Thread.currentThread().getId()+",num="+num); // written to stderr
 79         }
 80 
 81         //@Override
 82         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 83             // nothing declared: this bolt emits no tuples
 84         }
 85         
 86     }
 87     
 88     
 89     
 90     public static void main(String[] args) { // no args: run locally; any arg: submit to the cluster
 91         TopologyBuilder topologyBuilder = new TopologyBuilder();
 92         String spout_id = MySpout.class.getSimpleName();
 93         String bolt_id = MyBolt.class.getSimpleName();
 94         
 95         topologyBuilder.setSpout(spout_id, new MySpout());
 96         topologyBuilder.setBolt(bolt_id, new MyBolt(),3).allGrouping(spout_id); // 3 = parallelism hint for the bolt
 97         
 98         
 99         Config config = new Config();
100         String topology_name = StormTopologyAllGrouping.class.getSimpleName();
101         if(args.length==0){
102             // run locally
103             LocalCluster localCluster = new LocalCluster();
104             localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
105         }else{
106             // run on the cluster
107             try {
108                 StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
109             } catch (AlreadyAliveException e) {
110                 e.printStackTrace(); // submission failures are only printed, not rethrown
111             } catch (InvalidTopologyException e) {
112                 e.printStackTrace();
113             } catch (AuthorizationException e) {
114                 e.printStackTrace();
115             }
116         }
117         
118     }
119 
120 }

 

 

 

LocalOrShuffleGrouping方法

spout只会把数据发给与自己处于同一个worker进程内的bolt线程(图中的线程1),也就是说数据只在同一个进程里传递。这样做的好处是进程内发送数据效率高,不需要跨进程、跨主机的网络传输

但是当数据量太大的时候,线程1处理不了的时候就麻烦了,所以在实际工作中不建议这样做。

 

 

这里用的是3个线程(3个bolt),2个进程(2个worker)

 

从运行的结果我们可以看出来,只有一个线程在接收数据

 

 

还有一种情况:如果spout所在的worker进程里没有目标bolt的线程,它的效果就和ShuffleGrouping一样

参考代码StormTopologyLocalOrShufferGrouping.java

  1 package yehua.storm;
  2 
  3 import java.util.Map;
  4 
  5 import org.apache.storm.Config;
  6 import org.apache.storm.LocalCluster;
  7 import org.apache.storm.StormSubmitter;
  8 import org.apache.storm.generated.AlreadyAliveException;
  9 import org.apache.storm.generated.AuthorizationException;
 10 import org.apache.storm.generated.InvalidTopologyException;
 11 import org.apache.storm.spout.SpoutOutputCollector;
 12 import org.apache.storm.task.OutputCollector;
 13 import org.apache.storm.task.TopologyContext;
 14 import org.apache.storm.topology.OutputFieldsDeclarer;
 15 import org.apache.storm.topology.TopologyBuilder;
 16 import org.apache.storm.topology.base.BaseRichBolt;
 17 import org.apache.storm.topology.base.BaseRichSpout;
 18 import org.apache.storm.tuple.Fields;
 19 import org.apache.storm.tuple.Tuple;
 20 import org.apache.storm.tuple.Values;
 21 import org.apache.storm.utils.Utils;
 22 
 23 /**
 24  * LocalOrShuffleGrouping demo: the bolt (parallelism hint 3) subscribes with localOrShuffleGrouping and the config requests 2 workers.
 25  * @author yehua
 26  *
 27  */
 28 
 29 public class StormTopologyLocalOrShufferGrouping {
 30     
 31     public static class MySpout extends BaseRichSpout{ // emits an increasing integer in field "num"
 32         private Map conf; // stored in open(); not read elsewhere in this class
 33         private TopologyContext context; // stored in open(); not read elsewhere in this class
 34         private SpoutOutputCollector collector; // used by nextTuple() to emit
 35         //@Override
 36         public void open(Map conf, TopologyContext context,
 37                 SpoutOutputCollector collector) { // keeps the three arguments in fields
 38             this.conf = conf;
 39             this.collector = collector;
 40             this.context = context;
 41         }
 42 
 43         int num = 0; // counter; incremented before every emit, so the first tuple carries 1
 44         //@Override
 45         public void nextTuple() { // emits one tuple per call, then sleeps
 46             num++;
 47             System.out.println("spout:"+num);
 48             this.collector.emit(new Values(num));
 49             Utils.sleep(1000);
 50         }
 51 
 52         //@Override
 53         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 54             declarer.declare(new Fields("num")); // single output field, read by MyBolt.execute()
 55         }
 56         
 57     }
 58     
 59     
 60     
 61     public static class MyBolt extends BaseRichBolt{ // prints every received "num" with the current thread id
 62         
 63         private Map stormConf; // stored in prepare(); not read elsewhere in this class
 64         private TopologyContext context; // stored in prepare(); not read elsewhere in this class
 65         private OutputCollector collector; // stored in prepare(); not read elsewhere in this class
 66         //@Override
 67         public void prepare(Map stormConf, TopologyContext context,
 68                 OutputCollector collector) { // keeps the three arguments in fields
 69             this.stormConf = stormConf;
 70             this.context = context;
 71             this.collector = collector;
 72         }
 73         
 74         //@Override
 75         public void execute(Tuple input) {
 76             Integer num = input.getIntegerByField("num");
 77             System.err.println("thread:"+Thread.currentThread().getId()+",num="+num); // written to stderr
 78         }
 79 
 80         //@Override
 81         public void declareOutputFields(OutputFieldsDeclarer declarer) {
 82             // nothing declared: this bolt emits no tuples
 83         }
 84         
 85     }
 86     
 87     
 88     
 89     public static void main(String[] args) { // no args: run locally; any arg: submit to the cluster
 90         TopologyBuilder topologyBuilder = new TopologyBuilder();
 91         String spout_id = MySpout.class.getSimpleName();
 92         String bolt_id = MyBolt.class.getSimpleName();
 93         
 94         topologyBuilder.setSpout(spout_id, new MySpout());
 95         topologyBuilder.setBolt(bolt_id, new MyBolt(),3).localOrShuffleGrouping(spout_id); // 3 = parallelism hint for the bolt
 96         
 97         
 98         Config config = new Config();
 99         config.setNumWorkers(2); // request two worker processes for this topology
100         String topology_name = StormTopologyLocalOrShufferGrouping.class.getSimpleName();
101         if(args.length==0){
102             // run locally
103             LocalCluster localCluster = new LocalCluster();
104             localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
105         }else{
106             // run on the cluster
107             try {
108                 StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
109             } catch (AlreadyAliveException e) {
110                 e.printStackTrace(); // submission failures are only printed, not rethrown
111             } catch (InvalidTopologyException e) {
112                 e.printStackTrace();
113             } catch (AuthorizationException e) {
114                 e.printStackTrace();
115             }
116         }
117         
118     }
119 
120 }

 

以上是关于storm的流分组的主要内容,如果未能解决你的问题,请参考以下文章

Twitter的流处理器系统Heron——升级的storm,可以利用mesos来进行资源调度

Storm---DirectGroup(直接分组)

jstorm之于storm

storm group 的介绍与使用

第1节 storm编程:8storm的分发策略

Storm笔记整理:Storm核心概念与验证——并行度与流式分组