wamp是一个开放式的标准的websocket子协议,在一个统一协议中提供两种应用程序的消息模式: 远程过程调用 + 发布&订阅
因为应用程序通常对两种形式的通信都有自然的需要,并且不应该要求为这些形式使用不同的协议/手段, 所以这就是为什么WAMP提供这这两种通信模式。
发布&订阅 + 远程过程调用
统一路由可能最好通过与传统方法进行对比来解释。 在客户端 - 服务器模型中,远程过程调用直接从调用者到被调用者:
在客户端 - 服务器模型中,Caller需要知道Callee驻留在哪里以及如何到达它,这引入Caller和Callee之间的强耦合。 这是很麻烦的,因为应用程序可能很快变得复杂和无法维护。
在发布 - 订阅模型中,发布者向抽象的“主题”提交信息,并且订阅者仅通过它们对应“主题”的兴趣而间接地接收到信息。 两个人不知道彼此。 它们通过“主题”和通常称为代理的中间体解耦: 发布者-->代理商-->订阅者
代理商保留订阅:谁目前订阅了哪个主题。 当发布者向主题发布一些信息(“事件”)时,代理将查找当前对该主题订阅的用户:确定发布到主题上的订阅者的集合,然后将信息(“事件”)转发给所有这些订阅者。 而确定信息接收者(独立于提交的信息)和将信息转发到接收者的行为就被称为路由。
现在,WAMP将松耦合的好处转化为RPC。与客户端 - 服务器模型不同,WAMP还通过引入中介:经销商来解除Caller和Callees的联系: 调用者-->经销商-->被调者
类似于Broker在PubSub中的作用,经销商负责将来自Caller的呼叫路由到被叫方,并路由返回结果或者错误,反之亦然。 两者不知道彼此:对等体驻留在哪里以及如何到达它,这些内容被封装在经销商中。
使用WAMP,被叫方在经销商处以抽象名称注册过程:标识过程的URI。 当调用者想要调用远程过程时,它与经销商谈话,并且仅提供要调用的过程的URI加上任何调用参数。 经销商将在他的注册程序书中查找要援引的程序。 书中的信息包括执行程序的Callee所在的位置,以及如何到达它。
WAMP调用路由器:路由 = 代理商 + 经销商,路由器能够路由呼叫,因此可以支持使用RPC和PubSub的灵活的解耦架构。
这里是一个例子。 想象一下,你有一个像Arduino Yun这样的小型嵌入式设备,具有传感器(如温度传感器)和致动器(如灯或电机)连接。 您希望将设备集成到整个系统中,用户面向前端以控制执行器,并连续处理后端组件中的传感器值。
使用WAMP,可以有一个基于浏览器的UI及嵌入式设备和后端实时交谈,从基于浏览器的UI打开设备上的灯自然通过在设备(1)上调用远程过程来完成。 并且由设备连续生成的传感器值通过发布和订阅(2)自然地传输到后端组件(并且可能是其他组件)。
WAMP的设计有一流的支持,支持不同的语言。 WAMP中没有要求特定于单个编程语言。 只要编程语言具有WAMP实现,它就可以与使用WAMP支持的任何其他语言编写的应用程序组件实现透明通信。
调用者通过提供过程URI和调用的任何参数来向远程过程发出调用。 被调者将使用提供的调用参数执行该过程,并将调用结果返回给调用者。
订阅者订阅他们对Brokers感兴趣的主题,发布者首先在Brokers发布。 代理将发布者传入的事件路由到订阅相应主题的订阅者。
package ws.wamp.jawampa; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import rx.Scheduler; import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; import rx.schedulers.Schedulers; import ws.wamp.jawampa.ApplicationError; import ws.wamp.jawampa.Request; import ws.wamp.jawampa.WampError; import ws.wamp.jawampa.WampClient; import ws.wamp.jawampa.WampClientBuilder; public class Publish { String url; String realm; int flag = 0; WampClient client; Subscription addProcSubscription; Subscription counterPublication; Subscription onHelloSubscription; // Scheduler for this example ExecutorService executor = Executors.newSingleThreadExecutor(); Scheduler rxScheduler = Schedulers.from(executor); static final int TIMER_INTERVAL = 1000; // 1s int counter = 0; Publish(String url, String realm) throws Exception { this.url = url; this.realm = realm; } public static void main(String[] args) throws Exception { if (args.length != 2) { System.out.println("Need 2 commandline arguments: Router URL and ream name"); return; } String args1 ="ws://"; String args2 = "realm1"; new Publish(args1, args2).run(); } void run() { WampClientBuilder builder = new WampClientBuilder(); try { builder.witUri(url) .withRealm(realm) .withInfiniteReconnects() .withCloseOnErrors(true) .withReconnectInterval(5, TimeUnit.SECONDS); client = builder.build(); } catch (WampError e) { e.printStackTrace(); return; } // Subscribe on the clients status updates client.statusChanged() .observeOn(rxScheduler) .subscribe(new Action1<WampClient.Status>() { @Override public void call(WampClient.Status t1) { System.out.println("Session status changed to " + t1); if (t1 == WampClient.Status.Connected) { // PUBLISH and CALL every second .. forever counter = 0; final int published = 4; onHelloSubscription= client.publish("glf", published) .observeOn(rxScheduler) .subscribe(new Action1<Long>() { @Override public void call(Long t1) { System.out.println("published to ‘oncounter‘ with counter " + published); } }, new Action1<Throwable>() { @Override public void call(Throwable e) { System.out.println("Error during publishing to ‘oncounter‘: " + e); } }); CALL a remote procedure flag =1; System.out.println(flag); counter++; } else if (t1 == WampClient.Status.Disconnected) { closeSubscriptions(); } } }, new Action1<Throwable>() { @Override public void call(Throwable t) { System.out.println("Session ended with error " + t); } }, new Action0() { @Override public void call() { System.out.println("Session ended normally"); } }); client.open(); waitUntilKeypressed(); while (flag == 0){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("waiting"); } System.out.println("Shutting down"); closeSubscriptions(); client.close(); try { client.getTerminationFuture().get(); } catch (Exception e) {} executor.shutdown(); } void closeSubscriptions() { if (onHelloSubscription != null) onHelloSubscription.unsubscribe(); onHelloSubscription = null; if (counterPublication != null) counterPublication.unsubscribe(); counterPublication = null; if (addProcSubscription != null) addProcSubscription.unsubscribe(); addProcSubscription = null; } private void waitUntilKeypressed() { try { System.in.read(); while (System.in.available() > 0) { System.in.read(); } } catch (IOException e) { e.printStackTrace(); } } }
package ws.wamp.jawampa; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import rx.Scheduler; import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; import rx.schedulers.Schedulers; import ws.wamp.jawampa.WampError; import ws.wamp.jawampa.WampClient; import ws.wamp.jawampa.WampClientBuilder; public class Subscribe { String url; String realm; WampClient client; Subscription glfSubscription; ExecutorService executor = Executors.newSingleThreadExecutor(); Scheduler rxScheduler = Schedulers.from(executor); static final int TIMER_INTERVAL = 1000; // 1s int counter = 0; Subscribe(String url, String realm) throws Exception { this.url = url; this.realm = realm; } public static void main(String[] args) throws Exception { String args1 = "ws://"; String args2 = "realm1"; new Subscribe(args1, args2).run(); } void run() { WampClientBuilder builder = new WampClientBuilder(); try { builder.witUri(url).withRealm(realm).withInfiniteReconnects().withCloseOnErrors(true) .withReconnectInterval(5, TimeUnit.SECONDS); client = builder.build(); } catch (WampError e) { e.printStackTrace(); return; } // Subscribe on the clients status updates client.statusChanged().observeOn(rxScheduler).subscribe(new Action1<WampClient.Status>() { @Override public void call(WampClient.Status t1) { System.out.println("工作状态变为 " + t1); if (t1 == WampClient.Status.Connected) { // SUBSCRIBE to a topic and receive events glfSubscription = client.makeSubscription("glf", String.class).observeOn(rxScheduler) .subscribe(new Action1<String>() { @Override public void call(String msg) { System.out.println("主题 ‘glf‘ 接收: " + msg); } }, new Action1<Throwable>() { @Override public void call(Throwable e) { System.out.println("failed to subscribe ‘websocket‘: " + e); } }, new Action0() { @Override public void call() { System.out.println("‘websocket‘ subscription ended"); } }); } else if (t1 == WampClient.Status.Disconnected) { closeSubscriptions(); } } }, new Action1<Throwable>() { @Override public void call(Throwable t) { System.out.println("Session ended with error " + t); } }, new Action0() { @Override public void call() { System.out.println("工作正常结束"); } }); client.open(); waitUntilKeypressed(); System.out.println("关闭"); closeSubscriptions(); client.close(); try { client.getTerminationFuture().get(); } catch (Exception e) { } executor.shutdown(); } void closeSubscriptions() { if (glfSubscription != null) glfSubscription.unsubscribe(); glfSubscription = null; } private void waitUntilKeypressed() { try { System.in.read(); while (System.in.available() > 0) { System.in.read(); } } catch (IOException e) { e.printStackTrace(); } } }
WebSocket是一种新的Web协议,当需要双向,实时通信时,克服HTTP的限制。 WebSocket将是WAMP的理想基础,因为它提供了与Web和浏览器兼容的双向实时消息传递。 不仅如此 - 我们可以在非浏览器环境中运行WebSocket。 从技术上讲,WAMP是一个正式注册的WebSocket子协议(在WebSocket之上运行),它使用JSON作为消息序列化格式。