storm实战之WordCount

Posted songweideboke

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm实战之WordCount相关的知识,希望对你有一定的参考价值。

 


一,环境搭建

  在 Eclipse 中创建 Java 项目,并导入 Storm 相关的 jar 包。

二,代码编写

  1. 编写 spout 组件 RandomSentenceSpout,用来发射数据源(每秒发射一条固定的句子)。

package com;

import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Spout that emits the same fixed sentence roughly once per second.
 *
 * <p>Each tuple carries a single field named {@code "spout"} holding the whole sentence;
 * downstream bolts split it into words.
 */
public class RandomSentenceSpout extends BaseRichSpout {
	// Correctly spelled so Java serialization actually uses it (Storm serializes components
	// when the topology is built/submitted).
	private static final long serialVersionUID = 1L;

	/** The sentence emitted on every call to {@link #nextTuple()}. */
	private static final String SENTENCE =
			"hello zhangsan nice to meet you zhangsan hello lisi welcome to bj";

	/** Collector used to emit tuples; assigned once in {@link #open}. */
	private SpoutOutputCollector collector;

	/**
	 * Emits one sentence, then sleeps for one second to throttle the stream.
	 *
	 * <p>The tuple is emitted without a message id, so Storm does not track or replay it.
	 */
	@Override
	public void nextTuple() {
		collector.emit(new Values(SENTENCE));
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of swallowing it, so the worker can shut down.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Initialization hook, executed once per spout task; keeps the output collector.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   task context (unused)
	 * @param collector collector used to emit tuples
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Declares the output schema: a single field named {@code "spout"} (the sentence).
	 *
	 * @param declarer schema declarer supplied by Storm
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("spout"));
	}
}

  2. 编写 bolt 组件 WordBolt,用来把句子切分成单词。

package com;

import java.util.Map;
import java.util.Random;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that splits each incoming sentence into words.
 *
 * <p>Reads the sentence from the {@code "spout"} field declared by RandomSentenceSpout and
 * emits one tuple per word in a field named {@code "word"}, which WordCount groups on.
 */
public class WordBolt extends BaseRichBolt {
	private static final long serialVersionUID = 1L;

	/** Collector used to emit words and ack inputs; assigned once in {@link #prepare}. */
	private OutputCollector collector;

	/**
	 * Initialization hook, executed once per bolt task; keeps the output collector.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   task context (unused)
	 * @param collector collector used to emit and ack tuples
	 */
	@Override
	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Splits the sentence on whitespace and emits each non-empty word.
	 *
	 * @param input tuple carrying a sentence in the {@code "spout"} field
	 */
	@Override
	public void execute(Tuple input) {
		String sentence = input.getStringByField("spout");
		if (sentence != null) {
			for (String word : sentence.trim().split("\\s+")) {
				// split() yields a single "" for blank input; skip it.
				if (!word.isEmpty()) {
					// Anchor to the input so the words join its tuple tree if the spout ever
					// emits with message ids.
					collector.emit(input, new Values(word));
				}
			}
		}
		// BaseRichBolt does not ack automatically.
		collector.ack(input);
	}

	/**
	 * Declares the output schema: a single field named {@code "word"}.
	 *
	 * @param declarer schema declarer supplied by Storm
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

  3. 编写 bolt 组件 WordCount,用来统计每个单词出现的次数。

package com;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Terminal bolt that counts occurrences of each word and prints the running totals.
 *
 * <p>Counts are kept per bolt task. Correct global totals rely on the topology routing every
 * occurrence of a given word to the same task (e.g. a fieldsGrouping on {@code "word"}).
 */
public class WordCount extends BaseRichBolt {

	// Must be a primitive long and spelled exactly like this to be honoured by serialization.
	private static final long serialVersionUID = 1L;

	/** Collector used to ack inputs; assigned once in {@link #prepare}. */
	private OutputCollector collector;

	/** Running count per word seen by this task. */
	Map<String, Integer> map = new HashMap<String, Integer>();

	/**
	 * Increments the count for the word in the {@code "word"} field and prints all counts.
	 *
	 * @param value tuple carrying a single word in the {@code "word"} field
	 */
	@Override
	public void execute(Tuple value) {
		String data = value.getStringByField("word");
		// Single lookup instead of containsKey + get.
		Integer previous = map.get(data);
		map.put(data, previous == null ? 1 : previous + 1);
		System.out.println(map);
		// BaseRichBolt does not ack automatically.
		collector.ack(value);
	}

	/**
	 * Initialization hook, executed once per bolt task; keeps the output collector.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   task context (unused)
	 * @param collector collector used to ack tuples
	 */
	@Override
	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * Declares no output fields: this bolt only prints and emits nothing downstream.
	 *
	 * @param d schema declarer supplied by Storm (unused)
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer d) {
		// Intentionally empty: terminal bolt.
	}
}

  4. 编写拓扑提交类:不带参数运行时以本地模式(LocalCluster)运行,带参数时以第一个参数作为拓扑名提交到集群。

package com;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class mian {

	public static void main(String[] args) {
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout("spout", new RandomSentenceSpout());
		topologyBuilder.setBolt("wordBolt", new WordBolt()).shuffleGrouping("spout");
		topologyBuilder.setBolt("wordint", new WordCount()).fieldsGrouping("wordBolt", new Fields("word"));
		Config config = new Config();
		if(args==null||args.length==0){
              //集群模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordCount",config ,topologyBuilder.createTopology()); }else{
              //单机模式 config.setNumWorkers(1); try { StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology()); } catch (AlreadyAliveException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InvalidTopologyException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (AuthorizationException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }

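  5. 打成 jar 包,上传到服务器运行,例如:storm jar wordcount.jar com.mian wordCount 。注意只打包项目自身的 class 文件,不要把项目依赖的 jar(尤其是 storm-core)一起打入,否则在集群上运行会报错——集群已自带 Storm 的 jar,重复打包会造成类和 defaults.yaml 等资源冲突。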

 

 


 



以上是关于storm实战之WordCount的主要内容,如果未能解决你的问题,请参考以下文章

Storm集群上的开发 ,Topology任务的编写 之 WordCount Spout和Blot的分组策略(一张图说明问题)

Storm编程之wordcount(kafka--》Jstorm--》redis)

Storm编程之wordcount(kafka--》Jstorm--》redis)

Storm入门WordCount示例

Storm常用操作命令及WordCount

storm wordcount实例