Apache Commons Pool

Posted 江舟

This article skips the introduction to Commons Pool and is split into two parts: source code analysis and examples.

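Source code analysis

Commons Pool is built around three main concepts:

  • ObjectPool: manages borrowing and returning of all pooled objects.

  • PooledObject: the pooled object, a wrapper around the real object that adds bookkeeping information such as the object's state (allocated, idle) and its creation time.

  • PooledObjectFactory: the factory, responsible for creating, initializing, destroying, and validating pooled objects.

First, the inheritance hierarchy of ObjectPool: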

[Figure missing: ObjectPool inheritance hierarchy]

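GenericObjectPool is the implementation used most of the time. SoftReferenceObjectPool wraps its pooled objects in SoftReference, which lets the garbage collector reclaim them when memory runs low. That is all you need to know here; why softly referenced objects can be reclaimed is a separate topic, so this article skips the SoftReference part.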
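As a rough illustration of what holding an object through a SoftReference means, here is a minimal stdlib-only sketch. SoftReferenceDemo and wrap() are hypothetical names made up for this example; this is not how SoftReferenceObjectPool is implemented, only the underlying java.lang.ref mechanism.

```java
import java.lang.ref.SoftReference;

// Hypothetical demo class, not part of Commons Pool.
class SoftReferenceDemo {

    // Wraps a value in a SoftReference, similar in spirit to how a
    // soft-reference-based pool might hold its idle instances.
    static <T> SoftReference<T> wrap(T value) {
        return new SoftReference<>(value);
    }

    public static void main(String[] args) {
        StringBuilder strong = new StringBuilder("pooled");
        SoftReference<StringBuilder> ref = wrap(strong);

        // While a strong reference exists, get() returns the same instance.
        System.out.println(ref.get() == strong);

        // Once only the SoftReference is left, the GC may clear it under
        // memory pressure, so callers must always handle a null result.
        strong = null;
        StringBuilder maybe = ref.get();
        System.out.println(maybe != null ? "still cached" : "reclaimed, create a new one");
    }
}
```

The second print depends on the garbage collector, which is why code that reads a SoftReference has to be ready to create a fresh object.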

PooledObject

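Inheritance hierarchy

The hierarchy is straightforward, so let's go straight to the code.

PooledObject

PooledObject carries little logic of its own; it is mainly an interface definition, so a look at the interface structure is enough.

[Figure missing: PooledObject interface structure]

DefaultPooledObject

DefaultPooledObject is the implementation class and holds the main logic. The fields are the key part: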

/**
 * This wrapper is used to track the additional information, such as state, for
 * the pooled objects.
 * <p>
 * This class is intended to be thread-safe.
 * </p>
 *
 * @param <T> the type of object in the pool
 *
 * @since 2.0
 */
public class DefaultPooledObject<T> implements PooledObject<T> {

    private final T object;
    // State and statistics
    private PooledObjectState state = PooledObjectState.IDLE; 
    private final long createTime = System.currentTimeMillis();
    private volatile long lastBorrowTime = createTime;
    private volatile long lastUseTime = createTime;
    private volatile long lastReturnTime = createTime;
    private volatile boolean logAbandoned = false;
    private volatile CallStack borrowedBy = NoOpCallStack.INSTANCE;
    private volatile CallStack usedBy = NoOpCallStack.INSTANCE;
    private volatile long borrowedCount = 0;

    public DefaultPooledObject(final T object) {
        this.object = object;
    }
  
    // Getters and setters for the state and statistics are omitted
    ......
    
    /**
     * Allocates the object.
     *
     * @return {@code true} if the original state was {@link PooledObjectState#IDLE IDLE}
     */
    @Override
    public synchronized boolean allocate() {
        if (state == PooledObjectState.IDLE) {
            state = PooledObjectState.ALLOCATED;
            lastBorrowTime = System.currentTimeMillis();
            lastUseTime = lastBorrowTime;
            borrowedCount++;
            if (logAbandoned) {
                borrowedBy.fillInStackTrace();
            }
            return true;
        } else if (state == PooledObjectState.EVICTION) {
            // TODO Allocate anyway and ignore eviction test
            state = PooledObjectState.EVICTION_RETURN_TO_HEAD;
            return false;
        }
        // TODO if validating and testOnBorrow == true then pre-allocate for
        // performance
        return false;
    }

    /**
     * Deallocates the object and sets it {@link PooledObjectState#IDLE IDLE}
     * if it is currently {@link PooledObjectState#ALLOCATED ALLOCATED}.
     *
     * @return {@code true} if the state was {@link PooledObjectState#ALLOCATED ALLOCATED}
     */
    @Override
    public synchronized boolean deallocate() {
        if (state == PooledObjectState.ALLOCATED ||
                state == PooledObjectState.RETURNING) {
            state = PooledObjectState.IDLE;
            lastReturnTime = System.currentTimeMillis();
            borrowedBy.clear();
            return true;
        }

        return false;
    }
     
		......
      
}  	
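The allocate()/deallocate() pair above is essentially a small synchronized state machine. The following is a simplified sketch of just the IDLE and ALLOCATED transitions; MiniPooledObject is a hypothetical class for illustration, and it leaves out the eviction states, timestamps, and call-stack tracking of the real DefaultPooledObject.

```java
// Hypothetical, simplified model of the IDLE <-> ALLOCATED transitions.
class MiniPooledObject<T> {

    enum State { IDLE, ALLOCATED }

    private final T object;
    private State state = State.IDLE;
    private long borrowedCount = 0;

    MiniPooledObject(T object) {
        this.object = object;
    }

    T getObject() { return object; }
    synchronized State getState() { return state; }
    synchronized long getBorrowedCount() { return borrowedCount; }

    // Succeeds only when the object is currently idle.
    synchronized boolean allocate() {
        if (state == State.IDLE) {
            state = State.ALLOCATED;
            borrowedCount++;
            return true;
        }
        return false;
    }

    // Succeeds only when the object is currently allocated.
    synchronized boolean deallocate() {
        if (state == State.ALLOCATED) {
            state = State.IDLE;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        MiniPooledObject<String> p = new MiniPooledObject<>("resource");
        System.out.println(p.allocate());   // first borrower wins
        System.out.println(p.allocate());   // a second allocate is rejected
        System.out.println(p.deallocate()); // back to IDLE
    }
}
```

Returning a boolean instead of throwing lets the caller decide what to do when two threads race for the same wrapper.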

PooledObjectFactory

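Commons Pool contains many internal factories that implement PooledObjectFactory, but they are only basic customizations and not very interesting. PooledObjectFactory is primarily a design contract and interface definition; the concrete implementation is left to the user. So we only analyze the basic BasePooledObjectFactory.

The inheritance hierarchy of PooledObjectFactory is as follows:

[Figure missing: PooledObjectFactory inheritance hierarchy]

The PooledObjectFactory interface: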

public interface PooledObjectFactory<T> {

  /**
   * Creates an instance that can be served by the pool and wrap it in a
   * PooledObject to be managed by the pool.
   */
  PooledObject<T> makeObject() throws Exception;

  /**
   * Destroys an instance no longer needed by the pool.
   * If creating the object fails, or the created object turns out to be unusable, the object has to be destroyed
   */
  void destroyObject(PooledObject<T> p) throws Exception;

  /**
   * Ensures that the instance is safe to be returned by the pool.
   * Confirms that the object is usable
   */
  boolean validateObject(PooledObject<T> p);

  /**
   * Reinitializes an instance to be returned by the pool.
   * Reinitializes the instance the pool is about to hand out, i.e. runs every time an object is borrowed
   */
  void activateObject(PooledObject<T> p) throws Exception;

  /**
   * Uninitializes an instance to be returned to the idle object pool.
   * Uninitializes an instance that is being returned to the pool
   * Example: in a StringBuffer pool, clear the StringBuffer's old contents when it is returned
   */
  void passivateObject(PooledObject<T> p) throws Exception;
}

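It provides makeObject() for creating objects, plus the hooks that manage objects entering and leaving the pool. The pool calls these hooks at the corresponding points in its logic; the user supplies their implementation.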

// An abstract class; user-defined factories usually extend BasePooledObjectFactory
public abstract class BasePooledObjectFactory<T> extends BaseObject implements PooledObjectFactory<T> {
    public abstract T create() throws Exception;

    /**
     * Wrap the provided instance with an implementation of PooledObject
     * Wraps the actual stored object T in a PooledObject so that the ObjectPool can manage it
     */
    public abstract PooledObject<T> wrap(T obj);

    @Override
    public PooledObject<T> makeObject() throws Exception {
        return wrap(create());
    }

    @Override
    public void destroyObject(final PooledObject<T> p) throws Exception  {
    }

    @Override
    public boolean validateObject(final PooledObject<T> p) {
        return true;
    }

    @Override
    public void activateObject(final PooledObject<T> p) throws Exception {
    }

    @Override
    public void passivateObject(final PooledObject<T> p) throws Exception {
    }
}

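BasePooledObjectFactory's job is to wrap the pooled object: it implements makeObject() itself, so subclasses do not have to, and instead exposes create() for creating the object and wrap() for wrapping it. create() and wrap() are the two methods every subclass of BasePooledObjectFactory must implement, as the examples later show.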
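To make it easier to see when a pool calls each factory hook, here is a single-threaded sketch that records the call order. LifecycleDemo, TracingFactory, and TinyPool are hypothetical names for this example, and the model assumes validation runs on every borrow (in the real pool that depends on settings such as testOnBorrow).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model of the order in which a pool invokes its factory hooks.
class LifecycleDemo {

    // Records every hook invocation so the order can be inspected.
    static final class TracingFactory {
        final List<String> calls = new ArrayList<>();

        StringBuilder makeObject() { calls.add("makeObject"); return new StringBuilder(); }
        void activateObject(StringBuilder sb) { calls.add("activateObject"); }
        boolean validateObject(StringBuilder sb) { calls.add("validateObject"); return true; }
        void passivateObject(StringBuilder sb) { calls.add("passivateObject"); sb.setLength(0); }
        void destroyObject(StringBuilder sb) { calls.add("destroyObject"); }
    }

    static final class TinyPool {
        private final TracingFactory factory;
        private final Deque<StringBuilder> idle = new ArrayDeque<>();

        TinyPool(TracingFactory factory) { this.factory = factory; }

        StringBuilder borrowObject() {
            StringBuilder sb = idle.pollFirst();
            if (sb == null) {
                sb = factory.makeObject();        // nothing idle: create
            }
            factory.activateObject(sb);           // prepare for use
            if (!factory.validateObject(sb)) {    // assumed always on in this model
                factory.destroyObject(sb);
                throw new NoSuchElementException("validation failed");
            }
            return sb;
        }

        void returnObject(StringBuilder sb) {
            factory.passivateObject(sb);          // reset before going idle
            idle.addFirst(sb);
        }
    }

    // Two borrow/return cycles; the second borrow reuses the idle instance.
    static List<String> run() {
        TracingFactory factory = new TracingFactory();
        TinyPool pool = new TinyPool(factory);
        StringBuilder first = pool.borrowObject();
        first.append("hello");
        pool.returnObject(first);
        StringBuilder second = pool.borrowObject();
        pool.returnObject(second);
        return factory.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that makeObject appears only once in the recorded list: the second cycle goes straight to activateObject because an idle instance is available.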

Object Pool

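The inheritance hierarchy of ObjectPool is as follows:

[Figure missing: ObjectPool inheritance hierarchy]

Now the ObjectPool interface definition.

As the method names suggest, it mainly offers add, borrow, and return operations. The interface comments make this clearer: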

public interface ObjectPool<T> extends Closeable {

    /**
     * Creates an object using the  PooledObjectFactory or other
     * implementation dependent mechanism, passivate it, and then place it in
     * the idle object pool. addObject is useful for "pre-loading"
     * a pool with idle objects. (Optional operation).
     *
     * @throws Exception
     *              when {@link PooledObjectFactory#makeObject} fails.
     * @throws IllegalStateException
     *              after {@link #close} has been called on this pool.
     * @throws UnsupportedOperationException
     *              when this pool cannot add new idle objects.
     */
    // Why does addObject() take no arguments? Because of the factory pattern: creation is delegated to the factory
    void addObject() throws Exception, IllegalStateException,
            UnsupportedOperationException;

    default void addObjects(final int count) throws Exception {
        for (int i = 0; i < count; i++) {
            addObject();
        }
    }

    /**
     * Obtains an instance from this pool.
     * <p>
     * Instances returned from this method will have been either newly created
     * with {@link PooledObjectFactory#makeObject} or will be a previously
     * idle object and have been activated with
     * {@link PooledObjectFactory#activateObject} and then validated with
     * {@link PooledObjectFactory#validateObject}.
     * </p>
     * <p>
     * By contract, clients <strong>must</strong> return the borrowed instance
     * using {@link #returnObject}, {@link #invalidateObject}, or a related
     * method as defined in an implementation or sub-interface.
     * </p>
     * <p>
     * The behaviour of this method when the pool has been exhausted
     * is not strictly specified (although it may be specified by
     * implementations).
     * </p>
     *
     * @return an instance from this pool.
     *
     * @throws IllegalStateException
     *              after {@link #close close} has been called on this pool.
     * @throws Exception
     *              when {@link PooledObjectFactory#makeObject} throws an
     *              exception.
     * @throws NoSuchElementException
     *              when the pool is exhausted and cannot or will not return
     *              another instance.
     */
    T borrowObject() throws Exception, NoSuchElementException,
            IllegalStateException;

    /**
     * Clears any objects sitting idle in the pool, releasing any associated
     * resources (optional operation). Idle objects cleared must be
     * {@link PooledObjectFactory#destroyObject(PooledObject)}.
     *
     * @throws UnsupportedOperationException
     *              if this implementation does not support the operation
     *
     * @throws Exception if the pool cannot be cleared
     */
    void clear() throws Exception, UnsupportedOperationException;

    /**
     * Closes this pool, and free any resources associated with it.
     */
    @Override
    void close();

    // Pay attention to how the active and idle counts are maintained
    int getNumActive();

    int getNumIdle();

    void invalidateObject(T obj) throws Exception;

    void returnObject(T obj) throws Exception;
}
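The Javadoc above says clients must hand every borrowed instance back through returnObject or invalidateObject. One way to honor that on the client side is a try/finally helper, sketched below. BorrowReturnDemo, TinyPool, and withPooled are hypothetical stand-ins written for this example; they only mimic the borrow/return/invalidate part of ObjectPool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical client-side pattern: always give the instance back.
class BorrowReturnDemo {

    // Minimal single-threaded stand-in for the relevant ObjectPool methods.
    static final class TinyPool {
        private final Deque<StringBuilder> idle = new ArrayDeque<>();
        private int active = 0;

        StringBuilder borrowObject() {
            StringBuilder sb = idle.pollFirst();
            if (sb == null) {
                sb = new StringBuilder();
            }
            active++;
            return sb;
        }

        void returnObject(StringBuilder sb) {
            sb.setLength(0);
            active--;
            idle.addFirst(sb);
        }

        // A broken instance is dropped instead of going back to the idle queue.
        void invalidateObject(StringBuilder sb) {
            active--;
        }

        int getNumActive() { return active; }
        int getNumIdle() { return idle.size(); }
    }

    // Borrows, runs the work, then returns on success or invalidates on failure.
    static <R> R withPooled(TinyPool pool, Function<StringBuilder, R> work) {
        StringBuilder sb = pool.borrowObject();
        boolean ok = false;
        try {
            R result = work.apply(sb);
            ok = true;
            return result;
        } finally {
            if (ok) {
                pool.returnObject(sb);
            } else {
                pool.invalidateObject(sb);
            }
        }
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool();
        String text = withPooled(pool, sb -> sb.append("hello").toString());
        System.out.println(text + " active=" + pool.getNumActive() + " idle=" + pool.getNumIdle());
    }
}
```

Whether a failed unit of work should invalidate the instance or simply return it is a design choice; invalidating is the cautious option when the object's state can no longer be trusted.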

BaseObjectPool

BaseObjectPool extends BaseObject:

public abstract class BaseObjectPool<T> extends BaseObject implements ObjectPool<T>

BaseObjectPool also adds a closed field:

private volatile boolean closed = false;
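A volatile closed flag is typically checked at the start of every operation, so that a pool refuses work after close() even when close() was called from another thread. The sketch below shows the idea; ClosedFlagDemo and its method bodies are assumptions made for illustration, not BaseObjectPool's actual code.

```java
// Hypothetical sketch of guarding pool operations with a volatile closed flag.
class ClosedFlagDemo implements AutoCloseable {

    // volatile so that a close() on one thread is visible to all others
    private volatile boolean closed = false;

    boolean isClosed() {
        return closed;
    }

    // Every public operation calls this guard first.
    private void assertOpen() {
        if (closed) {
            throw new IllegalStateException("Pool not open");
        }
    }

    String borrowObject() {
        assertOpen();
        return "resource";
    }

    @Override
    public void close() {
        closed = true;
    }

    public static void main(String[] args) {
        ClosedFlagDemo pool = new ClosedFlagDemo();
        System.out.println(pool.borrowObject());
        pool.close();
        try {
            pool.borrowObject();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```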

GenericObjectPool

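This is the key class, the pool implementation. Internally it holds a factory member and a queue:

[Figure missing: GenericObjectPool structure]

First, the relevant fields and constructors of GenericObjectPool: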

public class GenericObjectPool<T> extends BaseGenericObjectPool<T>
        implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
	  
  
    // --- configuration attributes --------------------------------------------
    private volatile int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
    private volatile int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
    // The pooled-object factory
    private final PooledObjectFactory<T> factory;


    // --- internal attributes -------------------------------------------------

    /*
     * All of the objects currently associated with this pool in any state. It
     * excludes objects that have been destroyed. The size of
     * {@link #allObjects} will always be less than or equal to {@link
     * #_maxActive}. Map keys are pooled objects, values are the PooledObject
     * wrappers used internally by the pool.
     */
    private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects =
        new ConcurrentHashMap<>();
    /*
     * The combined count of the currently created objects and those in the
     * process of being created. Under load, it may exceed {@link #_maxActive}
     * if multiple threads try and create a new object at the same time but
     * {@link #create()} will ensure that there are never more than
     * {@link #_maxActive} objects created at any one time.
     */
    private final AtomicLong createCount = new AtomicLong(0);
    private long makeObjectCount = 0;
    private final Object makeObjectCountLock = new Object();
    private final LinkedBlockingDeque<PooledObject<T>> idleObjects;

    // JMX specific attributes
    private static final String ONAME_BASE =
        "org.apache.commons.pool2:type=GenericObjectPool,name=";

    // Additional configuration properties for abandoned object tracking
    private volatile AbandonedConfig abandonedConfig = null;

    /**
     * Creates a new GenericObjectPool using defaults from GenericObjectPoolConfig
     */
    public GenericObjectPool(final PooledObjectFactory<T> factory) {
        this(factory, new GenericObjectPoolConfig<T>());
    }

    /**
     * Creates a new <code>GenericObjectPool</code> using a specific configuration.
     */
    public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig<T> config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;
			   	
        idleObjects = new LinkedBlockingDeque<>(config.getFairness());

        setConfig(config);
    }

    /**
     * Creates a new <code>GenericObjectPool</code> that tracks and destroys
     * objects that are checked out, but never returned to the pool.
     *
     * @param factory   The object factory to be used to create object instances
     *                  used by this pool
     * @param config    The base pool configuration to use for this pool instance.
     *                  The configuration is used by value. Subsequent changes to
     *                  the configuration object will not be reflected in the
     *                  pool.
     * @param abandonedConfig  Configuration for abandoned object identification
     *                         and removal.  The configuration is used by value.
     */
    public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig<T> config, final AbandonedConfig abandonedConfig) {
        this(factory, config);
        setAbandonedConfig(abandonedConfig);
    }    
    
		......

}

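The fields show that GenericObjectPool uses a ConcurrentHashMap to track all objects and a LinkedBlockingDeque to hold the idle ones. At this point the relationship between the pool, the factory, and the container is clear.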
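The two containers also explain how the active and idle counts can be derived: idle is the size of the deque, and active is everything tracked minus the idle ones. The sketch below models that bookkeeping with stdlib classes. PoolBookkeepingDemo and IdentityKey are hypothetical names, and java.util.concurrent.LinkedBlockingDeque stands in for the pool's own deque implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Supplier;

// Hypothetical model of the allObjects / idleObjects bookkeeping.
class PoolBookkeepingDemo<T> {

    // Keys compare by identity, so two equal-but-distinct objects stay separate.
    static final class IdentityKey<T> {
        private final T instance;

        IdentityKey(T instance) { this.instance = instance; }

        @Override
        public int hashCode() { return System.identityHashCode(instance); }

        @Override
        public boolean equals(Object other) {
            return other instanceof IdentityKey
                    && ((IdentityKey<?>) other).instance == instance;
        }
    }

    private final Map<IdentityKey<T>, T> allObjects = new ConcurrentHashMap<>();
    private final LinkedBlockingDeque<T> idleObjects = new LinkedBlockingDeque<>();
    private final Supplier<T> creator;

    PoolBookkeepingDemo(Supplier<T> creator) { this.creator = creator; }

    T borrowObject() {
        T obj = idleObjects.pollFirst();
        if (obj == null) {
            obj = creator.get();
            allObjects.put(new IdentityKey<>(obj), obj); // track every created object
        }
        return obj;
    }

    void returnObject(T obj) {
        if (!allObjects.containsKey(new IdentityKey<>(obj))) {
            throw new IllegalStateException("Returned object is not part of this pool");
        }
        idleObjects.addFirst(obj);
    }

    int getNumIdle() { return idleObjects.size(); }

    // Active objects are the tracked ones that are not sitting idle.
    int getNumActive() { return allObjects.size() - idleObjects.size(); }

    public static void main(String[] args) {
        PoolBookkeepingDemo<String> pool = new PoolBookkeepingDemo<>(() -> new String("same"));
        String a = pool.borrowObject();
        String b = pool.borrowObject();
        pool.returnObject(a);
        System.out.println("active=" + pool.getNumActive() + " idle=" + pool.getNumIdle());
    }
}
```

The identity-based key matters for objects that override equals(): without it, two distinct pooled instances with equal contents would collapse into one map entry.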

GenericObjectPoolConfig

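This is the object pool's configuration class: pool size, minimum idle, maximum idle.

GenericObjectPool also comes with default values: it extends BaseGenericObjectPool, which sets them. It is usually better, though, to configure the pool's properties yourself.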
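For reference, the defaults in GenericObjectPoolConfig are maxTotal = 8, maxIdle = 8, and minIdle = 0. To illustrate what the two upper limits mean, here is a single-threaded sketch. PoolLimitsDemo is a hypothetical model, not library code, and it throws when exhausted, whereas the real pool blocks by default.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

// Hypothetical model of how maxTotal and maxIdle constrain a pool.
class PoolLimitsDemo {

    private final int maxTotal;
    private final int maxIdle;
    private final Deque<StringBuilder> idle = new ArrayDeque<>();
    private int total = 0;      // objects created and not yet destroyed
    private int destroyed = 0;  // objects dropped because idle was full

    PoolLimitsDemo(int maxTotal, int maxIdle) {
        this.maxTotal = maxTotal;
        this.maxIdle = maxIdle;
    }

    StringBuilder borrowObject() {
        StringBuilder sb = idle.pollFirst();
        if (sb != null) {
            return sb;
        }
        if (total >= maxTotal) {
            // Simplification: fail fast instead of waiting for a return.
            throw new NoSuchElementException("Pool exhausted");
        }
        total++;
        return new StringBuilder();
    }

    void returnObject(StringBuilder sb) {
        if (idle.size() >= maxIdle) {
            // The idle queue is full: destroy instead of keeping the object.
            total--;
            destroyed++;
            return;
        }
        sb.setLength(0);
        idle.addFirst(sb);
    }

    int getNumIdle() { return idle.size(); }
    int getTotal() { return total; }
    int getDestroyed() { return destroyed; }

    public static void main(String[] args) {
        PoolLimitsDemo pool = new PoolLimitsDemo(3, 1);
        StringBuilder a = pool.borrowObject();
        StringBuilder b = pool.borrowObject();
        pool.returnObject(a);
        pool.returnObject(b);
        System.out.println("idle=" + pool.getNumIdle() + " destroyed=" + pool.getDestroyed());
    }
}
```

A maxIdle that is much smaller than maxTotal trades memory for churn: objects returned during a burst are destroyed and have to be recreated on the next burst.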

Demo

StringBuffer object pool

// Each public class below goes in its own .java file; the imports are shared.
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class StringBufferFactory extends BasePooledObjectFactory<StringBuffer> {

    @Override
    public StringBuffer create() throws Exception {
        return new StringBuffer();
    }

    @Override
    public PooledObject<StringBuffer> wrap(StringBuffer stringBuffer) {
        return new DefaultPooledObject<>(stringBuffer);
    }

    // Clear the buffer whenever it goes back to the pool.
    @Override
    public void passivateObject(PooledObject<StringBuffer> pooledObject) throws Exception {
        pooledObject.getObject().setLength(0);
    }

}

public class StringbufferPoolProxy {

    private final GenericObjectPool<StringBuffer> pool;

    public StringbufferPoolProxy(int minPoolSize, int maxPoolSize) {
        GenericObjectPoolConfig<StringBuffer> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(maxPoolSize);
        poolConfig.setMinIdle(minPoolSize);

        pool = new GenericObjectPool<>(new StringBufferFactory(), poolConfig);
    }

    public StringBuffer borrowObject() {
        System.out.println("=============in borrow object=============");
        System.out.println("pool active size: " + pool.getNumActive());
        System.out.println("pool idle size: " + pool.getNumIdle());
        try {
            return pool.borrowObject();
        } catch (Exception e) {
            // Fail loudly instead of returning null, which would only cause
            // a NullPointerException later in the caller.
            throw new IllegalStateException("could not borrow a StringBuffer", e);
        }
    }

    public void returnObject(StringBuffer stringBuffer) {
        if (stringBuffer == null) {
            return;
        }
        pool.returnObject(stringBuffer);
        System.out.println("=============in return object=============");
        System.out.println("pool active size: " + pool.getNumActive());
        System.out.println("pool idle size: " + pool.getNumIdle());
    }
}

public class Main {

    public static void main(String[] args) {
        StringbufferPoolProxy proxy = new StringbufferPoolProxy(0, 1000);
        for (int i = 0; i < 10; i++) {
            StringBuffer stringBuffer = proxy.borrowObject();
            stringBuffer.append("hello ");
            System.out.println(stringBuffer.toString());
            proxy.returnObject(stringBuffer);
        }
    }
}

MySQL connection pool

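// Each public class below goes in its own .java file; the imports are shared.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class MySqlConnectionFactory extends BasePooledObjectFactory<Connection> {

    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    static final String DB_URL = "jdbc:mysql://localhost:3306/employees?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true";
    static final String USER = "root";
    static final String PASS = "123456";

    static {
        try {
            Class.forName(JDBC_DRIVER);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Connection create() throws Exception {
        return DriverManager.getConnection(DB_URL, USER, PASS);
    }

    @Override
    public PooledObject<Connection> wrap(Connection connection) {
        return new DefaultPooledObject<>(connection);
    }

    // Close the physical connection when the pool discards it; the default
    // implementation does nothing, which would leak the connection.
    @Override
    public void destroyObject(PooledObject<Connection> p) throws Exception {
        p.getObject().close();
    }
}

public class MysqlConnectionPoolProxy {

    private final GenericObjectPool<Connection> pool;

    public MysqlConnectionPoolProxy(int minPoolSize, int maxPoolSize) {
        GenericObjectPoolConfig<Connection> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(maxPoolSize);
        poolConfig.setMinIdle(minPoolSize);

        pool = new GenericObjectPool<>(new MySqlConnectionFactory(), poolConfig);
    }

    public Connection borrowObject() {
        try {
            return pool.borrowObject();
        } catch (Exception e) {
            // Fail loudly instead of returning null to the caller.
            throw new IllegalStateException("could not borrow a connection", e);
        }
    }

    public void returnObject(Connection connection) {
        if (connection != null) {
            pool.returnObject(connection);
        }
    }
}

public class Main {

    public static void main(String[] args) throws SQLException {
        MysqlConnectionPoolProxy proxy = new MysqlConnectionPoolProxy(1, 1000);
        for (int i = 1; i < 200; i++) {
            Connection connection = proxy.borrowObject();
            String sql = "select count(*) from employees;";
            // try-with-resources closes the Statement and ResultSet
            try (Statement statement = connection.createStatement();
                 ResultSet rs = statement.executeQuery(sql)) {
                while (rs.next()) {
                    System.out.println(rs.getInt(1));
                }
            } finally {
                // Without this return the connection stays open, wasting
                // connections and leaking memory.
                proxy.returnObject(connection);
            }
        }
    }
}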