storm+kafka: WordCount program
Preface: This article was compiled by the editors of cha138.com and covers the storm+kafka WordCount program. We hope you find it useful.
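With simple input and output done, let's try a more complex scenario: subscribe to messages from a topic, split each message into words on spaces, count the words, and push the current count of each incoming word to another topic. First, the classes we need: a subclass of backtype.storm.spout.Scheme that receives and processes data from KafkaSpout; a splitting bolt, SplitSentenceBolt; a counting bolt, WordCountBolt; a reporting bolt, ReportBolt; the topology definition, WordCountTopology; and finally a bolt that simply displays the subscribed data as-is, SentenceBolt. For the backtype.storm.spout.Scheme subclass we can reuse the MessageScheme defined earlier, so it is not covered in detail here.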
SplitSentenceBolt splits the input data, simply using String's split method, and emits each word downstream under the field name "word". The code is as follows:
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Arrays;

public class SplitSentenceBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple carries a single field named "word".
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // "msg" is the field declared by MessageScheme.
        String sentence = input.getStringByField("msg");
        String[] words = sentence.split(" ");
        // Emit one tuple per word.
        Arrays.asList(words).forEach(word -> collector.emit(new Values(word)));
    }
}
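One caveat with splitting on a single space: `String.split(" ")` keeps an empty token wherever two spaces are adjacent, and that empty string would then be emitted and counted as a "word". The sketch below is a hypothetical, JDK-only helper (`WordSplitter` is not part of the original code) that splits on runs of whitespace and drops empty tokens instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original bolts: tokenizes a sentence
// like SplitSentenceBolt does, but tolerates repeated or leading whitespace.
final class WordSplitter {

    // Splits on runs of whitespace and skips empty tokens,
    // so "a  b" yields [a, b] instead of [a, "", b].
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) { // "".split(...) still returns one empty token
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String sentence = "hello  storm kafka";
        // String.split(" ") keeps the empty token between the two spaces.
        System.out.println(Arrays.toString(sentence.split(" "))); // [hello, , storm, kafka]
        System.out.println(split(sentence));                      // [hello, storm, kafka]
    }
}
```

Inside the bolt, the emit loop would then become `WordSplitter.split(sentence).forEach(word -> collector.emit(new Values(word)));`.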
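SentenceBolt receives data from KafkaSpout and outputs it directly. In the topology graph, the stream forks at the input: one branch goes into SplitSentenceBolt and the other into SentenceBolt. This kind of structure can be used in a Lambda architecture. The code is as follows: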
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentenceBolt extends BaseBasicBolt {

    private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Log the raw message and pass it through unchanged.
        String msg = tuple.getStringByField("msg");
        logger.info("get one message is {}", msg);
        basicOutputCollector.emit(new Values(msg));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
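The constructor argument of backtype.storm.spout.SchemeAsMultiScheme is the scheme used to process the data subscribed from Kafka. MessageScheme here is a custom implementation; the code is as follows: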
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageScheme implements Scheme {

    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            // Decode the raw Kafka message bytes as a UTF-8 string.
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            // Not reachable in practice: every JVM is required to support UTF-8.
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        // Downstream bolts read the message through the "msg" field.
        return new Fields("msg");
    }
}
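Note that `new String(bytes, "UTF-8")` never fails on malformed input: it silently substitutes U+FFFD for invalid byte sequences, so the `catch` branch does not protect against bad messages. If malformed messages should be rejected instead, a strict decoder can be used. The sketch below is a hypothetical, JDK-only stand-in for the body of `deserialize` (`StrictUtf8` is an illustrative name); it keeps the original's convention of returning `null` on failure and returns a plain `List` rather than Storm's `Values`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical strict variant of MessageScheme.deserialize, JDK only.
final class StrictUtf8 {

    // Returns a one-field list holding the decoded message,
    // or null if the bytes are not valid UTF-8.
    static List<Object> deserialize(byte[] ser) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            String msg = decoder.decode(ByteBuffer.wrap(ser)).toString();
            return Collections.<Object>singletonList(msg);
        } catch (CharacterCodingException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        byte[] good = "hello storm".getBytes(StandardCharsets.UTF_8);
        byte[] bad = {(byte) 0xC3, (byte) 0x28}; // 0xC3 must be followed by a continuation byte
        System.out.println(deserialize(good)); // [hello storm]
        System.out.println(deserialize(bad));  // null
        // The lenient String constructor substitutes U+FFFD instead of failing.
        System.out.println(new String(bad, StandardCharsets.UTF_8).charAt(0) == '\uFFFD'); // true
    }
}
```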
WordCountBolt aggregates the incoming words and emits each word ("word") together with its current count ("count") downstream. The code is as follows:
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WordCountBolt extends BaseBasicBolt {

    // Running count per word, local to this bolt task.
    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counts = new ConcurrentHashMap<>();
        super.prepare(stormConf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        // Emit the word with its updated running total.
        collector.emit(new Values(word, count));
    }
}
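Because the counts live in a map private to each WordCountBolt task, running the bolt with more than one task only gives correct totals if every occurrence of a word reaches the same task. In Storm this is what subscribing the bolt with a fields grouping on "word" provides, e.g. `builder.setBolt("count-bolt", new WordCountBolt(), 4).fieldsGrouping("split-bolt", new Fields("word"))` (the component IDs and parallelism are hypothetical). The JDK-only model below illustrates the idea; routing by `word.hashCode()` is an assumption for illustration, not Storm's exact hashing. It also shows that the get / null-check / put sequence can be collapsed into a single `Map.merge` call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of several WordCountBolt tasks behind a fields grouping on
// "word". Only the property "same word -> same task" matters here.
final class WordCountTasks {
    // One private count map per task, like the counts field of each bolt instance.
    private final List<Map<String, Long>> taskCounts = new ArrayList<>();

    WordCountTasks(int numTasks) {
        for (int i = 0; i < numTasks; i++) {
            taskCounts.add(new HashMap<>());
        }
    }

    // Illustrative routing: hash the grouping field, modulo the task count.
    int taskFor(String word) {
        return Math.floorMod(word.hashCode(), taskCounts.size());
    }

    // Routes the word to its task and returns that task's running count;
    // Map.merge replaces the get / null-check / put sequence.
    long count(String word) {
        return taskCounts.get(taskFor(word)).merge(word, 1L, Long::sum);
    }

    // Number of tasks holding a count for this word (0 or 1 under fields grouping).
    long tasksHolding(String word) {
        return taskCounts.stream().filter(m -> m.containsKey(word)).count();
    }

    public static void main(String[] args) {
        WordCountTasks tasks = new WordCountTasks(4);
        for (String w : "storm kafka storm storm kafka".split(" ")) {
            System.out.println(w + " -> " + tasks.count(w));
        }
        System.out.println(tasks.tasksHolding("storm")); // 1
    }
}
```

With a shuffle grouping instead, occurrences of the same word would land on different tasks and each task would report only a partial count. Since Storm calls `execute` on a given bolt instance from a single executor thread, a plain `HashMap` is likely sufficient inside the bolt, though the `ConcurrentHashMap` is harmless.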
ReportBolt formats each received word and its count as a JSON string and passes it further downstream. The code is as follows:
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ReportBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        // Build a JSON string such as {"word": "storm", "count": 3}.
        // Note: word is not escaped, so a word containing " or \ yields invalid JSON.
        String reportMessage = "{\"word\": \"" + word + "\", \"count\": " + count + "}";
        collector.emit(new Values(reportMessage));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}
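JSON requires double-quoted strings, and a word that itself contains a double quote, a backslash, or a control character must be escaped or the report is no longer valid JSON. The sketch below is a hypothetical JDK-only helper (`ReportJson` is an illustrative name, not part of the original code) that ReportBolt could call to build its message; a JSON library such as Jackson or Gson would be the more typical choice in a real project.

```java
// Hypothetical helper for ReportBolt: builds {"word": ..., "count": ...}
// with the word escaped so the output stays valid JSON.
final class ReportJson {

    static String toJson(String word, long count) {
        return "{\"word\": \"" + escape(word) + "\", \"count\": " + count + "}";
    }

    // Escapes the characters a JSON string may not contain unescaped:
    // double quote, backslash, and control characters below U+0020.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the 4-hex-digit escape form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson("storm", 3));     // {"word": "storm", "count": 3}
        System.out.println(toJson("say\"hi\"", 1)); // {"word": "say\"hi\"", "count": 1}
    }
}
```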
Finally, define the topology, WordCountTopology. The code is as follows:
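Besides the points noted above, also note that the zkRoot and id defined in storm.kafka.SpoutConfig should differ from those used in the first example (at minimum, make sure the id is different; otherwise the two topologies will record their offsets in the same ZooKeeper node).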
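The collision described above can be modeled in a few lines. This is a sketch under an explicit assumption: that the spout commits each partition's offset to a ZooKeeper node at zkRoot + "/" + id + "/partition_" + N, which is believed to match storm-kafka's behavior but should be checked against the version in use. `OffsetPathModel` and the sample values "/kafka-storm", "echo", and "wordcount" are hypothetical.

```java
// Hypothetical model of where a Kafka spout commits offsets in ZooKeeper.
// ASSUMPTION: the node path is zkRoot + "/" + id + "/partition_" + partition;
// verify this layout against the storm-kafka version actually deployed.
final class OffsetPathModel {

    static String offsetPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    // Two spouts overwrite each other's offsets if they resolve to the same node.
    static boolean collide(String rootA, String idA, String rootB, String idB, int partition) {
        return offsetPath(rootA, idA, partition).equals(offsetPath(rootB, idB, partition));
    }

    public static void main(String[] args) {
        System.out.println(offsetPath("/kafka-storm", "wordcount", 0));                     // /kafka-storm/wordcount/partition_0
        System.out.println(collide("/kafka-storm", "echo", "/kafka-storm", "echo", 0));      // true
        System.out.println(collide("/kafka-storm", "echo", "/kafka-storm", "wordcount", 0)); // false
    }
}
```

Under this model, giving the second topology its own id is enough to separate the offset nodes even when both share the same zkRoot.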