基于zookeeper连接池Failover/LoadBalance等改造Thrift 服务化

Posted 舒润

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于zookeeper连接池Failover/LoadBalance等改造Thrift 服务化相关的知识,希望对你有一定的参考价值。

对于Thrift服务化的改造,主要是客户端,可以从如下几个方面进行:

1.服务端的服务注册,客户端自动发现,无需手工修改配置,这里我们使用zookeeper,但由于zookeeper本身提供的客户端使用较为复杂,因此采用curator-recipes工具类进行处理服务的注册与发现。

2.客户端使用连接池对服务调用进行管理,提升性能,这里我们使用Apache Commons项目commons-pool,可以大大减少代码的复杂度。

3.关于Failover/LoadBalance,由于zookeeper的watcher,当服务端不可用是及时通知客户端,并移除不可用的服务节点,而LoadBalance有很多算法,这里我们采用随机加权方式,也是常有的负载算法,至于其他的算法介绍参考:常见的负载均衡的基本算法

4.使thrift服务的注册和发现可以基于spring配置,可以提供很多的便利。

5.其他的改造如:

1)通过动态代理实现client和server端的交互细节透明化,让用户只需通过服务方提供的接口进行访问

2)Thrift通过两种方式调用服务Client和Iface

 

  1. // *) Client API 调用  
  2. (EchoService.Client)client.echo("hello lilei");  ---(1)  
  3. // *) Service 接口 调用  
  4. (EchoService.Iface)service.echo("hello lilei");  ---(2)  
// *) Client API 调用
(EchoService.Client)client.echo("hello lilei");  ---(1)
// *) Service 接口 调用
(EchoService.Iface)service.echo("hello lilei");  ---(2)

Client API的方式, 不推荐, 我们推荐Service接口的方式(服务化)。

 

下面我们来一一实现:

一、pom.xml引入依赖jar包

 

  1. <dependency>  
  2.             <groupId>org.apache.thrift</groupId>  
  3.             <artifactId>libthrift</artifactId>  
  4.             <version>0.9.2</version>  
  5.         </dependency>  
  6.         <dependency>  
  7.             <groupId>commons-pool</groupId>  
  8.             <artifactId>commons-pool</artifactId>  
  9.             <version>1.6</version>  
  10.         </dependency>  
  11.         <dependency>  
  12.             <groupId>org.springframework</groupId>  
  13.             <artifactId>spring-context</artifactId>  
  14.             <version>4.0.9.RELEASE</version>  
  15.         </dependency>  
  16.   
  17.         <dependency>  
  18.             <groupId>org.apache.zookeeper</groupId>  
  19.             <artifactId>zookeeper</artifactId>  
  20.             <version>3.4.6</version>  
  21.         </dependency>  
  22.         <dependency>  
  23.             <groupId>org.apache.curator</groupId>  
  24.             <artifactId>curator-recipes</artifactId>  
  25.             <version>2.7.1</version>  
  26.         </dependency>  
<dependency>
			<groupId>org.apache.thrift</groupId>
			<artifactId>libthrift</artifactId>
			<version>0.9.2</version>
		</dependency>
		<dependency>
			<groupId>commons-pool</groupId>
			<artifactId>commons-pool</artifactId>
			<version>1.6</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>4.0.9.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.7.1</version>
		</dependency>

二、使用zookeeper管理服务节点配置

RPC服务往平台化的方向发展, 会屏蔽掉更多的服务细节(服务的IP地址集群, 集群的扩容和迁移), 只暴露服务接口. 这部分的演化, 使得server端和client端完全的解耦合. 两者的交互通过ConfigServer(MetaServer)的中介角色来搭线。

注: 该图源自dubbo的官网
这边借助Zookeeper来扮演该角色, server扮演发布者的角色, 而client扮演订阅者的角色.

Zookeeper是分布式应用协作服务. 它实现了paxos的一致性算法, 在命名管理/配置推送/数据同步/主从切换方面扮演重要的角色。 其数据组织类似文件系统的目录结构: 

每个节点被称为znode, 为znode节点依据其特性, 又可以分为如下类型:
  1). PERSISTENT: 永久节点
  2). EPHEMERAL: 临时节点, 会随session(client disconnect)的消失而消失
  3). PERSISTENT_SEQUENTIAL: 永久节点, 其节点的名字编号是单调递增的
  4). EPHEMERAL_SEQUENTIAL: 临时节点, 其节点的名字编号是单调递增的
  注: 临时节点不能成为父节点
  Watcher观察模式, client可以注册对节点的状态/内容变更的事件回调机制. 其Event事件的两类属性需要关注下:
  1). KeeperState: Disconnected,SyncConnected,Expired
  2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服务端:
  作为具体业务服务的RPC服务发布方, 对其自身的服务描述由以下元素构成.
  1). namespace: 命名空间,来区分不同应用 
  2). service: 服务接口, 采用发布方的类全名来表示
  3). version: 版本号
  借鉴了Maven的GAV坐标系, 三维坐标系更符合服务平台化的大环境.
  *) 数据模型的设计
  具体RPC服务的注册路径为: /rpc/{namespace}/{service}/{version}, 该路径上的节点都是永久节点
  RPC服务集群节点的注册路径为: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的节点是临时节点.

1.定义Zookeeper的客户端的管理

ZookeeperFactory.java

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import org.apache.curator.framework.CuratorFramework;  
  4. import org.apache.curator.framework.CuratorFrameworkFactory;  
  5. import org.apache.curator.retry.ExponentialBackoffRetry;  
  6. import org.springframework.beans.factory.FactoryBean;  
  7. import org.springframework.util.StringUtils;  
  8.   
  9. /** 
  10.  * 获取zookeeper客户端链接 
  11.  */  
  12. public class ZookeeperFactory implements FactoryBean<CuratorFramework> {  
  13.   
  14.     private String zkHosts;  
  15.     // session超时  
  16.     private int sessionTimeout = 30000;  
  17.     private int connectionTimeout = 30000;  
  18.   
  19.     // 共享一个zk链接  
  20.     private boolean singleton = true;  
  21.   
  22.     // 全局path前缀,常用来区分不同的应用  
  23.     private String namespace;  
  24.   
  25.     private final static String ROOT = "rpc";  
  26.   
  27.     private CuratorFramework zkClient;  
  28.   
  29.     public void setZkHosts(String zkHosts) {  
  30.         this.zkHosts = zkHosts;  
  31.     }  
  32.   
  33.     public void setSessionTimeout(int sessionTimeout) {  
  34.         this.sessionTimeout = sessionTimeout;  
  35.     }  
  36.   
  37.     public void setConnectionTimeout(int connectionTimeout) {  
  38.         this.connectionTimeout = connectionTimeout;  
  39.     }  
  40.   
  41.     public void setSingleton(boolean singleton) {  
  42.         this.singleton = singleton;  
  43.     }  
  44.   
  45.     public void setNamespace(String namespace) {  
  46.         this.namespace = namespace;  
  47.     }  
  48.   
  49.     public void setZkClient(CuratorFramework zkClient) {  
  50.         this.zkClient = zkClient;  
  51.     }  
  52.   
  53.     @Override  
  54.     public CuratorFramework getObject() throws Exception {  
  55.         if (singleton) {  
  56.             if (zkClient == null) {  
  57.                 zkClient = create();  
  58.                 zkClient.start();  
  59.             }  
  60.             return zkClient;  
  61.         }  
  62.         return create();  
  63.     }  
  64.   
  65.     @Override  
  66.     public Class<?> getObjectType() {  
  67.         return CuratorFramework.class;  
  68.     }  
  69.   
  70.     @Override  
  71.     public boolean isSingleton() {  
  72.         return singleton;  
  73.     }  
  74.   
  75.     public CuratorFramework create() throws Exception {  
  76.         if (StringUtils.isEmpty(namespace)) {  
  77.             namespace = ROOT;  
  78.         } else {  
  79.             namespace = ROOT +"/"+ namespace;  
  80.         }  
  81.         return create(zkHosts, sessionTimeout, connectionTimeout, namespace);  
  82.     }  
  83.   
  84.     public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {  
  85.         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();  
  86.         return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)  
  87.                 .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))  
  88.                 .defaultData(null).build();  
  89.     }  
  90.   
  91.     public void close() {  
  92.         if (zkClient != null) {  
  93.             zkClient.close();  
  94.         }  
  95.     }  
  96. }  
package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * 获取zookeeper客户端链接
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

	private String zkHosts;
	// session超时
	private int sessionTimeout = 30000;
	private int connectionTimeout = 30000;

	// 共享一个zk链接
	private boolean singleton = true;

	// 全局path前缀,常用来区分不同的应用
	private String namespace;

	private final static String ROOT = "rpc";

	private CuratorFramework zkClient;

	public void setZkHosts(String zkHosts) {
		this.zkHosts = zkHosts;
	}

	public void setSessionTimeout(int sessionTimeout) {
		this.sessionTimeout = sessionTimeout;
	}

	public void setConnectionTimeout(int connectionTimeout) {
		this.connectionTimeout = connectionTimeout;
	}

	public void setSingleton(boolean singleton) {
		this.singleton = singleton;
	}

	public void setNamespace(String namespace) {
		this.namespace = namespace;
	}

	public void setZkClient(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	@Override
	public CuratorFramework getObject() throws Exception {
		if (singleton) {
			if (zkClient == null) {
				zkClient = create();
				zkClient.start();
			}
			return zkClient;
		}
		return create();
	}

	@Override
	public Class<?> getObjectType() {
		return CuratorFramework.class;
	}

	@Override
	public boolean isSingleton() {
		return singleton;
	}

	public CuratorFramework create() throws Exception {
		if (StringUtils.isEmpty(namespace)) {
			namespace = ROOT;
		} else {
			namespace = ROOT +"/"+ namespace;
		}
		return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
	}

	public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
		CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
		return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
				.canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
				.defaultData(null).build();
	}

	public void close() {
		if (zkClient != null) {
			zkClient.close();
		}
	}
}

 

2.服务端注册服务

由于服务端配置需要获取本机的IP地址,因此定义IP获取接口

ThriftServerIpResolve.java

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. /** 
  4.  *  
  5.  * 解析thrift-server端IP地址,用于注册服务 
  6.  * 1) 可以从一个物理机器或者虚机的特殊文件中解析 
  7.  * 2) 可以获取指定网卡序号的Ip 
  8.  * 3) 其他 
  9.  */  
  10. public interface ThriftServerIpResolve {  
  11.       
  12.     String getServerIp() throws Exception;  
  13.       
  14.     void reset();  
  15.       
  16.     //当IP变更时,将会调用reset方法  
  17.     static interface IpRestCalllBack{  
  18.         public void rest(String newIp);  
  19.     }  
  20. }  
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * 
 * 解析thrift-server端IP地址,用于注册服务
 * 1) 可以从一个物理机器或者虚机的特殊文件中解析
 * 2) 可以获取指定网卡序号的Ip
 * 3) 其他
 */
public interface ThriftServerIpResolve {
	
	String getServerIp() throws Exception;
	
	void reset();
	
	//当IP变更时,将会调用reset方法
	static interface IpRestCalllBack{
		public void rest(String newIp);
	}
}

可以对该接口做不通的实现,下面我们基于网卡获取IP地址,也可以通过配置serverIp
ThriftServerIpLocalNetworkResolve.java

 

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.net.Inet6Address;  
  4. import java.net.InetAddress;  
  5. import java.net.NetworkInterface;  
  6. import java.net.SocketException;  
  7. import java.util.Enumeration;  
  8.   
  9. import org.slf4j.Logger;  
  10. import org.slf4j.LoggerFactory;  
  11.   
  12. /** 
  13.  * 解析网卡Ip 
  14.  * 
  15.  */  
  16. public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {  
  17.       
  18.     private Logger logger = LoggerFactory.getLogger(getClass());  
  19.   
  20.     //缓存  
  21.     private String serverIp;  
  22.       
  23.     public void setServerIp(String serverIp) {  
  24.         this.serverIp = serverIp;  
  25.     }  
  26.   
  27.     @Override  
  28.     public String getServerIp() {  
  29.         if (serverIp != null) {  
  30.             return serverIp;  
  31.         }  
  32.         // 一个主机有多个网络接口  
  33.         try {  
  34.             Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();  
  35.             while (netInterfaces.hasMoreElements()) {  
  36.                 NetworkInterface netInterface = netInterfaces.nextElement();  
  37.                 // 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .  
  38.                 Enumeration<InetAddress> addresses = netInterface.getInetAddresses();  
  39.                 while (addresses.hasMoreElements()) {  
  40.                     InetAddress address = addresses.nextElement();  
  41.                     if(address instanceof Inet6Address){  
  42.                         continue;  
  43.                     }  
  44.                     if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {  
  45.                         serverIp = address.getHostAddress();  
  46.                         logger.info("resolve server ip :"+ serverIp);  
  47.                         continue;  
  48.                     }  
  49.                 }  
  50.             }  
  51.         } catch (SocketException e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.         return serverIp;  
  55.     }  
  56.   
  57.     @Override  
  58.     public void reset() {  
  59.         serverIp = null;  
  60.     }  
  61. }  
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 解析网卡Ip
 *
 */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
	
	private Logger logger = LoggerFactory.getLogger(getClass());

	//缓存
	private String serverIp;
	
	public void setServerIp(String serverIp) {
		this.serverIp = serverIp;
	}

	@Override
	public String getServerIp() {
		if (serverIp != null) {
			return serverIp;
		}
		// 一个主机有多个网络接口
		try {
			Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
			while (netInterfaces.hasMoreElements()) {
				NetworkInterface netInterface = netInterfaces.nextElement();
				// 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
				Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
				while (addresses.hasMoreElements()) {
					InetAddress address = addresses.nextElement();
					if(address instanceof Inet6Address){
						continue;
					}
					if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
						serverIp = address.getHostAddress();
						logger.info("resolve server ip :"+ serverIp);
						continue;
					}
				}
			}
		} catch (SocketException e) {
			e.printStackTrace();
		}
		return serverIp;
	}

	@Override
	public void reset() {
		serverIp = null;
	}
}

接下来我们定义发布服务接口,并实现将服务信息(服务接口、版本号,IP、port、weight)发布到zookeeper中。
ThriftServerAddressRegister.java

 

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. /** 
  4.  * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器 
  5.  */  
  6. public interface ThriftServerAddressRegister {  
  7.     /** 
  8.      * 发布服务接口 
  9.      * @param service 服务接口名称,一个产品中不能重复 
  10.      * @param version 服务接口的版本号,默认1.0.0 
  11.      * @param address 服务发布的地址和端口 
  12.      */  
  13.     void register(String service,String version,String address);  
  14. }  
package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器
 */
public interface ThriftServerAddressRegister {
	/**
	 * 发布服务接口
	 * @param service 服务接口名称,一个产品中不能重复
	 * @param version 服务接口的版本号,默认1.0.0
	 * @param address 服务发布的地址和端口
	 */
	void register(String service,String version,String address);
}

实现:ThriftServerAddressRegisterZookeeper.java

 

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.io.UnsupportedEncodingException;  
  4.   
  5. import org.apache.curator.framework.CuratorFramework;  
  6. import org.apache.curator.framework.imps.CuratorFrameworkState;  
  7. import org.apache.zookeeper.CreateMode;  
  8. import org.slf4j.Logger;  
  9. import org.slf4j.LoggerFactory;  
  10. import org.springframework.util.StringUtils;  
  11.   
  12. import cn.slimsmart.thrift.rpc.ThriftException;  
  13.   
  14. /** 
  15.  *  注册服务列表到Zookeeper 
  16.  */  
  17. public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{  
  18.       
  19.     private Logger logger = LoggerFactory.getLogger(getClass());  
  20.       
  21.     private CuratorFramework zkClient;  
  22.       
  23.     public ThriftServerAddressRegisterZookeeper(){}  
  24.       
  25.     public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){  
  26.         this.zkClient = zkClient;  
  27.     }  
  28.   
  29.     public void setZkClient(CuratorFramework zkClient) {  
  30.         this.zkClient = zkClient;  
  31.     }  
  32.   
  33.     @Override  
  34.     public void register(String service, String version, String address) {  
  35.         if(zkClient.getState() == CuratorFrameworkState.LATENT){  
  36.             zkClient.start();  
  37.         }  
  38.         if(StringUtils.isEmpty(version)){  
  39.             version="1.0.0";  
  40.         }  
  41.         //临时节点  
  42.         try {  
  43.             zkClient.create()  
  44.                 .creatingParentsIfNeeded()  
  45.                 .withMode(CreateMode.EPHEMERAL)  
  46.                 .forPath("/"+service+"/"+version+"/"+address);  
  47.         } catch (UnsupportedEncodingException e) {  
  48.             logger.error("register service address to zookeeper exception:{}",e);  
  49.             throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);  
  50.         } catch (Exception e) {  
  51.             logger.error("register service address to zookeeper exception:{}",e);  
  52.             throw new ThriftException("register service address to zookeeper exception:{}", e);  
  53.         }  
  54.     }  
  55.       
  56.     public void close(){  
  57.         zkClient.close();  
  58.     }  
  59. }  
package cn.slimsmart.thrift.rpc.zookeeper;

import java.io.UnsupportedEncodingException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.ThriftException;

/**
 *  注册服务列表到Zookeeper
 */
public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
	
	private Logger logger = LoggerFactory.getLogger(getClass());
	
	private CuratorFramework zkClient;
	
	public ThriftServerAddressRegisterZookeeper(){}
	
	public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
		this.zkClient = zkClient;
	}

	public void setZkClient(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	@Override
	public void register(String service, String version, String address) {
		if(zkClient.getState() == CuratorFrameworkState.LATENT){
			zkClient.start();
		}
		if(StringUtils.isEmpty(version)){
			version="1.0.0";
		}
		//临时节点
		try {
			zkClient.create()
				.creatingParentsIfNeeded()
				.withMode(CreateMode.EPHEMERAL)
				.forPath("/"+service+"/"+version+"/"+address);
		} catch (UnsupportedEncodingException e) {
			logger.error("register service address to zookeeper exception:{}",e);
			throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
		} catch (Exception e) {
			logger.error("register service address to zookeeper exception:{}",e);
			throw new ThriftException("register service address to zookeeper exception:{}", e);
		}
	}
	
	public void close(){
		zkClient.close();
	}
}

 

3.客户端发现服务

定义获取服务地址接口

ThriftServerAddressProvider.java

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.net.InetSocketAddress;  
  4. import java.util.List;  
  5.   
  6. /** 
  7.  * thrift server-service地址提供者,以便构建客户端连接池 
  8.  */  
  9. public interface ThriftServerAddressProvider {  
  10.       
  11.     //获取服务名称  
  12.     String getService();  
  13.   
  14.     /** 
  15.      * 获取所有服务端地址 
  16.      * @return 
  17.      */  
  18.     List<InetSocketAddress> findServerAddressList();  
  19.   
  20.     /** 
  21.      * 选取一个合适的address,可以随机获取等\' 
  22.      * 内部可以使用合适的算法. 
  23.      * @return 
  24.      */  
  25.     InetSocketAddress selector();  
  26.   
  27.     void close();  
  28. }  
package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * thrift server-service地址提供者,以便构建客户端连接池
 */
public interface ThriftServerAddressProvider {
	
	//获取服务名称
	String getService();

	/**
	 * 获取所有服务端地址
	 * @return
	 */
    List<InetSocketAddress> findServerAddressList();

    /**
     * 选取一个合适的address,可以随机获取等\'
     * 内部可以使用合适的算法.
     * @return
     */
    InetSocketAddress selector();

    void close();
}

基于zookeeper服务地址自动发现实现:ThriftServerAddressProviderZookeeper.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.net.InetSocketAddress;  
  4. import java.util.ArrayList;  
  5. import java.util.Collections;  
  6. import java.util.HashSet;  
  7. import java.util.LinkedList;  
  8. import java.util.List;  
  9. import java.util.Queue;  
  10. import java.util.Set;  
  11.   
  12. import org.apache.curator.framework.CuratorFramework;  
  13. import org.apache.curator.framework.imps.CuratorFrameworkState;  
  14. import org.apache.curator.framework.recipes.cache.ChildData;  
  15. import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
  16. import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;  
  17. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
  18. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
  19. import org.slf4j.Logger;  
  20. import org.slf4j.LoggerFactory;  
  21. import org.springframework.beans.factory.InitializingBean;  
  22.   
  23. /** 
  24.  * 使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发 
  25.  */  
  26. public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {  
  27.   
  28.     private Logger logger = LoggerFactory.getLogger(getClass());  
  29.   
  30.     // 注册服务  
  31.     private String service;  
  32.     // 服务版本号  
  33.     以上是关于基于zookeeper连接池Failover/LoadBalance等改造Thrift 服务化的主要内容,如果未能解决你的问题,请参考以下文章

    Go 基于 channel 实现连接池

    Kafka 安装教程 + nodejs 连接

    dubbo使用zookeeper连接,zookeeper宕机后怎么处理?

    HttpClient 基于连接池的使用

    spark streaming可以基于executor建立mysql的连接池吗

    如何用java实现基于JedisCluster对象的连接池