zookeeper服务发现

Posted by tinyj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper服务发现相关的知识,希望对你有一定的参考价值。

服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

实现代码如下:

import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

/**
 * A work server.
 *
 * <p>On {@link #start()} it registers itself as an ephemeral child of the servers znode
 * (the child name is the server address, the payload is the JSON form of its
 * {@link ServerData}) and subscribes to data changes of the config znode. Each change of the
 * config znode replaces the locally held {@link ServerConfig}.
 */
public class WorkServer {

    /**
     * Charset used for every znode payload this class reads or writes. It is fixed explicitly
     * so the bytes stored in ZooKeeper do not depend on the JVM's platform default charset.
     */
    private static final java.nio.charset.Charset ZNODE_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private final ZkClient zkClient;
    // Path of the config znode in ZooKeeper
    private final String configPath;
    // Path of the servers znode in ZooKeeper
    private final String serversPath;
    // Basic information about this work server
    private final ServerData serverData;
    // Current configuration of this work server
    private ServerConfig serverConfig;
    // Listener for data changes on the config znode
    private final IZkDataListener dataListener;

    /**
     * @param configPath  path of the config znode to watch
     * @param serversPath path of the znode under which this server registers itself
     * @param serverData  basic information published at registration
     * @param zkClient    client used for all ZooKeeper operations
     * @param initConfig  configuration used until the first change notification arrives
     */
    public WorkServer(String configPath, String serversPath,
                      ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
        this.zkClient = zkClient;
        this.serversPath = serversPath;
        this.configPath = configPath;
        this.serverConfig = initConfig;
        this.serverData = serverData;

        this.dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {
                // Deletion of the config znode is ignored; the last known config stays in use.
            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                String retJson = new String((byte[]) data, ZNODE_CHARSET);
                ServerConfig serverConfigLocal = JSON.parseObject(retJson, ServerConfig.class);
                if (serverConfigLocal == null) {
                    // An empty payload parses to null; keep the current config instead of
                    // replacing it with null and failing on the log line below.
                    System.out.println("ignored empty config payload at " + dataPath);
                    return;
                }
                updateConfig(serverConfigLocal);
                System.out.println("new Work server config is:"+serverConfig.toString());
            }
        };

    }

    /** Starts the server: registers it and begins watching the config znode. */
    public void start() {
        System.out.println("work server start...");
        initRunning();
    }

    /** Stops the server by cancelling the subscription on the config znode. */
    public void stop() {
        System.out.println("work server stop...");
        zkClient.unsubscribeDataChanges(configPath, dataListener);
    }

    // Registers this server, then subscribes to change events of the config znode.
    private void initRunning() {
        registMe();
        zkClient.subscribeDataChanges(configPath, dataListener);
    }

    /**
     * Registers this server as an ephemeral child of the servers znode.
     *
     * <p>If the servers znode does not exist yet it is created (with parents) and the
     * registration is attempted exactly once more; a second failure propagates to the caller
     * rather than recursing without bound.
     */
    private void registMe() {
        String mePath = serversPath.concat("/").concat(serverData.getAddress());
        byte[] payload = JSON.toJSONString(serverData).getBytes(ZNODE_CHARSET);

        try {
            zkClient.createEphemeral(mePath, payload);
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(serversPath, true);
            zkClient.createEphemeral(mePath, payload);
        }
    }

    // Replaces the locally held configuration.
    private void updateConfig(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
    }

}
/**
 * 调度类
 */
public class SubscribeZkClient {

    private static final int  CLIENT_QTY = 5; // Work Server数量

    private static final String  ZOOKEEPER_SERVER = "192.168.1.105:2181";

    private static final String  CONFIG_PATH = "/config";
    private static final String  COMMAND_PATH = "/command";
    private static final String  SERVERS_PATH = "/servers";

    public static void main(String[] args) throws Exception {

        List<ZkClient> clients = new ArrayList<ZkClient>();
        List<WorkServer>  workServers = new ArrayList<WorkServer>();
        ManageServer manageServer = null;

        try {

            // 创建一个默认的配置
            ServerConfig initConfig = new ServerConfig();
            initConfig.setDbPwd("123456");
            initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
            initConfig.setDbUser("root");

            // 实例化一个Manage Server
            ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
            manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
            manageServer.start(); // 启动Manage Server

            // 创建指定个数的工作服务器
            for ( int i = 0; i < CLIENT_QTY; ++i ) {
                ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
                clients.add(client);
                ServerData serverData = new ServerData();
                serverData.setId(i);
                serverData.setName("WorkServer#"+i);
                serverData.setAddress("192.168.1."+i);

                WorkServer  workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
                workServers.add(workServer);
                workServer.start();    // 启动工作服务器

            }

            System.out.println("敲回车键退出!
");
            new BufferedReader(new InputStreamReader(System.in)).readLine();

        } finally {
            System.out.println("Shutting down...");

            for ( WorkServer workServer : workServers ) {
                try {
                    workServer.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            for ( ZkClient client : clients ) {
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }
    }

}
/**
 * Basic information about a server: its address, numeric id and display name.
 *
 * <p>A plain mutable bean; every property starts out as {@code null}.
 */
public class ServerData {

    private String address;
    private Integer id;
    private String name;

    /** @return the server address, or {@code null} if not set */
    public String getAddress() {
        return this.address;
    }

    /** @param address the server address */
    public void setAddress(String address) {
        this.address = address;
    }

    /** @return the server id, or {@code null} if not set */
    public Integer getId() {
        return this.id;
    }

    /** @param id the server id */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the server name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the server name */
    public void setName(String name) {
        this.name = name;
    }

    /** Renders all three properties as {@code ServerData [address=..., id=..., name=...]}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ServerData [");
        text.append("address=").append(address);
        text.append(", id=").append(id);
        text.append(", name=").append(name);
        text.append("]");
        return text.toString();
    }

}
/**
 * Configuration information: database URL, user and password.
 *
 * <p>A plain mutable bean; every property starts out as {@code null}.
 */
public class ServerConfig {

    // Placeholder printed by toString() in place of a non-null password.
    private static final String PWD_MASK = "******";

    private String dbUrl;
    private String dbPwd;
    private String dbUser;

    /** @return the database URL, or {@code null} if not set */
    public String getDbUrl() {
        return dbUrl;
    }

    /** @param dbUrl the database URL */
    public void setDbUrl(String dbUrl) {
        this.dbUrl = dbUrl;
    }

    /** @return the database password in clear text, or {@code null} if not set */
    public String getDbPwd() {
        return dbPwd;
    }

    /** @param dbPwd the database password */
    public void setDbPwd(String dbPwd) {
        this.dbPwd = dbPwd;
    }

    /** @return the database user, or {@code null} if not set */
    public String getDbUser() {
        return dbUser;
    }

    /** @param dbUser the database user */
    public void setDbUser(String dbUser) {
        this.dbUser = dbUser;
    }

    /**
     * Renders the configuration for diagnostics.
     *
     * <p>The password is never included in clear text: a non-null password is shown as
     * {@code ******} so that printing or logging this object does not leak the credential.
     * A missing password is still shown as {@code null}.
     */
    @Override
    public String toString() {
        String shownPwd = (dbPwd == null) ? null : PWD_MASK;
        return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + shownPwd
                + ", dbUser=" + dbUser + "]";
    }

}
import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.List;

public class ManageServer {

    // zookeeper的servers节点路径
    private String serversPath;
    // zookeeper的command节点路径
    private String commandPath;
    // zookeeper的config节点路径
    private String configPath;
    private ZkClient zkClient;
    private ServerConfig config;
    // 用于监听servers节点的子节点列表的变化
    private IZkChildListener childListener;
    // 用于监听command节点数据内容的变化
    private IZkDataListener dataListener;
    // 工作服务器的列表
    private List<String> workServerList;

    public ManageServer(String serversPath, String commandPath,
                        String configPath, ZkClient zkClient, ServerConfig config) {
        this.serversPath = serversPath;
        this.commandPath = commandPath;
        this.zkClient = zkClient;
        this.config = config;
        this.configPath = configPath;
        this.childListener = new IZkChildListener() {

            public void handleChildChange(String parentPath,
                                          List<String> currentChilds) throws Exception {
                // TODO Auto-generated method stub
                workServerList = currentChilds; // 更新内存中工作服务器列表

                System.out.println("work server list changed, new list is ");
                execList();

            }
        };
        this.dataListener = new IZkDataListener() {

            public void handleDataDeleted(String dataPath) throws Exception {
                // TODO Auto-generated method stub
                // ignore;
            }

            public void handleDataChange(String dataPath, Object data)
                    throws Exception {
                // TODO Auto-generated method stub
                String cmd = new String((byte[]) data);
                System.out.println("cmd:"+cmd);
                exeCmd(cmd); // 执行命令

            }
        };

    }

    private void initRunning() {
        zkClient.subscribeDataChanges(commandPath, dataListener);
        zkClient.subscribeChildChanges(serversPath, childListener);
    }

    /*
     * 1: list 2: create 3: modify
     */
    private void exeCmd(String cmdType) {
        if ("list".equals(cmdType)) {
            execList();

        } else if ("create".equals(cmdType)) {
            execCreate();
        } else if ("modify".equals(cmdType)) {
            execModify();
        } else {
            System.out.println("error command!" + cmdType);
        }

    }

    // 列出工作服务器列表
    private void execList() {
        System.out.println(workServerList.toString());
    }

    // 创建config节点
    private void execCreate() {
        if (!zkClient.exists(configPath)) {
            try {
                zkClient.createPersistent(configPath, JSON.toJSONString(config)
                        .getBytes());
            } catch (ZkNodeExistsException e) {
                zkClient.writeData(configPath, JSON.toJSONString(config)
                        .getBytes()); // config节点已经存在,则写入内容就可以了
            } catch (ZkNoNodeException e) {
                String parentDir = configPath.substring(0,
                        configPath.lastIndexOf(‘/‘));
                zkClient.createPersistent(parentDir, true);
                execCreate();
            }
        }
    }

    // 修改config节点内容
    private void execModify() {
        // 我们随意修改config的一个属性就可以了
        config.setDbUser(config.getDbUser() + "_modify");

        try {
            zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
        } catch (ZkNoNodeException e) {
            execCreate(); // 写入时config节点还未存在,则创建它
        }
    }

    // 启动工作服务器
    public void start() {
        initRunning();
    }

    // 停止工作服务器
    public void stop() {
        zkClient.unsubscribeChildChanges(serversPath, childListener);
        zkClient.unsubscribeDataChanges(commandPath, dataListener);
    }

}

 

以上是关于zookeeper服务发现的主要内容,如果未能解决你的问题,请参考以下文章:

基于ZooKeeper实现服务注册与发现

zookeeper服务发现

RPC ---- 基于ZooKeeper为注册中心实现的RPC

RPC ---- 基于ZooKeeper为注册中心实现的RPC

RPC ---- 基于ZooKeeper为注册中心实现的RPC

SpringCloud-2.0(6. 服务注册发现 - ZooKeeper)