Storm框架:如何根据业务条件选择不同的bolt进行下发消息

Posted gouyg

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm框架:如何根据业务条件选择不同的bolt进行下发消息相关的知识,希望对你有一定的参考价值。

Strom框架基本概念就不提了,这里主要讲的是Stream自定义ID的消息流。默认spout、bolt都需实现接口方法declareOutputFields,代码如下:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("body"));
}

这种情况下发的消息会被所有定义的bolts接收。我们如果需要根据得到的消息类型来选择不同的bolt,就需要用到Stream Grouping。

技术分享图片

  • 首先通过消息源的OutputFieldsDeclarer来定义发射多条消息流stream

以下定义了两种stream消息流:email邮件、sms短信

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declareStream("email", new Fields("body"));
    outputFieldsDeclarer.declareStream("sms", new Fields("body"));
}
  • 然后我们通过对消息内容进行分析判断来决定发射指定的stream类型
@Override
public void execute(Tuple tuple) {
    String streamType;
    String value = tuple.getStringByField("body");
    # 逻辑判断stub code
    if (value.startsWith("email:")) {
        streamType = "email";
    } else {
        streamType = "sms";
    }
    
    outputCollector.emit(streamType, new Values(value));
}
  • topology设置bolt的消息源时通过localOrShuffleGrouping来设置只接收指定stream的消息

FilterBolt通过对消息进行加工处理,下发给bolts时会指定不同的stream,EmailNotifyBolt只接收email类型的stream消息,SmsNotifyBolt只接收sms类型的stream消息。

TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("RabbitmqSpout", new RabbitmqSpout());
topologyBuilder.setBolt("FilterBolt", new FilterBolt()).shuffleGrouping("RabbitmqSpout");

topologyBuilder.setBolt("EmailNotifyBolt", new EmailNotifyBolt()).localOrShuffleGrouping("FilterBolt", "email");

topologyBuilder.setBolt("SmsNotifyBolt", new SmsNotifyBolt()).localOrShuffleGrouping("FilterBolt", "sms");

以上是关于Storm框架:如何根据业务条件选择不同的bolt进行下发消息的主要内容,如果未能解决你的问题,请参考以下文章

storm基础概念

spout和bolt

Storm框架:Storm整合springboot

在storm中,我可以指定一个bolt将运行的工人数量吗?

storm在windows系统下安装调试

马化腾漫谈“流式大数据处理的三种框架:Storm,Spark和Samza”