外网服务端保存内网服务端会话的有效性以及平台上监控所有内网服务端的网络状况,模仿心跳机制实现,这里在做一点叙诉,关于思路和具体实现。
在很多的平台应用中,都有这样的需求,平台内包括多个子系统或者属于其管控范围内的其他平台,需要对这些系统进行统一的监控,来查看当前的运行状态或者其他运行信息,我们的应用也有这样的一个情况,需要再外网服务端(平台)上监控,其下运行的多个内网服务端的网络状况,查阅了写资料后确立了2种可实现的方式。
1:轮询机制
轮询:概括来说是服务端定时主动的去与要监控状态的客户端(或者叫其他系统)通信,询问当前的某种状态,客户端返回状态信息,客户端没有返回或返回错误、失效信息、则认为客户端已经宕机,然后服务端自己内部把这个客户端的状态保存下来(宕机或者其他),如果客户端正常,那么返回正常状态,如果客户端宕机或者返回的是定义的失效状态那么当前的客户端状态是能够及时的监控到的,如果客户端宕机之后重启了那么当服务端定时来轮询的时候,还是可以正常的获取返回信息,把其状态重新更新。
在很多的平台应用中,都有这样的需求,平台内包括多个子系统或者属于其管控范围内的其他平台,需要对这些系统进行统一的监控,来查看当前的运行状态或者其他运行信息,我们的应用也有这样的一个情况,需要再外网服务端(平台)上监控,其下运行的多个内网服务端的网络状况,查阅了写资料后确立了2种可实现的方式。
1:轮询机制
2:心跳机制
轮询:概括来说是服务端定时主动的去与要监控状态的客户端(或者叫其他系统)通信,询问当前的某种状态,客户端返回状态信息,客户端没有返回或返回错误、失效信息、则认为客户端已经宕机,然后服务端自己内部把这个客户端的状态保存下来(宕机或者其他),如果客户端正常,那么返回正常状态,如果客户端宕机或者返回的是定义的失效状态那么当前的客户端状态是能够及时的监控到的,如果客户端宕机之后重启了那么当服务端定时来轮询的时候,还是可以正常的获取返回信息,把其状态重新更新。
心跳:最终得到的结果是与轮询一样的但是实现的方式有差别,心跳不是服务端主动去发信息检测客户端状态,而是在服务端保存下来所有客户端的状态信息,然后等待客户端定时来访问服务端,更新自己的当前状态,如果客户端超过指定的时间没有来更新状态,则认为客户端已经宕机或者其状态异常。
心跳机制与轮询的比较,在我们的应用中,采用的是心跳,这样一是避免服务端的压力,二是灵活好控制,上一篇文章中提到过,我们的外网服务端(服务端)不知道内网服务端(客户端)的地址,有虽然有保存客户端的socket会话,但是客户端宕机会话就失效了。所以只能等着他主动来报告状态。在来说一下实现方式,这个很简单,就是一个思路问题。
首先,客户端(内网服务端)启动后,带着自己的标识符与服务端建立socket连接,服务端缓存下来对应信息(上一篇文章中已经实现过了),然后在通过socket流,定时发送当前信息消息到服务端(外网服务器端)某个接口,服务端收到后更新当前的客户端的状态,比如(会话地址,标识符,网络的活跃状态,连接时间,心跳时间),本次来更新的时间就是心跳时间,然后服务端还有一个定时器,定时检查所有缓存的客户端会话集合,将其中心跳时间与当前时间进行对比,如果超过指定的时间还没有来更新则认为该客户端的网络出现异常或者宕机,然后更新该客户端的网络状态。- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.InputStreamReader;
- import java.net.InetSocketAddress;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- import org.dom4j.Document;
- import org.dom4j.DocumentException;
- import org.dom4j.DocumentHelper;
- import cn.edu.zju.cst.mina.im.server.entity.User;
- import cn.edu.zju.cst.mina.im.server.handler.ServerControler;
- public class UserStateManage extends Thread {
- //在线用户状态列表
- static HashMap<Integer, UserState> userStateList = new HashMap<Integer, UserState>();
- Object hashLock = new Object();
- //当前的连接数和工作线程数
- static int workThreadNum = 0;
- static int socketConnect = 0;
- private ServerSocket serverSocket;
- //服务器IP
- private String host = "10.82.81.79";
- //服务器端口
- private int stateReportPort = 60001;
- //设置心跳包的结束标记
- String endFlag = "</protocol>";
- CharSequence csEndFlag = endFlag.subSequence(0, 10);
- //扫描间隔
- private int scanTime = 1800;
- @Override
- public void run() {
- //绑定端口,并开始侦听用户的心跳包
- serverSocket = startListenUserReport(stateReportPort);
- if(serverSocket == null){
- System.out.println("【创建ServerSocket失败!】");
- return;
- }
- //启动扫描线程
- Thread scanThread = new Thread(new scan());
- scanThread.start();
- //等待用户心跳包请求
- while(true){
- Socket socket = null;
- try {
- socketConnect = socketConnect + 1;
- //接收客户端的连接
- socket = serverSocket.accept();
- //为该连接创建一个工作线程
- Thread workThread = new Thread(new Handler(socket));
- //启动工作线程
- workThread.start();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 创建一个ServerSocket来侦听用户心跳包请求
- * @param port 指定的服务器端的端口
- * @return 返回ServerSocket
- * @author dream
- */
- public ServerSocket startListenUserReport(int port){
- try {
- ServerSocket serverSocket = new ServerSocket();
- if(!serverSocket.getReuseAddress()){
- serverSocket.setReuseAddress(true);
- }
- serverSocket.bind(new InetSocketAddress(host,port));
- System.out.println("【开始在"+serverSocket.getLocalSocketAddress()+"上侦听用户的心跳包请求!】");
- return serverSocket;
- } catch (IOException e) {
- System.out.println("【端口"+port+"已经被占用!】");
- if (serverSocket != null) {
- if (!serverSocket.isClosed()) {
- try {
- serverSocket.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- }
- }
- return serverSocket;
- }
- //工作线程类
- class Handler implements Runnable{
- private Socket socket;
- UserState us = null;
- User newUser = null;
- private int userId;
- private int userState;
- /**
- * 构造函数,从调用者那里取得socket
- * @param socket 指定的socket
- * @author dream
- */
- public Handler(Socket socket){
- this.socket = socket;
- }
- /**
- * 从指定的socket中得到输入流
- * @param socket 指定的socket
- * @return 返回BufferedReader
- * @author dream
- */
- private BufferedReader getReader(Socket socket){
- InputStream is = null;
- BufferedReader br = null;
- try {
- is = socket.getInputStream();
- br = new BufferedReader(new InputStreamReader(is));
- } catch (IOException e) {
- e.printStackTrace();
- }
- return br;
- }
- public void run() {
- try{
- workThreadNum = workThreadNum +1;
- System.out.println("【第"+workThreadNum+"个的连接:"+socket.getInetAddress()+":"+socket.getPort()+"】");
- BufferedReader br = getReader(socket);
- String meg = null;
- StringBuffer report = new StringBuffer();
- while ((meg = br.readLine()) != null) {
- report.append(meg);
- if (meg.contains(csEndFlag)) {
- us = getReporterUserState(meg, socket);
- synchronized (hashLock) {
- userStateList.put(userId, us);
- }
- }
- }
- }catch(IOException e){
- System.out.println("【客户:"+newUser.getUser_id()+"已经断开连接!】");
- userStateList.remove( userId );
- announceStateChange( userId , -1);
- }finally{
- if(socket != null){
- try {
- //断开连接
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private UserState getReporterUserState(String meg , Socket socket){
- UserState us = new UserState();
- try {
- Document requestDoc = DocumentHelper.parseText(meg);
- newUser = ServerControler.parseXmlToUserState(requestDoc,socket);
- userId = newUser.getUser_id();
- userState = newUser.getUser_state();
- us.setFlag(2);
- us.setUser_state( userState );
- us.setUser_id( userId );
- us.setUser_ip(newUser.getUser_ip());
- us.setUser_port(newUser.getUser_port());
- } catch (DocumentException e) {
- System.out.println("【来自客户端的信息不是一个合法的心跳包协议】");
- }
- return us;
- }
- }
- //扫描线程
- class scan implements Runnable{
- public void run() {
- while (true) {
- System.out.println("*******"+new Date()+":扫描线程开始扫描"+"*******");
- synchronized (hashLock) {
- if(!userStateList.isEmpty()){
- //遍历在线用户列表
- for (Map.Entry<Integer, UserState> entry : userStateList.entrySet()) {
- int flag = entry.getValue().getFlag();
- if ( (flag - 1) < 0) {
- //在这里通知该用户的好友其状态发生改变
- // announceStateChange(entry.getKey() , 0);
- }else{
- entry.getValue().setFlag(flag - 1);
- userStateList.put(entry.getKey(), entry.getValue());
- }
- System.out.println(entry.getKey() + "-->" + entry.getValue().toString());
- }
- }else{
- System.out.println("现在还没有在线用户!");
- }
- }
- //实现定时扫描
- try {
- sleep(scanTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private void announceStateChange(int userId , int state){
- System.out.println("通知其好友!");
- }
- /**
- * 查询一个用户是否在线
- * @param userId 指定要查询状态的用户的ID
- * @return true 在线; false 不在线;
- * @author dream
- */
- public boolean isAlive(int userId){
- synchronized (hashLock) {
- return userStateList.containsKey(userId);
- }
- }
- /**
- * 返回指定用户ID的状态
- * @param userId 指定要查询状态的用户的ID
- * @return >0 该用户在线; -1 该用户离线
- * @author dream
- */
- public int getUserState(int userId){
- synchronized (hashLock) {
- if(userStateList.containsKey(userId)){
- return userStateList.get(userId).getUser_state();
- }else{
- return -1;
- }
- }
- }
- public Object getHashLock() {
- return hashLock;
- }
- public void setHashLock(Object hashLock) {
- this.hashLock = hashLock;
- }
- public String getHost() {
- return host;
- }
- public void setHost(String host) {
- this.host = host;
- }
- public int getStateReportPort() {
- return stateReportPort;
- }
- public void setStateReportPort(int stateReportPort) {
- this.stateReportPort = stateReportPort;
- }
- public String getEndFlag() {
- return endFlag;
- }
- public void setEndFlag(String endFlag) {
- this.endFlag = endFlag;
- }
- public int getScanTime() {
- return scanTime;
- }
- public void setScanTime(int scanTime) {
- this.scanTime = scanTime;
- }
- public static HashMap<Integer, UserState> getUserStateList() {
- return userStateList;
- }
- public static int getWorkThreadNum() {
- return workThreadNum;
- }
- public static int getSocketConnect() {
- return socketConnect;
- }
- //测试本函数的main函数
- public static void main(String arg[]){
- UserStateManage usm = new UserStateManage();
- usm.start();
- }
- }