JStorm与Storm源码分析--SpoutOutputCollector与代理模式

Posted 脑机接口社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JStorm与Storm源码分析--SpoutOutputCollector与代理模式相关的知识,希望对你有一定的参考价值。

本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式。 
首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明了以下3个抽象方法用来约束ISpoutOutputCollector的实现类。接口定义与方法说明如下:

/**
 * ISpoutOutputCollector:Spout输出收集器接口
 */
public interface ISpoutOutputCollector {
    /**
     * 改方法用来向外发送数据,它的返回值是该消息所有发送目标的taskID集合;
     * 参数:
     * streamId:消息Tuple将要被输出到的流
     * tuple:要输出的消息,是一个Object列表
     * messageId:输出消息的标记信息,如果messageId被设置为null,则Storm不会追踪该消息,
     * 否则它会被用来追踪所发出的消息处理情况
     */
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    /**
     * 该方法与上面emit方法类似,区别在于:
     * 1.数据(消息)只由所指定taskId的Task接收;(这就意味着如果没有下游节点接收该消息,则该消息就没有被真正发送)
     * 2.该方法要求参数streamId所对应的流必须为直接流,接收端的Task必须以直接分组的方式来接收消息,
     * 否则会抛出异常.
     */
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    /**
     * 用来处理异常
     */
    void reportError(Throwable error);
}

Storm提供了接口ISpoutOutputCollector的默认类SpoutOutputCollector,这个类实际上是一个代理类,该类持有一个ISpoutOutputCollector类型的对象,所有的操作实际上都过该对象来实现的。SpoutOutputCollector定义如下:

public class SpoutOutputCollector implements ISpoutOutputCollector {
    /**
     * 持有SpoutOutputCollector要代理的对象
     */
    ISpoutOutputCollector _delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        _delegate = delegate;
    }
    /**
     * 实现了接口中的emit方法,并且提供了它的几个重载方法
     * eg.如果不指定streamId,默认使用default,如果不指定messageId,则默认使用空(null)
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId){
        return _delegate.emit(streamId, tuple, messageId);
    }

    public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

    public List<Integer> emit(List<Object> tuple) {
        return emit(tuple, null);
    }

    public List<Integer> emit(String streamId, List<Object> tuple) {
        return emit(streamId, tuple, null);
    }
    /**
     * 实现了接口中的emitDirect方法,同时也提供了几个重载方法,与上面emit方法一致.
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        _delegate.emitDirect(taskId, streamId, tuple, messageId);
    }

    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
        emitDirect(taskId, streamId, tuple, null);
    }

    public void emitDirect(int taskId, List<Object> tuple) {
        emitDirect(taskId, tuple, null);
    }
    /**
     * 处理异常方法的实现
     */
    @Override
    public void reportError(Throwable error) {
        _delegate.reportError(error);
    }
}

PS: 
代理模式主要分为两种:静态代理和动态代理

静态代理: 
在程序运行前代理类与委托类的关系在运行前就确定,即在程序运行前就已经存在代理类的字节码文件了. 
代理模式角色: 
Subject(抽象主题角色):可以是抽象类也可以是接口,声明了被委托角色和委托类共有的处理方法; 
RealSubject(具体主题角色):又称被委托角色、被代理角色,是业务逻辑的具体执行者; 
ProxySubject(代理主题角色):又称委托类、代理类,负责对真实角色的应用, 
把所有抽象主题类定义的方法限制委托给具体主题角色来实现,并且在具体主题角色处理完毕前后做预处理和善后处理.

静态代理模式案例如下:

//抽象主题
public interface Subject {
    public void process(String taskName);
}

被代理角色:

public class RealSubject implements Subject {
    @Override
    public void process(String taskName) {
        System.out.println("正在执行任务:"+taskName);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

代理类:

public class ProxySubject implements Subject {
    //代理类持有一个委托类的对象引用
    private Subject delegate;
    public ProxySubject(Subject delegate){
        this.delegate=delegate;
    }
    @Override
    public void process(String taskName) {
        //预处理
        this.before();
        //将请求分派给委托类处理
        delegate.process(taskName);
        //善后处理
        this.after();
    }
    private void before(){
        System.out.println("预处理!");
    }
    private void after(){
        System.out.println("善后处理!");
    }
}

案例测试:

public class Test {
    public static void main(String[] args) {
        RealSubject subject = new RealSubject();
        ProxySubject p = new ProxySubject(subject);
        p.process("排水");
    }
}

测试结果:

预处理!
正在执行任务:排水
善后处理!

静态代理类的优缺点: 
优点: 
业务类只需关注业务逻辑本身,这样就保证了业务类的重用性. 
缺点: 
代理对象的一个接口只服务于一种类型的对象.当要代理的方法很多,就要为每一种方法进行代理。因此静态代理在程序规模变大时就无法很好地胜任工作了.

动态代理: 
代理类和委托类的关系在程序运行时才确定的.动态代理类的源码是在程序运行期间由JVM根据反射等机制动态生成,所以不存在代理类的字节码文件.

动态代理模式案例如下:

public interface Service {
    //目标方法 
    public void process();
}
public class UserServiceImpl implements Service {
    @Override
    public void process() {
         System.out.println("用户service处理");  
    }
}

动态代理实现实例:

public class MyInvocatioHandler implements InvocationHandler {
    private Object target;

    public MyInvocatioHandler(Object target) {
        this.target = target;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        //System.out.println("-----before-----");
        this.before();
        Object result = method.invoke(target, args);
       // System.out.println("-----end-----");
        this.after();
        return result;
    }
    // 生成代理对象
    public Object getProxy() {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Class<?>[] interfaces = target.getClass().getInterfaces();
        return Proxy.newProxyInstance(loader, interfaces, this);
    }

    private void before(){
        System.out.println("预处理!");
    }
    private void after(){
        System.out.println("善后处理!");
    }
}

案列测试:

public class ProxyTest {
    public static void main(String[] args) {
        Service service = new UserServiceImpl();
        MyInvocatioHandler handler = new MyInvocatioHandler(service);
        Service serviceProxy = (Service)handler.getProxy();
        serviceProxy.process();
    }
}

测试结果:

预处理!
用户service处理
善后处理!

动态代理的优缺点: 
优点: 
接口中的所有方法都被转移到调用处理器一个集中的方法中在方法“运行时”动态的加入,决定你是什么类型,较灵活 
缺点: 
1. 与静态代理相比,效率降低了 
2. JDK动态代理只能对实现了接口的类进行代理

欢迎关注下面二维码进行技术交流: 

以上是关于JStorm与Storm源码分析--SpoutOutputCollector与代理模式的主要内容,如果未能解决你的问题,请参考以下文章

JStorm与Storm源码分析--BasicBoltExecutor与装饰模式

JStorm与Storm源码分析--SpoutOutputCollector与代理模式

Storm/JStorm之TopologyBuilder源码阅读

jstorm之于storm

JStorm Storm 上手demo

海数据技术沙龙——Flink:新一代流式计算框架&Storm/JStorm: 流式计算框架的应用