JStorm与Storm源码分析--BasicBoltExecutor与装饰模式

Posted 脑机接口社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JStorm与Storm源码分析--BasicBoltExecutor与装饰模式相关的知识,希望对你有一定的参考价值。

在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack、Fail和Anchor操作,而这部分操作是由执行器BasicBoltExecutor 实现的。 
下面我们看一下BasicBoltExecutor的源码:

/**
 * BasicBoltExecutor实现了IRichBolt接口
 * 在该类中持有一个IBasicBolt成员变量用于调用转发
 * 说明:
 * 该类是基于装饰模式实现的.
 */
public class BasicBoltExecutor implements IRichBolt {
    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    
    //持有IBasicBolt类型的变量
    private IBasicBolt _bolt;
    //定义了成员变量_collector
    private transient BasicOutputCollector _collector;

    public BasicBoltExecutor(IBasicBolt bolt) {
        _bolt = bolt;
    }
    /**
     * 实现declareOutputFields方法,但它实际上是
     * _bolt调用declareOutputFields方法
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer);
    }

    /**
     * 实现prepare方法,
     * 实际上是调用_bolt的prepare方法,
     * 并实例化BasicOutputCollector
     */
    public void prepare(Map stormConf, TopologyContext context, 
            OutputCollector collector) {
        _bolt.prepare(stormConf, context);
        _collector = new BasicOutputCollector(collector);
    }
    /**
     * 实现execute方法
     */
    public void execute(Tuple input) {
        //设置运行器上下文,
        //它表示经execute方法发送出去的消息都是由输入消息产生的,
        //即输出的消息都将标记为输入消息所衍生出来的消息,
        //这是使用IBasicBolt实现消息跟踪的重要一环
        _collector.setContext(input);
        try {
            //调用_bolt的execute方法
            _bolt.execute(input, _collector);
            //对输入的消息进行Ack操作.
            //这一步意味着基于当前输入消息的处理和衍生消息的发送已经完成,
            //这时就可以对该消息进行Ack操作了.
            _collector.getOutputter().ack(input);
        } catch(FailedException e) {
            //Storm捕获所有的FailedException,并对输入的消息进行Fail操作。
            //如果捕获的异常为ReportedFailedException的实例,
            //则调用reportError回调方法,给用户一个机会去处理异常
            //FailedException是Storm定义的一种基本异常,用来进行消息的失败重发等操作,
            //并不会导致Topology运行停止
            if(e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input);
        }
    }
    public void cleanup() {
        _bolt.cleanup();
    }
    public Map<String, Object> getComponentConfiguration() {
        return _bolt.getComponentConfiguration();
    }
}

先说说装饰模式的结构,装饰模式UML类图如图所示: 

如上图所示,装饰模式的角色构成: 
(1)抽象组件角色-Component:给出一个抽象接口,以规范或约束准备接收附加职责的对象. 
(2)具体组件对象-ConcreteComponent:实现了组件对象的接口,通常是被装饰器装饰的原始对象. 
(3)装饰角色-Decorator:所有装饰器的抽象父类,需要定义一个与组件接口一致的接口, 
并持有一个Component对象,其实就是持有被装饰的对象 
(4)具体装饰对象-ConcreteDecorator:实际的装饰器对象,负责给组件对象添加附加的功能

现在举一个常见的例子(冲咖啡)来说明一下。 
我敲代码困了,想冲一杯咖啡来提提神,但我怕咖啡苦,便向咖啡里加点糖(用糖装饰一番),喝着喝着觉得没有牛奶香,边往咖啡里到点牛奶(用牛奶装饰咖啡一番),这整个过程便可看成装饰模式. 
代码: 
1.先定义一个接口用来约束职责对象

/**
 * Component  
 * 抽象构建角色,
 * 约束或规范准备接收附加责任的对象
 */
public interface Component {
    public void operation();
}

2.实现组件对象的接口,它是被装饰器装饰的原始对象(这里是咖啡)

/**
 * ConcreteComponent  
 * 接收附加责任
 */
public class ConcreteComponent implements Component {

    @Override
    public void operation() {
        System.out.println("的咖啡");
    }
}

3.定义装饰器

/**
 * Decorator装饰角色 
 */
public abstract class Decorator implements Component {
    //持有一个构建对象的实例
    private Component component;
    public Decorator(Component component){
        this.component=component;
    }
    @Override
    public void operation() {
        component.operation();
    }
}

4.实现加糖装饰器和加牛奶装饰器

/** 
 * ConcreteDecorator1
 * 加糖装饰器
 */
public class ConcreteDecorator1 extends Decorator {

    public ConcreteDecorator1(Component component) {
        super(component);
        // TODO Auto-generated constructor stub
    }
    public void operation(){
        System.out.print("加糖");
        super.operation();
    }
}
/**
 * ConcreteDecorator2   
 * 加奶装饰器
 */
public class ConcreteDecorator2 extends Decorator {

    public ConcreteDecorator2(Component component) {
        super(component);
        // TODO Auto-generated constructor stub
    }
    public void operation(){
        System.out.print("加奶");
        super.operation();
    }
}

5.测试

public class Test {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Component component=new ConcreteComponent();
        Decorator decorator1=new ConcreteDecorator2(component);
        Decorator decorator2=new ConcreteDecorator1(decorator1);
        decorator2.operation();
    }
}

6.测试结果

加糖加奶的咖啡

jdk中也大量使用装饰模式。比如Java流接口中输出流部分 
OutputStream就相当于装饰模式中的Component; 
FileOutputStream、ObjectOutputStream这几个对象直接继承了OutputStream, 
还有一些对象直接继承OutputStream对象,比如:ByteArrayOutputStream、PipedOutputStream等.这些对象相当于装饰模式中的ConcreteComponent,是可以被装饰器装饰的对象. 
FilterOutputStream相当于装饰模式中的Decorator,而他的子类DataOutputStream、BufferedOutputStream就相当于装饰模式中的ConcreteDecorator。FilterOutputStream和它的子类对象的构造器都是传入组件OutputStream。对照上述将的装饰模式的结构图,由此可见这部分类的设计完全采用装饰模式. 
同理输入流部分也类似。

注:内容部分为学习李明老师Storm源码分析的笔记整理。 
欢迎关注下面二维码进行技术交流:

 

以上是关于JStorm与Storm源码分析--BasicBoltExecutor与装饰模式的主要内容,如果未能解决你的问题,请参考以下文章

JStorm与Storm源码分析--BasicBoltExecutor与装饰模式

JStorm与Storm源码分析--SpoutOutputCollector与代理模式

Storm/JStorm之TopologyBuilder源码阅读

jstorm之于storm

JStorm Storm 上手demo

海数据技术沙龙——Flink:新一代流式计算框架&Storm/JStorm: 流式计算框架的应用