WAMP-网络程序消息协议

Posted glfcdio

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WAMP-网络程序消息协议相关的知识,希望对你有一定的参考价值。

WAMP-网络程序消息协议

wamp是一个开放式的标准的websocket子协议,在一个统一协议中提供两种应用程序的消息模式: 远程过程调用  +  发布&订阅

它在使用不同语言的开放的WebSocket协议中提供统一应用程序路由。使用WAMP,可以用松耦合实现实时通信的应用程序组件构建分布式系统。

因为应用程序通常对两种形式的通信都有自然的需要,并且不应该要求为这些形式使用不同的协议/手段, 所以这就是为什么WAMP提供这这两种通信模式。

WAMP的核心是为应用程序组件提供两种通信模式,以便彼此通信:

         发布&订阅  + 远程过程调用

一、统一应用路由

WAMP提供给了我们所谓的应用程序的统一应用路由:在一个协议中的应用组件之间路由两个事件(用于PubSub)和路由呼叫(用于RPC)。

技术分享图片

统一路由可能最好通过与传统方法进行对比来解释。 在客户端 - 服务器模型中,远程过程调用直接从调用者到被调用者:

  调用者-->被调者

在客户端 - 服务器模型中,Caller需要知道Callee驻留在哪里以及如何到达它,这引入Caller和Callee之间的强耦合。 这是很麻烦的,因为应用程序可能很快变得复杂和无法维护。

所以WAMP如何解决这一点的呢?

技术分享图片

在发布 - 订阅模型中,发布者向抽象的“主题”提交信息,并且订阅者仅通过它们对应“主题”的兴趣而间接地接收到信息。 两个人不知道彼此。 它们通过“主题”和通常称为代理的中间体解耦: 发布者-->代理商-->订阅者

代理商保留订阅:谁目前订阅了哪个主题。 当发布者向主题发布一些信息(“事件”)时,代理将查找当前对该主题订阅的用户:确定发布到主题上的订阅者的集合,然后将信息(“事件”)转发给所有这些订阅者。 而确定信息接收者(独立于提交的信息)和将信息转发到接收者的行为就被称为路由。

技术分享图片

现在,WAMP将松耦合的好处转化为RPC。与客户端 - 服务器模型不同,WAMP还通过引入中介:经销商来解除Caller和Callees的联系: 调用者-->经销商-->被调者

类似于Broker在PubSub中的作用,经销商负责将来自Caller的呼叫路由到被叫方,并路由返回结果或者错误,反之亦然。 两者不知道彼此:对等体驻留在哪里以及如何到达它,这些内容被封装在经销商中。

使用WAMP,被叫方在经销商处以抽象名称注册过程:标识过程的URI。 当调用者想要调用远程过程时,它与经销商谈话,并且仅提供要调用的过程的URI加上任何调用参数。 经销商将在他的注册程序书中查找要援引的程序。 书中的信息包括执行程序的Callee所在的位置,以及如何到达它。

实际上,Caller和Callees是分离的,应用程序可以使用RPC并仍然受益于松耦合。

WAMP调用路由器:路由 = 代理商 + 经销商,路由器能够路由呼叫,因此可以支持使用RPC和PubSub的灵活的解耦架构。

这里是一个例子。 想象一下,你有一个像Arduino Yun这样的小型嵌入式设备,具有传感器(如温度传感器)和致动器(如灯或电机)连接。 您希望将设备集成到整个系统中,用户面向前端以控制执行器,并连续处理后端组件中的传感器值。

技术分享图片

 使用WAMP,可以有一个基于浏览器的UI及嵌入式设备和后端实时交谈,从基于浏览器的UI打开设备上的灯自然通过在设备(1)上调用远程过程来完成。 并且由设备连续生成的传感器值通过发布和订阅(2)自然地传输到后端组件(并且可能是其他组件)。

二、多语言化

WAMP的设计有一流的支持,支持不同的语言。 WAMP中没有要求特定于单个编程语言。 只要编程语言具有WAMP实现,它就可以与使用WAMP支持的任何其他语言编写的应用程序组件实现透明通信。

三、什么是RPC?什么是发布-订阅?

远程过程调用(RPC)是一种涉及三个角色的同级的消息模式:

呼叫者、被叫者、经销商

调用者通过提供过程URI和调用的任何参数来向远程过程发出调用。 被调者将使用提供的调用参数执行该过程,并将调用结果返回给调用者。

调用者注册它们向经销商提供的程序,呼叫者首先向经销商发起程序呼叫,经销商将来自呼叫者的呼叫路由实现到被调者,并将呼叫结果从被调者路由返回到呼叫者。

Caller和Callee通常运行应用程序代码,而经销商作为一个通用路由器,用于远程过程调用解耦Caller和Callees。

发布和订阅(PubSub)是一种包含三个角色的同行的消息模式:

发布者、订阅者、经纪人

发布者通过提供主题URI和事件的任何有效内容将主题发布,主题的订阅者将事件内容接收。

订阅者订阅他们对Brokers感兴趣的主题,发布者首先在Brokers发布。 代理将发布者传入的事件路由到订阅相应主题的订阅者。

发布者和订阅者通常运行应用程序代码,而代理作为通用路由器用于将发布者与订阅者分离。

下面是一个简单的WAMP应用小程序,搭建一个CrossBar服务器,开启服务器,就可以运行wamp小程序了。

发布者(Publish):

技术分享图片
package ws.wamp.jawampa;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import ws.wamp.jawampa.ApplicationError;
import ws.wamp.jawampa.Request;
import ws.wamp.jawampa.WampError;
import ws.wamp.jawampa.WampClient;
import ws.wamp.jawampa.WampClientBuilder;

public class Publish {
    
    String url;
    String realm;
    int flag = 0;
    WampClient client;
    
    Subscription addProcSubscription;
    Subscription counterPublication;
    Subscription onHelloSubscription;
    
    // Scheduler for this example
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Scheduler rxScheduler = Schedulers.from(executor);
    
    static final int TIMER_INTERVAL = 1000; // 1s
    int counter = 0;
    
    Publish(String url, String realm) throws Exception {
        this.url = url;
        this.realm = realm;
    }
    
    
    public static void main(String[] args) throws Exception {
      if (args.length != 2) {
          System.out.println("Need 2 commandline arguments: Router URL and ream name");
          return;
      }
      String args1 ="ws://172.16.100.55:8080/ws";
      String args2 = "realm1";
      new Publish(args1, args2).run();
  }
  
  void run() {
      
      WampClientBuilder builder = new WampClientBuilder();
      try {
          builder.witUri(url)
                 .withRealm(realm)
                 .withInfiniteReconnects()
                 .withCloseOnErrors(true)
                 .withReconnectInterval(5, TimeUnit.SECONDS);
          client = builder.build();
      } catch (WampError e) {
          e.printStackTrace();
          return;
      }

      // Subscribe on the clients status updates
      client.statusChanged()
            .observeOn(rxScheduler)
            .subscribe(new Action1<WampClient.Status>() {
          @Override
          public void call(WampClient.Status t1) {
              System.out.println("Session status changed to " + t1);

              if (t1 == WampClient.Status.Connected) {
                  
               
                  // PUBLISH and CALL every second .. forever
                  counter = 0;
                  final int published = 4;
                  onHelloSubscription= client.publish("glf", published)
                                .observeOn(rxScheduler)
                                .subscribe(new Action1<Long>() {
                                  @Override
                                  public void call(Long t1) {
                                      System.out.println("published to ‘oncounter‘ with counter " + published);
                                  }
                              }, new Action1<Throwable>() {
                                  @Override
                                  public void call(Throwable e) {
                                      System.out.println("Error during publishing to ‘oncounter‘: " + e);
                                  }
                              });
                          
                           CALL a remote procedure
                            
                              flag =1;
                               System.out.println(flag);
                          counter++;
                     
              }
              else if (t1 == WampClient.Status.Disconnected) {
                  closeSubscriptions();
              }
          }
      }, new Action1<Throwable>() {
          @Override
          public void call(Throwable t) {
              System.out.println("Session ended with error " + t);
          }
      }, new Action0() {
          @Override
          public void call() {
              System.out.println("Session ended normally");
          }
      });

      client.open();
      
      waitUntilKeypressed();
      while (flag == 0){
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              
              e.printStackTrace();
          }
          
          System.out.println("waiting");
          }
      System.out.println("Shutting down");
      closeSubscriptions();
      client.close();
      try {
          client.getTerminationFuture().get();
      } catch (Exception e) {}
      
      executor.shutdown();
  }
  
  void closeSubscriptions() {
      if (onHelloSubscription != null)
          onHelloSubscription.unsubscribe();
      onHelloSubscription = null;
      if (counterPublication != null)
          counterPublication.unsubscribe();
      counterPublication = null;
      if (addProcSubscription != null)
          addProcSubscription.unsubscribe();
      addProcSubscription = null;
  }
  
  private void waitUntilKeypressed() {
      try {
          System.in.read();
          while (System.in.available() > 0) {
              System.in.read();
          }
      } catch (IOException e) {
          e.printStackTrace();
      }
  }

}
View Code

订阅者(Subscribe):

package ws.wamp.jawampa;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import ws.wamp.jawampa.WampError;
import ws.wamp.jawampa.WampClient;
import ws.wamp.jawampa.WampClientBuilder;

public class Subscribe {

    String url;
    String realm;

    WampClient client;

    Subscription glfSubscription;

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Scheduler rxScheduler = Schedulers.from(executor);

    static final int TIMER_INTERVAL = 1000; // 1s
    int counter = 0;

    Subscribe(String url, String realm) throws Exception {
        this.url = url;
        this.realm = realm;
    }

    
    public static void main(String[] args) throws Exception {

        String args1 = "ws://172.16.100.55:8080/ws";
        String args2 = "realm1";
        new Subscribe(args1, args2).run();
    }

    void run() {

        WampClientBuilder builder = new WampClientBuilder();
        try {
            builder.witUri(url).withRealm(realm).withInfiniteReconnects().withCloseOnErrors(true)
                    .withReconnectInterval(5, TimeUnit.SECONDS);
            client = builder.build();
        } catch (WampError e) {
            e.printStackTrace();
            return;
        }

        // Subscribe on the clients status updates
        client.statusChanged().observeOn(rxScheduler).subscribe(new Action1<WampClient.Status>() {
            @Override
            public void call(WampClient.Status t1) {
                System.out.println("工作状态变为 " + t1);

                if (t1 == WampClient.Status.Connected) {
                    // SUBSCRIBE to a topic and receive events
                    glfSubscription = client.makeSubscription("glf", String.class).observeOn(rxScheduler)
                            .subscribe(new Action1<String>() {
                                 @Override
                                public void call(String msg) {
                                    System.out.println("主题 ‘glf‘ 接收: " + msg);
                                }
                            }, new Action1<Throwable>() {
                                 @Override
                                public void call(Throwable e) {
                                    System.out.println("failed to subscribe ‘websocket‘: " + e);
                                }
                            }, new Action0() {
                                 @Override
                                public void call() {
                                    System.out.println("‘websocket‘ subscription ended");
                                }
                            });

                } else if (t1 == WampClient.Status.Disconnected) {
                    closeSubscriptions();
                }
            }
        }, new Action1<Throwable>() {
             @Override
            public void call(Throwable t) {
                System.out.println("Session ended with error " + t);
            }
        }, new Action0() {
             @Override
            public void call() {
                System.out.println("工作正常结束");
            }
        });

        client.open();

        waitUntilKeypressed();
        System.out.println("关闭");
        closeSubscriptions();
        client.close();
        try {
            client.getTerminationFuture().get();
        } catch (Exception e) {
        }

        executor.shutdown();
    }
    void closeSubscriptions() {
        if (glfSubscription != null)
            glfSubscription.unsubscribe();
        glfSubscription = null;

    }

    private void waitUntilKeypressed() {
        try {
            System.in.read();
            while (System.in.available() > 0) {
                System.in.read();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

四、什么是WebSocket协议

WebSocket是一种新的Web协议,当需要双向,实时通信时,克服HTTP的限制。 WebSocket将是WAMP的理想基础,因为它提供了与Web和浏览器兼容的双向实时消息传递。 不仅如此 - 我们可以在非浏览器环境中运行WebSocket。 从技术上讲,WAMP是一个正式注册的WebSocket子协议(在WebSocket之上运行),它使用JSON作为消息序列化格式。

以上是关于WAMP-网络程序消息协议的主要内容,如果未能解决你的问题,请参考以下文章

Jawampa--将WAMP支持到Java的库

高速公路 WAMP 服务器的命令式客户端?

CPNtools协议建模安全分析---实例变迁标记

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

wamp,phpserver,xampp

计算机网络-----ICMP协议和PING程序