即时通信系统Openfire分析之六:路由表 RoutingTable
Posted Fordestiny
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了即时通信系统Openfire分析之六:路由表 RoutingTable相关的知识,希望对你有一定的参考价值。
还是从会话管理说起
上一章,Session经过预创建、认证之后,才正常可用。认证时,最重要的操作,就是将Session加入到路由表,使之拥用了通信功能。
添加到至路由表的操作,是在SessionManager中操作的,如下:
SessionManager.addSession(LocalClientSession session):
public void addSession(LocalClientSession session) { // Add session to the routing table (routing table will know session is not available yet) routingTable.addClientRoute(session.getAddress(), session); // Remove the pre-Authenticated session but remember to use the temporary ID as the key localSessionManager.getPreAuthenticatedSessions().remove(session.getStreamID().toString()); SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ? SessionEventDispatcher.EventType.anonymous_session_created : SessionEventDispatcher.EventType.session_created; // Fire session created event. SessionEventDispatcher.dispatchEvent(session, event); if (ClusterManager.isClusteringStarted()) { // Track information about the session and share it with other cluster nodes sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo(session)); } }
进入路由表模块, RoutingTableImpl.addClientRoute(session.getAddress(), session)方法:
public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); localRoutingTable.addRoute(route.toString(), destination); ...... return added; }
从这里可以看出,路由表的底层,是借助LocalRoutingTable类来实现。
路由表的底层数据结构
LocalRoutingTable类的成员构成,非常的简单:
Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();
也就是说,路由表的实质,就是一个Map的数据结构,其Key为JID地址,Velue为RoutableChannelHandler类型报文处理器。
查看路由表RoutingTableImpl模块中的路由添加方法,可以看到表中存储的是以RoutableChannelHandler衍生出来的几个Session类型,总共提供了三种:
LocalOutgoingServerSession(用于存储连接本机的远程服务端)、LocalClientSession(用于存储连接到本机的客户端)、RoutableChannelHandler(用于存储组件),类结构如下:
|-- RoutableChannelHandler |-- Session |-- LocalSession |-- LocalClientSession |-- LocalServerSession |-- LocalOutgoingServerSession
而LocalRoutingTable内的所有方法,就是一系列对这个Map结构的操作函数,核心的如下几个:
添加路由:
boolean addRoute(String address, RoutableChannelHandler route) { return routes.put(address, route) != route; }
获取路由:
RoutableChannelHandler getRoute(String address) { return routes.get(address); }
获取客户端的Session列表:
Collection<LocalClientSession> getClientRoutes() { List<LocalClientSession> sessions = new ArrayList<>(); for (RoutableChannelHandler route : routes.values()) { if (route instanceof LocalClientSession) { sessions.add((LocalClientSession) route); } } return sessions; }
移除路由
void removeRoute(String address) { routes.remove(address); }
还有一个每3分钟一次的定时任务,查询并关闭被闲置了的远程服务器Session,在路由表中启动该任务
public void start() { int period = 3 * 60 * 1000; TaskEngine.getInstance().scheduleAtFixedRate(new ServerCleanupTask(), period, period); }
路由表模块 RoutingTable
路由表是Openfire的核心module之一,RoutingTable接口定义了一系列操作标准,主要围绕路由表进行,提供添加,删除,查询,消息路由等操作,而RoutingTableImpl负责具体实现。
先来看看RoutingTableImpl的成员列表
/** * 缓存外部远程服务器session * Key: server domain, Value: nodeID */ private Cache<String, byte[]> serversCache; /** * 缓存服务器的组件 * Key: component domain, Value: list of nodeIDs hosting the component */ private Cache<String, Set<NodeID>> componentsCache; /** * 缓存已认证的客户端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> usersCache; /** * 缓存已认证匿名的客户端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> anonymousUsersCache; /** * 缓存已认证(包括匿名)的客户端Resource,一个用户,在每一端登录,都会有一个resource * Key: bare JID, Value: list of full JIDs of the user */ private Cache<String, Collection<String>> usersSessions; private String serverName; // 服务器的域名 private XMPPServer server; // XMPP服务 private LocalRoutingTable localRoutingTable; // 路由表底层 private RemotePacketRouter remotePacketRouter; // 远程包路由器 private IQRouter iqRouter; // IQ包路由器 private MessageRouter messageRouter; // Message包路由器 private PresenceRouter presenceRouter; // Presence包路由器 private PresenceUpdateHandler presenceUpdateHandler; // 在线状态更新处理器
成员列表中,除了LocalRoutingTable之外,还定义了一堆的缓存。这些缓存干嘛用?
Openfire支持集群机制,即在多台服务器上分别运行一个Openfire实例,并使各个实例的数据同步。算法一致,数据一致,用户不管连接到任意一台服务器,效果就都一样。
集群中的数据同步,除了数据库之外,其他的都是用通过缓存来处理,而上面的这些缓存正是集群同步的一部分,用于同步用户路由信息,每个服务器都会有缓存的副本。
总的来说,LocalRoutingTable用于存储本机的路由数据,而Cache中是存储了整个集群的路由数据。
但是,需要注意的一点,LocalRoutingTable与Cache,这两者的数据结构并不相同:
(1)LocalRoutingTable中记录了本机中所有的Session实例,可以用来通信
(2)Cache中只存储了用户路由节点信息,需要通过集群管理组件来获取Session实例
路由表的操作
路由表的操作,实际上就是在会话管理中,对会话实例的操作。为免与上面混淆,这一节的功能说明,以会话代称。
添加路由(会话)
代码如下:
@Override public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); // 加入到路由表 localRoutingTable.addRoute(route.toString(), destination); // 若为匿名客户端,添加到anonymousUsersCache、usersSessions缓存队列中 if (destination.getAuthToken().isAnonymous()) { Lock lockAn = CacheFactory.getLock(route.toString(), anonymousUsersCache); try { lockAn.lock(); added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockAn.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); usersSessions.put(route.toBareJID(), Arrays.asList(route.toString())); } finally { lock.unlock(); } } } // 非匿名客户端,添加到usersCache、usersSessions缓存队列中 else { Lock lockU = CacheFactory.getLock(route.toString(), usersCache); try { lockU.lock(); added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockU.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids == null) { // Optimization - use different class depending on current setup if (ClusterManager.isClusteringStarted()) { jids = new HashSet<>(); } else { jids = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); } } jids.add(route.toString()); usersSessions.put(route.toBareJID(), jids); } finally { lock.unlock(); } } } return added; }
主要两步:
(1)添加到路由表
(2)添加到对应的缓存中
移除路由(会话)
代码如下:
@Override public boolean removeClientRoute(JID route) { boolean anonymous = false; String address = route.toString(); ClientRoute clientRoute = null; // 从缓存中移除客户端的Session信息 Lock lockU = CacheFactory.getLock(address, usersCache); try { lockU.lock(); clientRoute = usersCache.remove(address); } finally { lockU.unlock(); } if (clientRoute == null) { Lock lockA = CacheFactory.getLock(address, anonymousUsersCache); try { lockA.lock(); clientRoute = anonymousUsersCache.remove(address); anonymous = true; } finally { lockA.unlock(); } } if (clientRoute != null && route.getResource() != null) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); if (anonymous) { usersSessions.remove(route.toBareJID()); } else { Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids != null) { jids.remove(route.toString()); if (!jids.isEmpty()) { usersSessions.put(route.toBareJID(), jids); } else { usersSessions.remove(route.toBareJID()); } } } } finally { lock.unlock(); } } // 将对应客户端的Session信息,移出路由表 localRoutingTable.removeRoute(address); return clientRoute != null; }
操作与添加类似:
(1)移除缓存里的路由信息
(2)移除路由表中的信息
获取路由(会话)
@Override public ClientSession getClientRoute(JID jid) { // Check if this session is hosted by this cluster node ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString()); if (session == null) { // The session is not in this JVM so assume remote RemoteSessionLocator locator = server.getRemoteSessionLocator(); if (locator != null) { // Check if the session is hosted by other cluster node ClientRoute route = usersCache.get(jid.toString()); if (route == null) { route = anonymousUsersCache.get(jid.toString()); } if (route != null) { session = locator.getClientSession(route.getNodeID().toByteArray(), jid); } } } return session; }
从上面的方法代码中可以看到,获取路由的方法是:先查找本地路由表,若获取不到对应Session时,则通过集群获取。RemoteSessionLocator是用于适配不同的集群组件所抽象的接口,为不同集群组件提供了透明处理。
至于如何从集群中获取Session,主要就在于sersCache和anonymousUsersCache这两个cache,它们记录了每个客户端的路由节点信息,通过它可以取得对应的Session实例。
消息路由
根据发送的形式,分为两种:一是广播、二是单点路由
1、以广播的形式,向所有在线的客户端发送消息
@Override public void broadcastPacket(Message packet, boolean onlyLocal) { // Send the message to client sessions connected to this JVM for(ClientSession session : localRoutingTable.getClientRoutes()) { session.process(packet); } // Check if we need to broadcast the message to client sessions connected to remote cluter nodes if (!onlyLocal && remotePacketRouter != null) { remotePacketRouter.broadcastPacket(packet); } }
2、单点发送的形式,向某个指定的客户端发送消息
@Override public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException { boolean routed = false; try { if (serverName.equals(jid.getDomain())) { // Packet sent to our domain. routed = routeToLocalDomain(jid, packet, fromServer); Log.info("routeToLocalDomain"); } else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) { // Packet sent to component hosted in this server routed = routeToComponent(jid, packet, routed); Log.info("routeToComponent"); } else { // Packet sent to remote server routed = routeToRemoteDomain(jid, packet, routed); Log.info("routeToRemoteDomain"); } } catch (Exception ex) { // Catch here to ensure that all packets get handled, despite various processing // exceptions, rather than letting any fall through the cracks. For example, // an IAE could be thrown when running in a cluster if a remote member becomes // unavailable before the routing caches are updated to remove the defunct node. // We have also occasionally seen various flavors of NPE and other oddities, // typically due to unexpected environment or logic breakdowns. Log.error("Primary packet routing failed", ex); } if (!routed) { if (Log.isDebugEnabled()) { Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML()); } if (packet instanceof IQ) { iqRouter.routingFailed(jid, packet); } else if (packet instanceof Message) { messageRouter.routingFailed(jid, packet); } else if (packet instanceof Presence) { presenceRouter.routingFailed(jid, packet); } } }
路由表中的功能,最后由SessionManager集中处理,详见上一章的分析,这里不再赘述。
特点提一点,比较有用:路由表做为一个module已经在Openfire主服务启动时完成实例化,所以,在自定义的插件、或者其他任何需要发送消息的地方,只需选择调用如下两个方法中之一,即可完成消息发送:
XMPPServer.getInstance().getRoutingTable().routePacket(jid, packet, fromServer);
XMPPServer.getInstance().getRoutingTable().broadcastPacket(packet, onlyLocal);
而消息发送中,最后消息如何送到网卡实现发送,在第三章《消息路由》中已经详细分析,同样不再赘述。
本章就到此结束,OVER!
以上是关于即时通信系统Openfire分析之六:路由表 RoutingTable的主要内容,如果未能解决你的问题,请参考以下文章
基于openfire+smack即时通讯instant message开发
openfire服务器+Spark搭建即时聊天系统 & 初探阿里云