采集单词
package wordcount;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that collects the source data: it scans a directory (recursively) for
 * {@code .txt} files and emits every line of each file as a one-field tuple
 * named {@code "line"}.
 *
 * <p>A file is renamed (a timestamp is appended to its path, so it no longer
 * ends in {@code .txt}) <em>before</em> it is read. The rename acts as a claim:
 * when the spout runs with a parallelism hint greater than one, several tasks
 * scan the same directory, and only the task whose rename succeeds reads and
 * emits the file. Renaming after reading would let every task emit the same
 * lines.
 *
 * @author cheng
 * @date 2017-12-12 14:06:32
 */
public class DataSourceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    /** Topology-config key whose String value overrides the directory to scan. */
    public static final String DATA_DIR_KEY = "wordcount.data.dir";

    /** Directory scanned when the topology config has no {@link #DATA_DIR_KEY} entry. */
    private static final String DEFAULT_DATA_DIR = "F:/安装/java工程/StormTest/data";

    /** File extensions picked up by the scan. */
    private static final String[] EXTENSIONS = new String[]{"txt"};

    /** Pause after each processed file and between scans that found nothing. */
    private static final long PAUSE_MILLIS = 1000L;

    private SpoutOutputCollector collector;
    private File dataDir;

    /**
     * Called once when the spout task starts; keeps the collector and resolves
     * the directory to scan.
     *
     * @param conf      topology configuration; a String stored under
     *                  {@link #DATA_DIR_KEY} replaces the default directory
     * @param context   topology context (not used)
     * @param collector collector used to send tuples to the bolts
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Object configured = (conf == null) ? null : conf.get(DATA_DIR_KEY);
        String path = (configured instanceof String) ? (String) configured : DEFAULT_DATA_DIR;
        this.dataDir = new File(path);
    }

    /**
     * Scans the data directory once and emits the lines of every file this
     * task manages to claim.
     */
    @Override
    public void nextTuple() {
        // FileUtils.listFiles throws an unchecked exception for a path that is
        // not a directory; wait for the directory to appear instead of crashing.
        if (!dataDir.isDirectory()) {
            pause();
            return;
        }

        // Arguments: directory, accepted extensions, recurse into sub-directories.
        Collection<File> files = FileUtils.listFiles(dataDir, EXTENSIONS, true);
        if (files.isEmpty()) {
            // Nothing to emit: do not rescan the directory in a tight loop.
            pause();
            return;
        }

        for (File file : files) {
            File claimed = claim(file);
            if (claimed == null) {
                continue;
            }
            emitLines(claimed);
            if (!pause()) {
                return; // interrupted: stop working so the executor can shut down
            }
        }
    }

    /**
     * Renames {@code file} so that it is neither picked up by a later scan nor
     * read by another task.
     *
     * @return the renamed file, or {@code null} when the file could not be claimed
     */
    private File claim(File file) {
        File claimed = new File(file.getAbsolutePath() + System.currentTimeMillis());
        try {
            FileUtils.moveFile(file, claimed);
            return claimed;
        } catch (FileNotFoundException ignored) {
            // The file vanished between listing and renaming, most likely because
            // another spout task claimed it first. That task will emit it.
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Reads the whole file and emits one tuple per line. A file that fails to
     * read stays renamed and is not retried.
     */
    private void emitLines(File file) {
        try {
            // NOTE(review): readLines(File) decodes with the platform default
            // charset; confirm that matches the encoding of the data files.
            List<String> lines = FileUtils.readLines(file);
            for (String line : lines) {
                this.collector.emit(new Values(line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sleeps for {@link #PAUSE_MILLIS}.
     *
     * @return {@code false} when the sleep was interrupted; the interrupt flag
     *         is restored so the caller's thread can still observe it
     */
    private boolean pause() {
        try {
            Thread.sleep(PAUSE_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Declares the single output field, {@code "line"}; downstream bolts read
     * the emitted value by this field name.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    /** Prints a message when the spout is closed. */
    @Override
    public void close() {
        System.out.println("read close");
    }
}
处理单词(按空格拆分)
package wordcount;

import java.util.Map;
import java.util.regex.Pattern;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that splits each collected line into words.
 *
 * <p>Reads the {@code "line"} field of the incoming tuple and emits one tuple
 * per word under the field name {@code "words"}.
 *
 * @author cheng
 * @date 2017-12-12 14:28:25
 */
public class SplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    /** One or more whitespace characters; compiled once instead of per tuple. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    private OutputCollector collector;

    /**
     * Called once to initialise the bolt; keeps the collector for later emits.
     *
     * @param conf      topology configuration (not used)
     * @param context   topology context (not used)
     * @param collector collector used to emit the words and ack the input
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Splits the incoming line on runs of whitespace and emits every non-empty
     * word to the next bolt.
     *
     * @param input tuple carrying the text in its {@code "line"} field
     */
    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        if (line != null) {
            // Trimming plus the length check keeps blank lines, leading spaces
            // and consecutive spaces from producing empty-string "words".
            for (String word : WHITESPACE.split(line.trim())) {
                if (word.length() > 0) {
                    // Anchor the word to its source line.
                    this.collector.emit(input, new Values(word));
                }
            }
        }
        // BaseRichBolt does not ack automatically.
        this.collector.ack(input);
    }

    /**
     * Declares the single output field, {@code "words"}, so the next bolt can
     * read the emitted value by name.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("words"));
    }
}
对单词计数
package wordcount;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * Bolt that counts how often each word has been received.
 *
 * <p>Reads the {@code "words"} field of every incoming tuple, increments that
 * word's running total and prints all totals collected so far to standard
 * output. It emits nothing.
 */
public class CountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    /** Number of word/count lines printed so far, across all calls to {@link #execute}. */
    int testnum = 0;

    /** Number of tuples this bolt instance has processed. */
    int i = 0;

    /** Running total per word, as seen by this bolt instance. */
    HashMap<String, Integer> hashmap = new HashMap<String, Integer>();

    /**
     * Called once to initialise the bolt; stores its arguments.
     *
     * @param conf      topology configuration
     * @param context   topology context
     * @param collector collector used to ack the processed tuples
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.conf = conf;
    }

    /**
     * Adds one to the count of the received word, then prints every word with
     * its current count.
     *
     * @param input tuple carrying the word in its {@code "words"} field
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("words");

        // First occurrence of a word has no entry yet; start from zero.
        Integer num = hashmap.get(word);
        if (num == null) {
            num = 0;
        }
        num++;
        hashmap.put(word, num);

        // Dump the whole table after every tuple.
        System.out.println("执行countBolt的exec" + i++);
        for (Map.Entry<String, Integer> entry : hashmap.entrySet()) {
            System.out.println("word=" + entry.getKey() + ":number=" + entry.getValue()
                    + "---------" + testnum++);
        }

        // BaseRichBolt does not ack automatically.
        this.collector.ack(input);
    }

    /** Declares no output fields: this is the last bolt and it emits nothing. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
联合运行
package wordcount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import clojure.main; public class WordCount { public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder=new TopologyBuilder(); builder.setSpout("spout_id", new DataSourceSpout(),4);//默认只有一个,给个4设置并发度(多线程执行),还以在最后面设置numtask(task的数量,默认每个executor执行已task) builder.setBolt("bolt_1", new SplitBolt()).shuffleGrouping("spout_id");//因为有多个bolt需要指定上一个步骤 BoltDeclarer declarer = builder.setBolt("bolt_2", new CountBolt()).fieldsGrouping("bolt_1",new Fields("words"));//指定上一个bolt并按words相同的bolt分到同一个task // 设置在本地运行 LocalCluster cluster = new LocalCluster(); Config config = new Config(); config.setNumWorkers(2);//设置worker的数量 cluster.submitTopology("topology", config, builder.createTopology()); } }