Thrift连接池的实现(Java)
Posted LaughingVzr
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Thrift连接池的实现(Java)相关的知识,希望对你有一定的参考价值。
在众多RPC的框架中,Thrift可以说以使用简洁、高性能著称,但对于同步的Client的实际使用还有一些问题,比如下文要说到的连接池问题。在官方的Demo中并没有池化的实现,这就需要我们动手来实现,下文中使用的是Apache Common Pool来做的实现。
实现PooledObjectFactory接口
主要实现该接口中的 makeObject
、 destoryObject
方法,第一个是创建对象,第二个是销毁对象,实现代码如下:
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
/**
* SyncConnectionFactory
*
* @author Laughing
* @date 2017/11/29 13:26
* Description:
*/
public class SyncConnectionFactory implements PooledObjectFactory<TProtocol> {
/**
* 服务端IP.
*/
private String serverIP;
/**
* 服务端端口.
*/
private int serverPort;
/**
* 客户端超时时间.
*/
private int timeOut;
public SyncConnectionFactory(String serverIP, int serverPort, int timeOut) {
this.serverIP = serverIP;
this.serverPort = serverPort;
this.timeOut = timeOut;
}
public PooledObject<TProtocol> makeObject() throws Exception {
TSocket socket = new TSocket(serverIP, serverPort, timeOut);
TTransport transport = new TFramedTransport(socket);
TProtocol protocol = new TBinaryProtocol(transport);
protocol.getTransport().open();
return new DefaultPooledObject<>(protocol);
}
public void destroyObject(PooledObject<TProtocol> pooledObject) throws Exception {
TProtocol protocol = pooledObject.getObject();
// 销毁时判断是否close
if (protocol.getTransport().isOpen())
protocol.getTransport().close();
}
public boolean validateObject(PooledObject<TProtocol> pooledObject) {
//判断链接是否open
TProtocol protocol = pooledObject.getObject();
return protocol.getTransport().isOpen();
}
public void activateObject(PooledObject<TProtocol> pooledObject) throws Exception {
}
public void passivateObject(PooledObject<TProtocol> pooledObject) throws Exception {
}
}
另外该接口中还可以对池中的对象进行校验的实现,最后两个空实现的方法分别是激活和钝化对象。
这里还需要说明的是,上面的代码池化的是thrift最终可使用的 protocol
协议,网上的很多实现是池化的 TSocket
,当然并不是不可以,甚至池化TSocket或许可以更好的控制网络连接,但对于日常使用来讲,在使用端应该使用尽量少的代码来使用Thrift的链接,所以这里池化TProtocol。
创建连接池和Provider
只是实现上面的池化工厂其实还没有结束,下面我们通过上面的 fatory
来创建连接池,代码如下:
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.protocol.TProtocol;
/**
* ConnProvideImpl
*
* @author Laughing
* @date 2017/11/29 13:53
* Description:
*/
public class ConnProvideImpl implements ConnectionProvider {
/**
* 服务端IP.
*/
private String serverIP;
/**
* 服务端端口.
*/
private int serverPort;
/**
* 客户端超时时间.
*/
private int timeout;
/**
* 连接池.
*/
private static GenericObjectPool<TProtocol> connPool;
SyncConnectionFactory factory;
public ConnProvideImpl(String serverIP, int serverPort, int timeout) {
this.serverIP = serverIP;
this.serverPort = serverPort;
this.timeout = timeout;
// 创建工厂和连接池
factory = new SyncConnectionFactory(serverIP, serverPort, timeout);
connPool = new GenericObjectPool<>(factory);
}
/**
* 获取链接.
*
* @return
*/
public TProtocol getConnection() {
TProtocol protocol = null;
try {
protocol = connPool.borrowObject();
if (!protocol.getTransport().peek()) {
factory = new SyncConnectionFactory(serverIP, serverPort, timeout);
connPool = new GenericObjectPool<>(factory);
}
} catch (Exception e) {
// 异常时重新初始化连接池,以保证链接的状态为最新
e.printStackTrace();
factory = new SyncConnectionFactory(serverIP, serverPort, timeout);
connPool = new GenericObjectPool<>(factory);
}
return protocol;
}
/**
* 归还链接.
*
* @param protocol
*/
public void returnConn(TProtocol protocol) {
try {
// 如果transport为打开状态则正常归还
if (protocol.getTransport().isOpen())
connPool.returnObject(protocol);
} catch (RuntimeException re) {
re.printStackTrace();
}
}
}
这里声明了一个静态的 GenericObjectPool
用于存放客户端链接对象,并且实现了 ConnectionProvider
获取和归还两个方法。
import org.apache.thrift.protocol.TProtocol;
public interface ConnectionProvider {
/**
* 获取链接.
*
* @return
*/
TProtocol getConnection();
/**
* 归还链接.
*
* @param protocol
*/
void returnConn(TProtocol protocol);
}
连接池的使用
连接池的使用方法有很多种,如果你也跟我一样是Spring Boot的死忠粉那么你可以注入一个 ConnProvideImpl
对象;或者通过带参的单例模式放到普通的Java程序中使用,先来说Spring Boot的使用,很简单:
Spring Boot
@Configuration
public class ThriftConnProvider {
@Bean
public ConnProvideImpl getConnProvider(){
return new ConnProvideImpl("10.105.45.59",9900,1000);
}
}
Singleton
public class ThriftConnSingleton extends ConnProvideImpl {
private static String HOST;
private static int PORT;
private static int TIME_OUT;
public ThriftConnSingleton() {
super(HOST, PORT, TIME_OUT);
}
private static class ConnHandler {
private final static ThriftConnSingleton instance = new ThriftConnSingleton();
}
public static ConnProvideImpl getInstance(String IP, int port, int timeOut) {
HOST = IP;
PORT = port;
TIME_OUT = timeOut;
return ConnHandler.instance;
}
}
// Singleton 连接池使用
private static ConnProvideImpl provider = ThriftConnSingleton.getInstance("10.105.45.59", 9900, 1000);
Thrift链接使用的坑
现在假设在使用中Thrift的server端宕机,导致客户端不能正常的链接,这时会抛出 BrokenPipe
、 Connectionreset
等异常,但server端可用后客户端依然不能正常的链接服务端,代码如下:
TProtocol protocol = provider.getConnection();
try {
HistoryService.Client client = new HistoryService.Client(protocol);
HistoryNameMsg resp = client.Get(name);
result = resp.toString();
} catch (TException e) {
e.printStackTrace();
} finally {
provid.returnConn(protocol);
}
以上代码没有办法重连到已经重启好的服务,这时要主动关闭transport的链接,再从连接池中重新获取链接对象就可以了。代码如下:
TProtocol protocol = provider.getConnection();
try {
HistoryService.Client client = new HistoryService.Client(protocol);
HistoryNameMsg resp = client.Get(name);
result = resp.toString();
} catch (TException e) {
e.printStackTrace();
// 在发生异常时主动关闭链接,如果服务可用下次会恢复链接正常
protocol.getTransport().close();
} finally {
provid.returnConn(protocol);
}
如果是这样的情况下,如果异常后进行close,然后再打开是不是也可以解决重连的问题?稍后会继续试验。
以上是关于Thrift连接池的实现(Java)的主要内容,如果未能解决你的问题,请参考以下文章
环境初始化 Build and Install the Apache Thrift IDL Compiler Install the Platform Development Tools(代码片段