jawampa
是一个将Web应用程序消息传递协议[WAMP]支持到Java的库。
提供WAMPv2客户端功能以及服务器端功能,并支持所有当前定义的WAMPv2角色(呼叫者,被叫者,发布者,订户,代理,经销商)。
提供可插拔传输层。使用不同网络机制和低级库的连接者和服务器可以被构建并插入到jawampa中。
通过RxJava Observables显示客户端用户界面,可以实现不同异步操作的强大组合,并为不同线程之间的委托数据处理提供一个简单的解决方案。
声明以下基础库的依赖关系 :
<dependency>
<groupId>ws.wamp.jawampa</groupId>
<artifactId>jawampa-core</artifactId>
<version>0.4.2</version>
</dependency>
然而,由于jawampa的核心库不提供传输层,用户通常应该使用jawampa传输提供程序库(例如,jawampa-netty - 见子目录)作为代理。这将自动也添加一个依赖于jawampa-core。
WAMP client API (WampClient)
WampClients必须通过WampClientBuilder对象创建。必须通过构建器设置3个必需参数:
连接器提供程序,描述将用于建立到WAMP路由器的连接的框架。
描述WAMP路由器地址的URI
客户端应该在路由器上加入的领域
另外,存在一些可选参数,其例如允许激活客户端和路由器之间的自动重新连接,或者允许配置在通信错误的情况下客户端应当如何行为。
Example:
final WampClient client; try { WampClientBuilder builder = new WampClientBuilder(); builder.withConnectorProvider(connectorProvider) .withUri("ws://localhost:8080/wamprouter") .withRealm("examplerealm") .withInfiniteReconnects() .withReconnectInterval(5, TimeUnit.SECONDS); client = builder.build(); } catch (WampError e) { System.out.println(e); return; }
WampClient对象提供RxJava Observable statusChanged(),通知用户客户端和路由器之间会话的当前状态,可以是DisconnectedState,ConnectingState或ConnectedState。 应用程序可以监视此Observable以检测何时应执行其他步骤(例如,在连接后订阅主题或注册函数)。
statusChanged()返回BehaviorObservable,因此它将立即向订阅者发送关于当前状态的通知,而不仅仅是在状态改变的情况下。
statusChanged()返回一个BehaviorObservable,因此它将立即发送一个通知给订阅者和用户的当前状改变。
Example:
client.statusChanged() .observeOn(applicationScheduler) .subscribe((WampClient.State newState) -> { if (newState instanceof WampClient.ConnectedState) { } else if (newState instanceof WampClient.DisconnectedState) { } else if (newState instanceof WampClient.ConnectingState) { }});
为了启动客户端和路由器之间的连接,客户端open()成员函数必须被调用。 这将导致第一次连接尝试和状态从DisconnectedState更改为ConnectingState。
当客户端不再需要时,必须使用close()成员函数关闭。 这将关闭到远程路由器的连接,并停止所有重新连接尝试。WampClient关闭后,无法重新打开。如果需要,应创建WampClient的新实例。
关闭过程也是异步的。因此,调用close()不能保证客户端立即关闭。 然而,close()调用返回一个Observable,它可以用来等待客户端被成功关闭。
典型会话生命周期的示例:
WampClient client = builder.build(); client.statusChanged().subscribe(...); client.open(); // ... // use the client here // ... // Wait synchronously for the client to close // On environments like UI thread asynchronous waiting should be preferred client.close().toBlocking().last();
执行过程调用
可以通过WampClient的各种调用成员函数执行远程过程调用。
所有重载版本的调用都需要调用过程的名称(并且它必须是有效的WAMP Uri)作为第一个参数。所有版本的call()返回一个Observable,它用于以异步方式将函数调用的结果传递给调用者。它是一个活跃的observable,这意味着调用将独立地判断出是否有人订阅它。然而结果将被缓存在Observable中,这意味着较晚的订阅者将能够检索结果。
如果过程调用成功,则将使用结果来调用订阅者onNext方法,然后执行onCompleted调用。如果远程过程调用失败,则会调用订阅方onError()方法,并将出现的错误作为参数。
call()的不同重载允许以不同的方式向过程提供参数,以及以不同的方式检索返回值:
调用的最明确的签名是 可观察<Reply>调用(String procedure,ArrayNode arguments,ObjectNode argumentsKw) 它允许将位置参数和关键字参数传递到WAMP过程,并且将返回一个结构,该结构还包含调用结果的位置和关键字参数的字段。 参数和返回值使用来自Jackson JSON库的ArrayNode和ObjectNode数据类型,描述任意其他类型的数组或对象。
如果调用简化变量只需要位置参数 可观察<Reply> call(String procedure,Object ... args) 可以使用它允许将位置参数作为varargs数组传递。它还将自动使用Jacksons对象映射功能来转换其JsonNode形式的所有Java POJO关键字参数,并从中创建参数数组。这意味着可以直接使用任何类型的Java对象作为函数参数,只要他们可以正确地序列化和反序列化。对于更复杂的数据结构,可能需要使用注释来指示序列化器。
call()的最后一个变体提供了一些进一步的便利,并且具有以下签名:<T>可观察<T>调用(String procedure,Class <T> returnValueClass,Object ... args)。 当过程不提供或只提供一个位置返回值时,可以使用它。然后可以在第二个参数中指定期望的返回值的类型,并且调用将自动尝试将过程调用的第一个结果参数映射到所需类型。这也将通过Jackson对象映射完成。通过这个简化,可以调用远程过程并以下列方式监听返回值(使用Java8):
Observable<String> result = client.call("myservice.concat", String.class, "Hello nr ", 123); // Subscribe for the result // onNext will be called in case of success with a String value // onError will be called in case of errors with a Throwable result.observeOn(applicationScheduler) .subscribe((txt) -> System.out.println(txt), (err) -> System.err.println(err));
向其他客户提供远程过程
使用WAMP,连接到路由器的所有客户端都能够提供可由任何其他客户端使用的过程。
jawampa通过registerProcedure()成员函数公开了这个功能。 registerProcedure将返回一个Observable,它将用于接收传入的函数调用。每个到注册过程名的入局请求将以Request类的形式推送到Subscribers onNext方法。应用程序可以通过observeOn检索和处理任何线程上的请求,并可以使用Request类成员函数发送对请求的响应。该过程将仅在订阅被调用之后在路由器处注册,并且如果订阅被取消订阅,则将在路由器处被取消注册。
如果在路由器的过程注册期间发生错误,将调用Subscribers onError方法来通知有关错误。如果会话断开连接(或断开连接),则订阅将简单地使用onCompleted完成。
提供回调第一个整数参数的过程的示例:
// Provide a procedure Observable proc = client.registerProcedure("echo.first").subscribe( request -> { if (request.arguments() == null || request.arguments().size() != 1 || !request.arguments().get(0).canConvertToLong()) { try { request.replyError(new ApplicationError(ApplicationError.INVALID_PARAMETER)); } catch (ApplicationError e) { } } else { long a = request.arguments().get(0).asLong(); request.reply(a); } }, e -> System.err.println(e)); // Unregister the procedure proc.unsubscribe();
发布事件
WampClient的publish()函数可用于向路由器发布具有主题(必须是有效的WAMP Uri)的事件,从而向所有其他连接的WAMP客户端发布事件。
与call()类似,publish()函数提供了各种重载,允许使用不同的格式传递事件参数。
publish()函数返回一个Observable <Long>。 就像从call()返回的Observable,这也是一个活跃的observable,不需要订阅来执行发布。 订阅了Observable的订阅者将收到一个onNext()调用,该调用在发布失败时传递publicationId或onError()调用。 这可以是例如当没有到路由器的连接时的情况。
发布事件的示例:
client.publish("example.event", "Hello ", "from nr ", 28) .subscribe( publicationId -> { /* Event was published */ }, err -> { /* Error during publication */});
订阅事件
要订阅从路由器上的其他客户端发布的事件,可以使用WampClient的makeSubscription()函数。 makeSubscription需要客户端感兴趣的主题,并返回一个可用于执行订阅的Observable。在路由器上的主题订阅只会发生在subscribe()被调用的Observable。之后,对于每个接收的事件,将调用Subscriber的onNext()函数,以包含位置和关键字参数的PubSubData结构的形式传递事件。如果由于错误onError()不能在路由器执行订阅将被调用来传递该错误。如果连接关闭或get已关闭,订阅将使用onCompleted()完成。
提示:对于大多数应用程序,在statusChanged()处理程序中连接到路由器之后执行预订是有意义的。
makeSubscription函数的重载,它可以在客户端仅对事件的第一位置参数感兴趣的情况下使用。它允许指定事件数据的类类型,并将自动尝试通过Jacksons对象映射功能将接收的事件转换为此数据类型。如果客户端没有参数Void.class可以使用一个Observable <Void>,当没有接收到参数的事件时,它会通知用户。如果接收到的事件数据无法转换为所需格式,订阅将被取消,并且将通过onError()函数传递错误。
订阅事件的示例:
// Subscribe to an event Observable<String> eventSubscription = client.makeSubscription("example.event", String.class) .subscribe((s) -> { /* String event received */ }, (e) -> { /* Error during subscription or object mapping */ }); // Unsubscribe from the event eventSubscription.unsubscribe();
WAMP服务器API(WampRouter)
jawampa提供了一个WAMP路由器,可以捆绑到应用程序,以避免安装,配置和运行外部路由器。 通过在应用程序中实例化提供API以及WampRouter的WampClient,可以模拟经典的服务器架构,其中服务器侦听连接以及提供API。
WampRouter类实现WAMPv2基本配置文件中描述的整个路由和领域逻辑。 它只能通过WampRouterBuilder类创建,这允许在实例化之前配置路由器。 在当前版本的jawampa中,路由器必须公开的领域必须通过它来配置。
配置和实例化路由器示例:
WampRouterBuilder routerBuilder = new WampRouterBuilder(); WampRouter router; try { routerBuilder.addRealm("realm1"); routerBuilder.addRealm("realm2"); router = routerBuilder.build(); } catch (ApplicationError e) { e.printStackTrace(); return; }
路由器将在构建之后直接运行。但是它不会监听任何连接,因此到目前为止不会做任何事情。如果让路由器工作,服务器必须设置为接受连接,在WampRouter上注册它们,然后向它推送消息。
WampRouter提供了一个IWampConnectionAcceptor接口的实现,它可以用来在客户端注册一个新的连接。它可以通过WampRouter.connectionAcceptor()getter查询。
在路由器上注册新连接是一个需要经过两个阶段的过程:
首先,连接提供程序必须通过调用connectionAcceptor.createNewConnectionListener();从路由器查询IWampConnectionListener的实例。这将是新连接在完全建立和注册后推送消息的接口。返回的接口尚未占用路由器中的任何非垃圾收集资源。因此,如果连接提供程序确定无法正确建立连接,则忽略返回值没有害处。
第二步,新连接必须通过调用connectionAcceptor.acceptNewConnection(connection,connectionListener)在路由器上注册;从而向路由器提供IWampConnection类型的发送接口。
一旦两个步骤完成,新连接可以仅向侦听器发送消息。较早发送邮件会导致未定义的行为。
提供IWampConnection WampRouter将使用接口来通过连接发送消息。必须保证以下的连接路由器:
路由器必须能够调用接口上的方法,只要它没有调用close(...)就可以了。如果连接已经关闭或处于错误状态,接口的实现应通过拒绝所提供的promise来应答sendMessage(...)调用。 路由器将始终在接口上调用close(...),即使连接被远程端关闭。 连接必须保证在它通过满足所提供的未来确认关闭调用后,不会在检索到的IWampConnectionListener接口上调用方法。路由器将接受close(...)调用的确认,作为连接所拥有的所有资源都已释放的标志。可以在jawampa-netty子项目中找到向基于Netty框架的路由器推送消息的服务器的示例实现。
关闭路由器
要关闭路由器,必须调用close()成员函数:
router.close.toBlocking().last();
这会正常关闭路由器和客户端之间建立的所有WAMP会话,并将关闭底层传输通道。如果在close()之后对路由器进行新连接,它将通过关闭它们来拒绝这些连接。
就像WampClient上的close()调用关闭一个WampRouter也是一个不同步的过程,当路由器完全关闭时,调用会返回一个Observable信号。
为了允许路由器在端口上监听,接受传入连接,必须启动一个或多个服务器,这些服务器使用路由器作为其最终请求处理程序。
jawampa是非常活跃的,在工作进行状态。 因此,适用以下限制:
不能正确支持WAMP规范中所要求的二进制值的传输。 jawampa将使用Jackson将数据从二进制转换为JSON,这将使用base64编码,但不会预置带有前导0字节的数据。
jawampa仅支持WAMPv2基本配置文件和高级配置文件的某些选定部分。许多高级配置文件功能未实现。
jawampa只支持WAMP客户端和路由器之间的websocket连接。
客户端和路由器的角色被正确传输,但不会考虑所有其他操作。例如。将不会验证远程对等体是否实际提供所需的功能。假设所有对等体都实现了适用于它们的所有角色。