对于commons pool就不作介绍了,文章分两部分:源码分析,例子。
源码分析
在Commons pool之中主要有几个概念:
-
ObjectPool:提供所有对象的存取管理。
-
PooledObject:池化的对象,是对对象的一个包装,加上了对象的一些其他信息,包括对象的状态(已用、空闲),对象的创建时间等。
-
PooledObjectFactory:工厂类,负责池化对象的创建,对象的初始化,对象状态的销毁和对象状态的验证。
先看下ObjectPool的继承结构:
缺图!!
主要使用的就是GenericObjectPool,对于SoftReferenceObjectPool是给池内对象用SoftReference包裹,允许垃圾回收器在有内存需要的时候删除这部分对象。知道这点就行了,至于为啥软连接对象可以被回收,就是另外的话题了。所以本文也就略过SoftReference部分。
PooledObject
继承结构
都挺清晰的,直接看代码吧
PooledObject
PooledObject没有什么太重要的信息,主要就是接口定义,直接看下接口结构就行了
缺图!!
DefaultPooledObject
DefaultPooledObject是实现类,主要的逻辑都在DefaultPooledObject,关键的是属性
/**
* This wrapper is used to track the additional information, such as state, for
* the pooled objects.
* <p>
* This class is intended to be thread-safe.
* </p>
*
* @param <T> the type of object in the pool
*
* @since 2.0
*/
public class DefaultPooledObject<T> implements PooledObject<T> {
private final T object;
//状态及统计信息
private PooledObjectState state = PooledObjectState.IDLE;
private final long createTime = System.currentTimeMillis();
private volatile long lastBorrowTime = createTime;
private volatile long lastUseTime = createTime;
private volatile long lastReturnTime = createTime;
private volatile boolean logAbandoned = false;
private volatile CallStack borrowedBy = NoOpCallStack.INSTANCE;
private volatile CallStack usedBy = NoOpCallStack.INSTANCE;
private volatile long borrowedCount = 0;
public DefaultPooledObject(final T object) {
this.object = object;
}
//都是些状态和统计的get&set,省略了
......
/**
* Allocates the object.
*
* @return {@code true} if the original state was {@link PooledObjectState#IDLE IDLE}
*/
@Override
public synchronized boolean allocate() {
if (state == PooledObjectState.IDLE) {
state = PooledObjectState.ALLOCATED;
lastBorrowTime = System.currentTimeMillis();
lastUseTime = lastBorrowTime;
borrowedCount++;
if (logAbandoned) {
borrowedBy.fillInStackTrace();
}
return true;
} else if (state == PooledObjectState.EVICTION) {
// TODO Allocate anyway and ignore eviction test
state = PooledObjectState.EVICTION_RETURN_TO_HEAD;
return false;
}
// TODO if validating and testOnBorrow == true then pre-allocate for
// performance
return false;
}
/**
* Deallocates the object and sets it {@link PooledObjectState#IDLE IDLE}
* if it is currently {@link PooledObjectState#ALLOCATED ALLOCATED}.
*
* @return {@code true} if the state was {@link PooledObjectState#ALLOCATED ALLOCATED}
*/
@Override
public synchronized boolean deallocate() {
if (state == PooledObjectState.ALLOCATED ||
state == PooledObjectState.RETURNING) {
state = PooledObjectState.IDLE;
lastReturnTime = System.currentTimeMillis();
borrowedBy.clear();
return true;
}
return false;
}
......
}
PooledObjectFactory
Commons Pool之中有很多内部Factory继承自PooledObjectFactory,不过都是些基础的定制,没有太大的意义,PooledObjectFactory主要还是一种设计和接口定义,具体的实现交给用户。所以我们只分析基本BasePooledObjectFactory。
PooledObjectFactory的继承结构如下:
看下PooledObjectFactory接口:
public interface PooledObjectFactory<T> {
/**
* Creates an instance that can be served by the pool and wrap it in a
* PooledObject to be managed by the pool.
*/
PooledObject<T> makeObject() throws Exception;
/**
* Destroys an instance no longer needed by the pool.
* 如果创建对象出现异常,或者创建的对象不可用,需要将对象进行销毁
*/
void destroyObject(PooledObject<T> p) throws Exception;
/**
* Ensures that the instance is safe to be returned by the pool.
* 确认对象可用
*/
boolean validateObject(PooledObject<T> p);
/**
* Reinitializes an instance to be returned by the pool.
* 重新初始化需要由池返回的实例, 也就是每次返回的时候都做一次重新初始化
*/
void activateObject(PooledObject<T> p) throws Exception;
/**
* Uninitializes an instance to be returned to the idle object pool.
* 取消需要返回的实例的初始化
* 举个例子: StringBuffer的对象池中,取出对象的时候讲StringBuffer原先的缓存清空
*/
void passivateObject(PooledObject<T> p) throws Exception;
}
提供了创建对象的接口:makeObject(), 以及对象入池出池相关管理的接口。这些接口都是会在相应的逻辑之中进行调用,接口逻辑由用户提供。
// 是一个抽象类,所有的用户自定义Factory都继承自BasePooledObjectFactory
public abstract class BasePooledObjectFactory<T> extends BaseObject implements PooledObjectFactory<T> {
public abstract T create() throws Exception;
/**
* Wrap the provided instance with an implementation of PooledObject
* 对实际存储的对象T,使用PooledObject进行包装,以便于ObjectPool管理
*/
public abstract PooledObject<T> wrap(T obj);
@Override
public PooledObject<T> makeObject() throws Exception {
return wrap(create());
}
@Override
public void destroyObject(final PooledObject<T> p) throws Exception {
}
@Override
public boolean validateObject(final PooledObject<T> p) {
return true;
}
@Override
public void activateObject(final PooledObject<T> p) throws Exception {
}
@Override
public void passivateObject(final PooledObject<T> p) throws Exception {
}
}
BasePooledObjectFactory所做的工作就是包装内部池化对象,并且影藏了makeObject接口,并对外提供create()创建池化对象,wrap()用于包装对象。create() & wrap()是BasePooledObjectFactory派生类必须要实现的两个接口。后面的例子中就能看到。
Object Pool
ObjectPool的继承结构如下:
缺图!!
OK先看下ObjectPool的接口定义
从方法名称可以看出, 主要是添加、借出、归还接口。看下接口注释就可以理解更加清晰:
public interface ObjectPool<T> extends Closeable {
/**
* Creates an object using the PooledObjectFactory or other
* implementation dependent mechanism, passivate it, and then place it in
* the idle object pool. addObject is useful for "pre-loading"
* a pool with idle objects. (Optional operation).
*
* @throws Exception
* when {@link PooledObjectFactory#makeObject} fails.
* @throws IllegalStateException
* after {@link #close} has been called on this pool.
* @throws UnsupportedOperationException
* when this pool cannot add new idle objects.
*/
// 为什么addObject()没有参数? 因为是工厂模式,创建委托给factory来做
void addObject() throws Exception, IllegalStateException,
UnsupportedOperationException;
default void addObjects(final int count) throws Exception {
for (int i = 0; i < count; i++) {
addObject();
}
}
/**
* Obtains an instance from this pool.
* <p>
* Instances returned from this method will have been either newly created
* with {@link PooledObjectFactory#makeObject} or will be a previously
* idle object and have been activated with
* {@link PooledObjectFactory#activateObject} and then validated with
* {@link PooledObjectFactory#validateObject}.
* </p>
* <p>
* By contract, clients <strong>must</strong> return the borrowed instance
* using {@link #returnObject}, {@link #invalidateObject}, or a related
* method as defined in an implementation or sub-interface.
* </p>
* <p>
* The behaviour of this method when the pool has been exhausted
* is not strictly specified (although it may be specified by
* implementations).
* </p>
*
* @return an instance from this pool.
*
* @throws IllegalStateException
* after {@link #close close} has been called on this pool.
* @throws Exception
* when {@link PooledObjectFactory#makeObject} throws an
* exception.
* @throws NoSuchElementException
* when the pool is exhausted and cannot or will not return
* another instance.
*/
T borrowObject() throws Exception, NoSuchElementException,
IllegalStateException;
/**
* Clears any objects sitting idle in the pool, releasing any associated
* resources (optional operation). Idle objects cleared must be
* {@link PooledObjectFactory#destroyObject(PooledObject)}.
*
* @throws UnsupportedOperationException
* if this implementation does not support the operation
*
* @throws Exception if the pool cannot be cleared
*/
void clear() throws Exception, UnsupportedOperationException;
/**
* Closes this pool, and free any resources associated with it.
*/
@Override
void close();
//需要关注下Active和Idle的数量是如何维护的
int getNumActive();
int getNumIdle();
void invalidateObject(T obj) throws Exception;
void returnObject(T obj) throws Exception;
}
BaseObjectPool
BaseObjectPool继承了BaseObject:
public abstract class BaseObjectPool<T> extends BaseObject implements ObjectPool<T>
另外BaseObjectPool增加了一个属性closed:
private volatile boolean closed = false;
GenericObjectPool
关键的类,是Pool的实现类,内部有Factory成员,以及队列:
缺图!!
先看下GenericObjectPool的相关属性以及构造方法
public class GenericObjectPool<T> extends BaseGenericObjectPool<T>
implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {
// --- configuration attributes --------------------------------------------
private volatile int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private volatile int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
//池化工厂
private final PooledObjectFactory<T> factory;
// --- internal attributes -------------------------------------------------
/*
* All of the objects currently associated with this pool in any state. It
* excludes objects that have been destroyed. The size of
* {@link #allObjects} will always be less than or equal to {@link
* #_maxActive}. Map keys are pooled objects, values are the PooledObject
* wrappers used internally by the pool.
*/
private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects =
new ConcurrentHashMap<>();
/*
* The combined count of the currently created objects and those in the
* process of being created. Under load, it may exceed {@link #_maxActive}
* if multiple threads try and create a new object at the same time but
* {@link #create()} will ensure that there are never more than
* {@link #_maxActive} objects created at any one time.
*/
private final AtomicLong createCount = new AtomicLong(0);
private long makeObjectCount = 0;
private final Object makeObjectCountLock = new Object();
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
// JMX specific attributes
private static final String ONAME_BASE =
"org.apache.commons.pool2:type=GenericObjectPool,name=";
// Additional configuration properties for abandoned object tracking
private volatile AbandonedConfig abandonedConfig = null;
/**
* Creates a new GenericObjectPool using defaults from GenericObjectPoolConfig
*/
public GenericObjectPool(final PooledObjectFactory<T> factory) {
this(factory, new GenericObjectPoolConfig<T>());
}
/**
* Creates a new <code>GenericObjectPool</code> using a specific configuration.
*/
public GenericObjectPool(final PooledObjectFactory<T> factory,
final GenericObjectPoolConfig<T> config) {
super(config, ONAME_BASE, config.getJmxNamePrefix());
if (factory == null) {
jmxUnregister(); // tidy up
throw new IllegalArgumentException("factory may not be null");
}
this.factory = factory;
idleObjects = new LinkedBlockingDeque<>(config.getFairness());
setConfig(config);
}
/**
* Creates a new <code>GenericObjectPool</code> that tracks and destroys
* objects that are checked out, but never returned to the pool.
*
* @param factory The object factory to be used to create object instances
* used by this pool
* @param config The base pool configuration to use for this pool instance.
* The configuration is used by value. Subsequent changes to
* the configuration object will not be reflected in the
* pool.
* @param abandonedConfig Configuration for abandoned object identification
* and removal. The configuration is used by value.
*/
public GenericObjectPool(final PooledObjectFactory<T> factory,
final GenericObjectPoolConfig<T> config, final AbandonedConfig abandonedConfig) {
this(factory, config);
setAbandonedConfig(abandonedConfig);
}
......
}
从属性可以看出来,GenericObjectPool使用了ConncurentHashMap来记录对象,使用了LinkedBlockingQueue来存储空闲对象。到这里Pool,Factory,Container的关系已经很清晰了:
GenericObjectPoolConfig
就是对象池的配置文件,池大小,最小空闲,最大空闲。
另外GenericObejctPool是有默认值的, GenericObjectPool继承了BaseGenericObjectPool:
在BaseGenericObjectPool之中设置了默认值,不过通常还是自己配置下对象池的属性更加好。
Demo
StringBuffer对象池
public class StringBufferFactory extends BasePooledObjectFactory<StringBuffer> {
@Override
public StringBuffer create() throws Exception {
return new StringBuffer();
}
@Override
public PooledObject<StringBuffer> wrap(StringBuffer stringBuffer) {
return new DefaultPooledObject<>(stringBuffer);
}
@Override
public void passivateObject(PooledObject<StringBuffer> pooledObject) throws Exception {
pooledObject.getObject().setLength(0);
}
}
public class StringbufferPoolProxy {
private GenericObjectPool<StringBuffer> pool;
public StringbufferPoolProxy(int minPoolSize, int maxPoolSize) {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(maxPoolSize);
poolConfig.setMinIdle(minPoolSize);
pool = new GenericObjectPool<StringBuffer>(new StringBufferFactory(), poolConfig);
}
public StringBuffer borrowObject() {
System.out.println("=============in borrow object=============");
System.out.println("pool active size: " + pool.getNumActive());
System.out.println("pool idle size: " + pool.getNumIdle());
try {
return pool.borrowObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public void retrurnObject(StringBuffer stringBuffer) {
if (stringBuffer == null) {
return;
}
pool.returnObject(stringBuffer);
System.out.println("=============in return object=============");
System.out.println("pool active size: " + pool.getNumActive());
System.out.println("pool idle size: " + pool.getNumIdle());
}
}
public class Main {
public static void main(String[] args) {
StringbufferPoolProxy proxy = new StringbufferPoolProxy(0, 1000);
for(int i = 0; i < 10; i++) {
StringBuffer stringBuffer = proxy.borrowObject();
stringBuffer.append("hello ");
System.out.println(stringBuffer.toString());
proxy.retrurnObject(stringBuffer);
}
}
}
mysql连接池
public class MySqlConnectionFactory extends BasePooledObjectFactory<Connection> {
static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
static final String DB_URL = "jdbc:mysql://localhost:3306/employees?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true";
static final String USER = "root";
static final String PASS = "123456";
static {
try {
Class.forName(JDBC_DRIVER);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
@Override
public Connection create() throws Exception {
return DriverManager.getConnection(DB_URL,USER,PASS);
}
@Override
public PooledObject<Connection> wrap(Connection connection) {
return new DefaultPooledObject<>(connection);
}
}
public class MysqlConnectionPoolProxy {
private GenericObjectPool<Connection> pool;
public MysqlConnectionPoolProxy(int minPoolSize, int maxPoolSize) {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(maxPoolSize);
poolConfig.setMinIdle(minPoolSize);
pool = new GenericObjectPool<Connection>(new MySqlConnectionFactory(), poolConfig);
}
public Connection borrowObject() {
try {
return pool.borrowObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public void returnObejct(Connection connection) {
if (connection != null) {
pool.returnObject(connection);
}
}
}
public class Main {
public static void main(String[] args) throws SQLException {
MysqlConnectionPoolProxy proxy = new MysqlConnectionPoolProxy(1, 1000);
for(int i = 1; i < 200; i++){
Connection connection = proxy.borrowObject();
Statement statement = connection.createStatement();
String sql = "select count(*) from employees;";
ResultSet rs = statement.executeQuery(sql);
while(rs.next()) {
System.out.println(rs.getInt(1));
}
//如果不return的话这些链接是一直链接着的,浪费链接及内存泄露。
proxy.returnObejct(connection);
}
}
}