Thrift连接池的实现(Java)

Posted LaughingVzr

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Thrift连接池的实现(Java)相关的知识,希望对你有一定的参考价值。

在众多RPC的框架中,Thrift可以说以使用简洁、高性能著称,但对于同步的Client的实际使用还有一些问题,比如下文要说到的连接池问题。在官方的Demo中并没有池化的实现,这就需要我们动手来实现,下文中使用的是Apache Common Pool来做的实现。

实现PooledObjectFactory接口

主要实现该接口中的 makeObjectdestoryObject方法,第一个是创建对象,第二个是销毁对象,实现代码如下:

 
   
   
 
  1. import org.apache.commons.pool2.PooledObject;

  2. import org.apache.commons.pool2.PooledObjectFactory;

  3. import org.apache.commons.pool2.impl.DefaultPooledObject;

  4. import org.apache.thrift.protocol.TBinaryProtocol;

  5. import org.apache.thrift.protocol.TProtocol;

  6. import org.apache.thrift.transport.TFramedTransport;

  7. import org.apache.thrift.transport.TSocket;

  8. import org.apache.thrift.transport.TTransport;

  9. /**

  10. * SyncConnectionFactory

  11. *

  12. * @author Laughing

  13. * @date 2017/11/29 13:26

  14. * Description:

  15. */

  16. public class SyncConnectionFactory implements PooledObjectFactory<TProtocol> {

  17.    /**

  18.     * 服务端IP.

  19.     */

  20.    private String serverIP;

  21.    /**

  22.     * 服务端端口.

  23.     */

  24.    private int serverPort;

  25.    /**

  26.     * 客户端超时时间.

  27.     */

  28.    private int timeOut;

  29.    public SyncConnectionFactory(String serverIP, int serverPort, int timeOut) {

  30.        this.serverIP = serverIP;

  31.        this.serverPort = serverPort;

  32.        this.timeOut = timeOut;

  33.    }

  34.    public PooledObject<TProtocol> makeObject() throws Exception {

  35.        TSocket socket = new TSocket(serverIP, serverPort, timeOut);

  36.        TTransport transport = new TFramedTransport(socket);

  37.        TProtocol protocol = new TBinaryProtocol(transport);

  38.        protocol.getTransport().open();

  39.        return new DefaultPooledObject<>(protocol);

  40.    }

  41.    public void destroyObject(PooledObject<TProtocol> pooledObject) throws Exception {

  42.        TProtocol protocol = pooledObject.getObject();

  43.        // 销毁时判断是否close

  44.        if (protocol.getTransport().isOpen())

  45.            protocol.getTransport().close();

  46.    }

  47.    public boolean validateObject(PooledObject<TProtocol> pooledObject) {

  48.        //判断链接是否open

  49.        TProtocol protocol = pooledObject.getObject();

  50.        return protocol.getTransport().isOpen();

  51.    }

  52.    public void activateObject(PooledObject<TProtocol> pooledObject) throws Exception {

  53.    }

  54.    public void passivateObject(PooledObject<TProtocol> pooledObject) throws Exception {

  55.    }

  56. }

另外该接口中还可以对池中的对象进行校验的实现,最后两个空实现的方法分别是激活和钝化对象。

这里还需要说明的是,上面的代码池化的是thrift最终可使用的 protocol协议,网上的很多实现是池化的 TSocket,当然并不是不可以,甚至池化TSocket或许可以更好的控制网络连接,但对于日常使用来讲,在使用端应该使用尽量少的代码来使用Thrift的链接,所以这里池化TProtocol。

创建连接池和Provider

只是实现上面的池化工厂其实还没有结束,下面我们通过上面的 fatory来创建连接池,代码如下:

 
   
   
 
  1. import org.apache.commons.pool2.impl.GenericObjectPool;

  2. import org.apache.thrift.protocol.TProtocol;

  3. /**

  4. * ConnProvideImpl

  5. *

  6. * @author Laughing

  7. * @date 2017/11/29 13:53

  8. * Description:

  9. */

  10. public class ConnProvideImpl implements ConnectionProvider {

  11.    /**

  12.     * 服务端IP.

  13.     */

  14.    private String serverIP;

  15.    /**

  16.     * 服务端端口.

  17.     */

  18.    private int serverPort;

  19.    /**

  20.     * 客户端超时时间.

  21.     */

  22.    private int timeout;

  23.    /**

  24.     * 连接池.

  25.     */

  26.    private static GenericObjectPool<TProtocol> connPool;

  27.    SyncConnectionFactory factory;

  28.    public ConnProvideImpl(String serverIP, int serverPort, int timeout) {

  29.        this.serverIP = serverIP;

  30.        this.serverPort = serverPort;

  31.        this.timeout = timeout;

  32.          // 创建工厂和连接池

  33.        factory = new SyncConnectionFactory(serverIP, serverPort, timeout);

  34.        connPool = new GenericObjectPool<>(factory);

  35.    }

  36.    /**

  37.     * 获取链接.

  38.     *

  39.     * @return

  40.     */

  41.    public TProtocol getConnection() {

  42.        TProtocol protocol = null;

  43.        try {

  44.            protocol = connPool.borrowObject();

  45.            if (!protocol.getTransport().peek()) {

  46.                factory = new SyncConnectionFactory(serverIP, serverPort, timeout);

  47.                connPool = new GenericObjectPool<>(factory);

  48.            }

  49.        } catch (Exception e) {

  50.              // 异常时重新初始化连接池,以保证链接的状态为最新

  51.            e.printStackTrace();

  52.            factory = new SyncConnectionFactory(serverIP, serverPort, timeout);

  53.            connPool = new GenericObjectPool<>(factory);

  54.        }

  55.        return protocol;

  56.    }

  57.    /**

  58.     * 归还链接.

  59.     *

  60.     * @param protocol

  61.     */

  62.    public void returnConn(TProtocol protocol) {

  63.        try {

  64.              // 如果transport为打开状态则正常归还

  65.            if (protocol.getTransport().isOpen())

  66.                connPool.returnObject(protocol);

  67.        } catch (RuntimeException re) {

  68.            re.printStackTrace();

  69.        }

  70.    }

  71. }

这里声明了一个静态的 GenericObjectPool用于存放客户端链接对象,并且实现了 ConnectionProvider获取和归还两个方法。

 
   
   
 
  1. import org.apache.thrift.protocol.TProtocol;

  2. public interface ConnectionProvider {

  3.    /**

  4.     * 获取链接.

  5.     *

  6.     * @return

  7.     */

  8.    TProtocol getConnection();

  9.    /**

  10.     * 归还链接.

  11.     *

  12.     * @param protocol

  13.     */

  14.    void returnConn(TProtocol protocol);

  15. }

连接池的使用

连接池的使用方法有很多种,如果你也跟我一样是Spring Boot的死忠粉那么你可以注入一个 ConnProvideImpl对象;或者通过带参的单例模式放到普通的Java程序中使用,先来说Spring Boot的使用,很简单:

Spring Boot

 
   
   
 
  1. @Configuration

  2. public class ThriftConnProvider {

  3.    @Bean

  4.    public ConnProvideImpl getConnProvider(){

  5.        return new ConnProvideImpl("10.105.45.59",9900,1000);

  6.    }

  7. }

Singleton

 
   
   
 
  1. public class ThriftConnSingleton extends ConnProvideImpl {

  2.    private static String HOST;

  3.    private static int PORT;

  4.    private static int TIME_OUT;

  5.    public ThriftConnSingleton() {

  6.        super(HOST, PORT, TIME_OUT);

  7.    }

  8.    private static class ConnHandler {

  9.        private final static ThriftConnSingleton instance = new ThriftConnSingleton();

  10.    }

  11.    public static ConnProvideImpl getInstance(String IP, int port, int timeOut) {

  12.        HOST = IP;

  13.        PORT = port;

  14.        TIME_OUT = timeOut;

  15.        return ConnHandler.instance;

  16.    }

  17. }

  18. // Singleton 连接池使用

  19. private static ConnProvideImpl provider = ThriftConnSingleton.getInstance("10.105.45.59", 9900, 1000);

Thrift链接使用的坑

现在假设在使用中Thrift的server端宕机,导致客户端不能正常的链接,这时会抛出 BrokenPipeConnectionreset等异常,但server端可用后客户端依然不能正常的链接服务端,代码如下:

 
   
   
 
  1. TProtocol protocol = provider.getConnection();

  2.    try {

  3.        HistoryService.Client client = new HistoryService.Client(protocol);

  4.        HistoryNameMsg resp = client.Get(name);

  5.        result = resp.toString();

  6.    } catch (TException e) {

  7.        e.printStackTrace();

  8.    } finally {

  9.        provid.returnConn(protocol);

  10.    }

以上代码没有办法重连到已经重启好的服务,这时要主动关闭transport的链接,再从连接池中重新获取链接对象就可以了。代码如下:

 
   
   
 
  1. TProtocol protocol = provider.getConnection();

  2.    try {

  3.        HistoryService.Client client = new HistoryService.Client(protocol);

  4.        HistoryNameMsg resp = client.Get(name);

  5.        result = resp.toString();

  6.    } catch (TException e) {

  7.        e.printStackTrace();

  8.          // 在发生异常时主动关闭链接,如果服务可用下次会恢复链接正常

  9.        protocol.getTransport().close();

  10.    } finally {

  11.        provid.returnConn(protocol);

  12.    }

如果是这样的情况下,如果异常后进行close,然后再打开是不是也可以解决重连的问题?稍后会继续试验。


以上是关于Thrift连接池的实现(Java)的主要内容,如果未能解决你的问题,请参考以下文章

数据库连接池的Java连接池

数据库连接池的选择 | 实现

环境初始化 Build and Install the Apache Thrift IDL Compiler Install the Platform Development Tools(代码片段

基于thrift的java和python分别作为客户端和服务端的调用实现

搞软件开发,请你来谈谈数据库连接池的原理吧

大数据必学Java基础(一百零二):连接池的使用