2018-07-14期 ZK编程案例-分布式协调本人亲自反复验证通过分享
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2018-07-14期 ZK编程案例-分布式协调本人亲自反复验证通过分享相关的知识,希望对你有一定的参考价值。
利用ZK监听器实现分布式协调服务,即实现服务端服务健康状态的实时监测。
1、编写一个服务端程序,实现原理:
(1)服务端程序启动后,开启Socket监听
(2)开启Socket监听后,将自己监听Socket身份信息临时写入Zookeeper集群
(3)服务关闭后,Zookeeper集群自动将该服务身份信息从ZK集群清除
实现代码
package cn.itcast.zk.distributeserver;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributeServer {
/**
* 连接zk,服务启动后往zk注册服务器信息,并启动监听端口9091
*
* @throws Exception
*/
public void connectionZk(String zNodeVal) throws Exception {
/*
* 这里连接ZK,无需注册监听器,所有监听器watcher为null
*/
ZooKeeper zkCli = new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181", 2000, null);
if (zkCli.exists("/server", null) == null) {
zkCli.create("/server", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/*
* 服务启动后,注册自己的身份信息到ZK,注册方式采用临时注册EPHEMERAL_SEQUENTIAL,同时生成的Znode为在/server/host+
* 自增序列号SEQUENTIAL 如/server/host00000000000001
*/
zkCli.create("/server/host", zNodeVal.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
/**
* 主线程 模拟服务端socket监听,并注册ZK集群
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
DistributeServer distributeServer = new DistributeServer();
InetAddress localAddress = new DistributeServer().getLocalHostLANAddress();
String zNodeVal = localAddress.getHostName() + ":" + localAddress.getHostAddress() + ":9091";
// 服务启动后,将服务信息注册到zk
distributeServer.connectionZk(zNodeVal);
// 将服务端口监听起来,实时准备处理客户端发送过来消息
distributeServer.handleBusiness(localAddress.getHostAddress());
}
/**
* 模拟处理业务 这里为开启Socket监听,并接收客户端发送过来的消息。
*
* @param ipaddress
* @throws IOException
*/
public static void handleBusiness(String ipaddress) throws IOException {
ServerSocket server = new ServerSocket(9091);
try {
System.out.println("Server " + ipaddress + " has listener on 9091......");
Socket client = server.accept();
try {
BufferedReader input = new BufferedReader(new InputStreamReader(client.getInputStream()));
boolean flag = true;
while (flag) {
String line = input.readLine();
if (line.equals("exit")) {
flag = false;
System.out.println("Client exit!");
} else {
System.out.println("Client Msg:" + line);
}
}
} finally {
client.close();
server.close();
/**
* 防止客户端连接断开后,服务端端监听异常,因此这里没每次处理完客户端消息后,都会重新建立监听
*/
DistributeServer.handleBusiness(ipaddress);
}
} finally {
server.close();
}
}
/**
* 自动获取操作系统IP地址,目的是让服务端socket在该地址上建立监听。
*
* @return
* @throws Exception
*/
public InetAddress getLocalHostLANAddress() throws Exception {
try {
InetAddress candidateAddress = null;
// 遍历所有的网络接口
for (Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); ifaces
.hasMoreElements();) {
NetworkInterface iface = (NetworkInterface) ifaces.nextElement();
// 在所有的接口下再遍历IP
for (Enumeration inetAddrs = iface.getInetAddresses(); inetAddrs.hasMoreElements();) {
InetAddress inetAddr = (InetAddress) inetAddrs.nextElement();
if (!inetAddr.isLoopbackAddress()) {// 排除loopback类型地址
if (inetAddr.isSiteLocalAddress()) {
// 如果是site-local地址,就是它了
return inetAddr;
} else if (candidateAddress == null) {
// site-local类型的地址未被发现,先记录候选地址
candidateAddress = inetAddr;
}
}
}
}
if (candidateAddress != null) {
return candidateAddress;
}
// 如果没有发现 non-loopback地址.只能用最次选的方案
InetAddress jdkSuppliedAddress = InetAddress.getLocalHost();
return jdkSuppliedAddress;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
2、客户端程序,实现原理:
利用ZK监听器实现分布式协调服务,即实现服务端服务健康状态的实时监测。
(1)、服务端服务启动后,会立即向ZK注册自身服务标识信息
(2)、若服务端服务掉线,则会ZK会自动将该服务标识信息从ZK集群清除
(3)、客户端实时监听ZK集群中服务在线情况,若服务端服务掉线,会被客户端监听器立即监听到
(4)、客户端监听到服务端服务掉线后,相关后续业务不会再提交给掉线服务处理
实现代码:
package cn.itcast.zk.distributeserver;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper.States;
/**
* 利用ZK监听器实现分布式协调服务,即实现服务端服务健康状态的实时监测。
* 实现原理:
* 1、服务端服务启动后,会立即向ZK注册自身服务标识信息
* 2、若服务端服务掉线,则会ZK会自动将该服务标识信息从ZK集群清除
* 3、客户端实时监听ZK集群中服务在线情况,若服务端服务掉线,会被客户端监听器立即监听到
* 4、客户端监听到服务端服务掉线后,先关后续业务不会再提交给掉线服务处理
* 5、本代码具体主要实现以下功能:
* (1)服务端程序启动后,会立即启动一个Socket监听,并将Socket连接信息注册到ZK集群,注册内容为IP:PORT套接字
* (2)客户端通过ZK一直监听服务端在线情况
* (3)客户端定时向各个服务端发送Socket消息,默认情况下如果所有服务端服务均正常,则所有服务端都会收到客户端发送的Socket消息
* 若服务端服务异常,则客户端会检测到,将不会向掉线的服务发送socket消息。这样就保障了客户端发送的消息不会丢失,总有活动的
* 服务端能接收到。
*
* @author songjq
*
*/
public class DistributeClient {
// 为了线程安全,需要加volatile修饰符
volatile private static ArrayList<String> hlist = new ArrayList<String>();
private static ZooKeeper zk1_;
public static void waitUntilConnected(ZooKeeper zooKeeper, CountDownLatch connectedLatch) {
if (States.CONNECTING == zooKeeper.getState()) {
try {
connectedLatch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
static class ConnectedWatcher implements Watcher {
private CountDownLatch connectedLatch;
ConnectedWatcher(CountDownLatch connectedLatch) {
this.connectedLatch = connectedLatch;
}
/**
* 监听器回调方法 如果需要对某个znode进行持续监听,需要重新在回调方法中注册监听器
*/
@Override
public void process(WatchedEvent event) {
System.out.println("节点:" + event.getPath() + " 发生了事件:" + event.getType());
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
}
try {
/**
* 这里调用获取叶子加点的方法,目的是在该监听器中重新注册监听,防止监听失效。
*/
getServerHostList(zk1_);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 获取zk连接
* @param hostports
* @param times
* @return
* @throws Exception
*/
public ZooKeeper getConnection(String hostports, int times) throws Exception {
ZooKeeper zktmp = new ZooKeeper(hostports, 1000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("节点:" + event.getPath() + " 发生了事件:" + event.getType());
try {
getServerHostList(zk1_);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
return zktmp;
}
/**
* 获取类的实例
*/
static private DistributeClient static_;
static public DistributeClient Instance() {
if (static_ == null) {
static_ = new DistributeClient();
}
return static_;
}
public static void main(String[] args) throws Exception {
String hostports = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";
DistributeClient instance = DistributeClient.Instance();
zk1_ = instance.getConnection(hostports, 2000);
getServerHostList(zk1_);
/*
* 模拟客户端永不间断的定时向活动的服务端发送socket消息
*/
while (true) {
getServerHostList(zk1_);
for (String host : hlist) {
String[] hostAry = host.split(":");
String hostname = hostAry[0];
String ip = hostAry[1];
int port = Integer.parseInt(hostAry[2]);
new DistributeClient().handleBusiness(ip, port, hostname);
System.out.println(host);
}
Thread.sleep(20000);
System.out.println("---------------");
}
}
/**
* 获取在线服务节点
* @param zkCli
* @throws KeeperException
* @throws InterruptedException
*/
public static void getServerHostList(ZooKeeper zkCli) throws KeeperException, InterruptedException {
hlist.clear();
/*
* 获取/server/叶子节点,并同时在该叶子节点注册监听器
*/
List<String> hosts = zkCli.getChildren("/server", true);
for (String host : hosts) {
byte[] data = zkCli.getData("/server/" + host, null, null);
hlist.add(new String(data));
}
}
/**
* 模拟处理业务
* 向活动的服务端发送Socket消息
* @throws IOException
*/
public void handleBusiness(String IP, int port, String hostname) throws IOException {
Socket client = new Socket(IP, port);
try {
PrintWriter output = new PrintWriter(client.getOutputStream(), true);
/*
* 向服务端发送消息
*/
String words = "Client MSG->" + hostname + "," + IP;
output.println(words);
} finally {
client.close();
}
}
}
以上是关于2018-07-14期 ZK编程案例-分布式协调本人亲自反复验证通过分享的主要内容,如果未能解决你的问题,请参考以下文章