commons-pool2

Posted dark_saber

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了commons-pool2相关的知识,希望对你有一定的参考价值。

  转载请注明源出处:http://www.cnblogs.com/lighten/p/7375611.html

1.前言

  本章介绍一下常用基础Jar包commons-pools2,最近使用到了thrift作为rpc服务通讯,但是没有找到其提供的连接池。百度了一下官方貌似没有提供,需要自己实现,所以根据网上的实现方法通过使用commons-pool2包来构建自己的thrift连接池服务。完成后,顺便研究了一下commons-pool2的实现(怕使用不当有坑),也就有了这篇文章。

2.commons-pool2

  既然名字中有个2也就意味着是第二个版本了,重新命名也就意味着和原版本并不兼容。这是apache提供的一个开源包,在很多地方都会使用,2015年7月之后就不再进行更新了,目前最新版本是2.4.2(最近突然又更新了目前在2.5.0版本)。其准确来说应该是一个对象池,不过是常用在于连接上而已。

  commons-pool2的类并不多,全部42个类如下:

    

  包的结构简单,类不多,而且还有大量的接口和内部类,所以实际上需要关注的类没几个,commons-pool2使用起来也就方便了。

2.1 常见配置

  上面也说了,该类是一个基础包,被很多其它jar包使用,常用于对象池的管理,所以其一些配置,在写代码的时候也会经常接触到类似的,可能是封装过的,也可能是版本1的,不过大同小异,主要是思路。了解了配置的具体作用,再结合代码就能够了解这类jar包是如何写的了。

  通用的配置都在GenericObjectPoolConfig类中:

    maxTotal:对象池中最多允许的对象数,默认8(可能超过,不过超过后使用完了就会销毁,后面源码会介绍相关机制)

    maxIdle:对象池中最多允许存在的空闲对象,默认8

    minIdle:池中最少要保留的对象数,默认0

    lifo:是否使用FIFO先进先出的模式获取对象(空闲对象都在一个队列中),默认为true使用先进先出,false是先进后出

    fairness:是否使用公平锁,默认false(公平锁是线程安全中的概念,true的含义是谁先等待获取锁,随先在锁释放的时候获取锁,如非必要,一般不使用公平锁,会影响性能)

    maxWaitMillis:从池中获取一个对象最长的等待时间,默认-1,含义是无限等,超过这个时间还未获取空闲对象,就会抛出异常。

    minEvictableIdleTimeMillis:最小的驱逐时间,单位毫秒,默认30分钟。这个用于驱逐线程,对象空闲时间超过这个时间,意味着此时系统不忙碌,会减少对象数量。

    evictorShutdownTimeoutMillis:驱逐线程关闭的超时时间,默认10秒。

    softMinEvictableIdleTimeMillis:也是最小的驱逐时间,但是会和另一个指标minIdle一同使用,满足空闲时间超过这个设置,且当前空闲数量比设置的minIdle要大,会销毁该对象。所以,通常该值设置的比minEvictableIdleTimeMillis要小。

    numTestsPerEvictionRun:驱逐线程运行每次测试的对象数量,默认3个。驱逐线程就是用来检查对象空闲状态,通过设置的对象数量等参数,保持对象的活跃度和数量,其是一个定时任务,每次不是检查所有的对象,而是抽查几个,这个就是用于抽查。

    evictionPolicyClassName:驱逐线程使用的策略类名,之前的minEvictableIdleTimeMillis和softMinEvictableIdleTimeMillis就是默认策略DefaultEvictionPolicy的实现,可以自己实现策略。

    testOnCreate:在创建对象的时候是否检测对象,默认false。后续会结合代码说明是如何检测的。

    testOnBorrow:在获取空闲对象的时候是否检测对象是否有效,默认false。这个通常会设置成true,一般希望获取一个可用有效的对象吧。

    testOnReturn:在使用完对象放回池中时是否检测对象是否仍有效,默认false。

    testWhileIdle:在空闲的时候是否检测对象是否有效,这个发生在驱逐线程执行时。

    timeBetweenEvictionRunsMillis:驱逐线程的执行周期,上面说过该线程是个定时任务。默认-1,即不开启驱逐线程,所以与之相关的参数是没有作用的。

    blockWhenExhausted:在对象池耗尽时是否阻塞,默认true。false的话超时就没有作用了。

    jmxEnabled:是否允许jmx的方式创建一个配置实例,默认true。

    jmxNamePrefix:jmx默认的前缀名,默认为pool

    jmxNameBase:jmx默认的base name,默认为null,意味着池提供一个名称。

2.2 基本实现

  上面的配置已经说明了一些内容了,此节介绍对象池的一个基础实现思路。

  首先作为一个对象池,我们需要从池中借对象,借完了要还,还要能创建对象存入池中,校验对象是否还能使用。这个就是一个对象池的基本定义了:

  

  commons-pool2还提供了一种控制细粒度更高的对象池KeyedObjectPool<K,V>。其根据关键字来维护不同的池,在某些场景十分有用,这里不对其做详细介绍,弄明白了一般的线程池,对这种池扩展也就有了思路。从接口到抽象类到具体实现类池经历了下面几个类:ObjectPool->BaseObjectPool->SoftReferenceObjectPool,这种用的比较少,看名次也知道是软引用的池,另一种是BaseGenericObjectPool->GenericObjectPool。通常我们会继承GenericObjectPool来设计自己的对象池。

  有了池之后,我们需要一个创建池对象的类,这个就是工厂类:PooledObjectFactory。其要提供一个对象的生命周期的各个操作,包括创建、销毁、校验有效性、激活和钝化对象。

  同样,其提供了一个抽象类BasePooledObjectFactory,这个就没有具体的实现类了,因为涉及到不同对象,不同对象的管理方法不同,不好抽象。我们需要做的就是继承BasePooledObjectFactory对象,实现其未实现或者空实现的方法了,即上述截图的方法。

  有了池来管理对象使用,有了工厂来管理对象的生命周期,一般而言也就够了。但是还有一个重要的环节就是将池与工厂连接起来的对象的定义,所以要进行抽象。这就是PooledObject的作用了,其定义的一系列方法,将我们的对象和池以及工厂,通过这个接口关联了起来。实现类是DefaultPooledObject,通常使用这个就够了,不需要扩展。如果业务上对对象有更细致的控制,可以继承或者直接自己实现PooledObject。

  到此也就剩下一个驱逐线程没介绍了,其维持着池的健康,或者说是活力。以上就是一个对象池的基本内容:池本身,创建对象的工厂,清洁池的驱逐线程,关联池和工厂和自己创建的对象的池对象规范。这样也就没几个类没介绍过了,剩下的就是UsageTracking,看名称也能知道是干啥的了,后面介绍详细流程的时候会顺带一提,用的不多。

2.3 具体实现

  下面结合具体代码构建一个对象池,来说明该池的是怎么工作的。首先该对象的基本定义如下:需要一个对象类型Student,需要一个连接池管理对象CommonObjectPool,需要一个制造对象的工厂StudentFactory。其大体代码如下:

  

public class Student {

    private String name;
    private int age;

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name=\'" + name + \'\\\'\' +
                ", age=" + age +
                \'}\';
    }
}



public class StudentFactory extends BasePooledObjectFactory<Student> {

    private Random random = new Random();

    public Student create() throws Exception {

        int age = random.nextInt(100);
        Student student = new Student("commons", age);
        System.out.println("创建对象:" + student);
        return student;
    }

    public PooledObject<Student> wrap(Student obj) {
        return new DefaultPooledObject<Student>(obj);
    }

    @Override
    public void destroyObject(PooledObject<Student> p) throws Exception {
        System.out.println("销毁对象:" + p.getObject());
        super.destroyObject(p);
    }

    @Override
    public boolean validateObject(PooledObject<Student> p) {
        System.out.println("校验对象是否可用:" + p.getObject());
        return super.validateObject(p);
    }

    @Override
    public void activateObject(PooledObject<Student> p) throws Exception {
        System.out.println("激活钝化的对象系列操作:" + p.getObject());
        super.activateObject(p);
    }

    @Override
    public void passivateObject(PooledObject<Student> p) throws Exception {
        System.out.println("钝化未使用的对象:" + p.getObject());
        super.passivateObject(p);
    }
}


public class CommonObjectPool extends GenericObjectPool<Student> {

    public CommonObjectPool(PooledObjectFactory<Student> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}

  使用方法如下:

public class Test {

    public static void main(String[] args) {
        StudentFactory studentFactory = new StudentFactory();
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        AbandonedConfig abandonedConfig = new AbandonedConfig();
        CommonObjectPool pool = new CommonObjectPool(studentFactory, config, abandonedConfig);

        Student student = null;
        try {
            student = pool.borrowObject();
            System.out.println(student);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(student != null) pool.returnObject(student);
        }
    }
}

  下面结合源码和例子讲解执行过程,先是通过对象工厂类和配置初始化了一个pool,pool的初始化操作代码如下:

public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;

        idleObjects = new LinkedBlockingDeque<>(config.getFairness());

        setConfig(config);

        startEvictor(getTimeBetweenEvictionRunsMillis());
 }

  主要的工作就是设置工厂类,配置,开启驱逐线程。下面先介绍驱逐线程的工作机制:

final void startEvictor(final long delay) {
        synchronized (evictionLock) {
            if (null != evictor) {
                EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
                evictor = null;
                evictionIterator = null;
            }
            if (delay > 0) {
                evictor = new Evictor();
                EvictionTimer.schedule(evictor, delay, delay);
            }
        }
}

  如果设置过了就会关闭,不然要delay大于0才会开启该线程,该值就是config中的getTimeBetweenEvictionRunsMillis。开启方式就是通过EvictionTimer的周期任务,这实际上就是一个Timer定时器。该定时器做的工作如下:

  

    public void run() {
            final ClassLoader savedClassLoader =
                    Thread.currentThread().getContextClassLoader();
            try {
                if (factoryClassLoader != null) {
                    // Set the class loader for the factory
                    final ClassLoader cl = factoryClassLoader.get();
                    if (cl == null) {
                        // The pool has been dereferenced and the class loader
                        // GC\'d. Cancel this timer so the pool can be GC\'d as
                        // well.
                        cancel();
                        return;
                    }
                    Thread.currentThread().setContextClassLoader(cl);
                }

                // Evict from the pool
                try {
                    evict();
                } catch(final Exception e) {
                    swallowException(e);
                } catch(final OutOfMemoryError oome) {
                    // Log problem but give evictor thread a chance to continue
                    // in case error is recoverable
                    oome.printStackTrace(System.err);
                }
                // Re-create idle instances.
                try {
                    ensureMinIdle();
                } catch (final Exception e) {
                    swallowException(e);
                }
            } finally {
                // Restore the previous CCL
                Thread.currentThread().setContextClassLoader(savedClassLoader);
            }
        }

  可以看出,先进行了驱逐,再判断是否小于minIdle的设置,小于就会再次创建对象。

  

private void ensureIdle(final int idleCount, final boolean always) throws Exception {
        if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
            return;
        }

        while (idleObjects.size() < idleCount) {
            final PooledObject<T> p = create();
            if (p == null) {
                // Can\'t create objects, no reason to think another call to
                // create will work. Give up.
                break;
            }
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
        if (isClosed()) {
            // Pool closed while object was being added to idle objects.
            // Make sure the returned object is destroyed rather than left
            // in the idle object pool (which would effectively be a leak)
            clear();
        }
}

  就是调用create方法创建,根据lifo的参数决定是先入先出还是后入先出。evict方法主要做了如下操作:

    public void evict() throws Exception {
        assertOpen();

        if (idleObjects.size() > 0) {

            PooledObject<T> underTest = null;
            final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();

            synchronized (evictionLock) {
                final EvictionConfig evictionConfig = new EvictionConfig(
                        getMinEvictableIdleTimeMillis(),
                        getSoftMinEvictableIdleTimeMillis(),
                        getMinIdle());

                final boolean testWhileIdle = getTestWhileIdle();

                for (int i = 0, m = getNumTests(); i < m; i++) {
                    if (evictionIterator == null || !evictionIterator.hasNext()) {
                        evictionIterator = new EvictionIterator(idleObjects);
                    }
                    if (!evictionIterator.hasNext()) {
                        // Pool exhausted, nothing to do here
                        return;
                    }

                    try {
                        underTest = evictionIterator.next();
                    } catch (final NoSuchElementException nsee) {
                        // Object was borrowed in another thread
                        // Don\'t count this as an eviction test so reduce i;
                        i--;
                        evictionIterator = null;
                        continue;
                    }

                    if (!underTest.startEvictionTest()) {
                        // Object was borrowed in another thread
                        // Don\'t count this as an eviction test so reduce i;
                        i--;
                        continue;
                    }

                    // User provided eviction policy could throw all sorts of
                    // crazy exceptions. Protect against such an exception
                    // killing the eviction thread.
                    boolean evict;
                    try {
                        evict = evictionPolicy.evict(evictionConfig, underTest,
                                idleObjects.size());
                    } catch (final Throwable t) {
                        // Slightly convoluted as SwallowedExceptionListener
                        // uses Exception rather than Throwable
                        PoolUtils.checkRethrow(t);
                        swallowException(new Exception(t));
                        // Don\'t evict on error conditions
                        evict = false;
                    }

                    if (evict) {
                        destroy(underTest);
                        destroyedByEvictorCount.incrementAndGet();
                    } else {
                        if (testWhileIdle) {
                            boolean active = false;
                            try {
                                factory.activateObject(underTest);
                                active = true;
                            } catch (final Exception e) {
                                destroy(underTest);
                                destroyedByEvictorCount.incrementAndGet();
                            }
                            if (active) {
                                if (!factory.validateObject(underTest)) {
                                    destroy(underTest);
                                    destroyedByEvictorCount.incrementAndGet();
                                } else {
                                    try {
                                        factory.passivateObject(underTest);
                                    } catch (final Exception e) {
                                        destroy(underTest);
                                        destroyedByEvictorCount.incrementAndGet();
                                    }
                                }
                            }
                        }
                        if (!underTest.endEvictionTest(idleObjects)) {
                            // TODO - May need to add code here once additional
                            // states are used
                        }
                    }
                }
            }
        }
        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
            removeAbandoned(ac);
        }
    }

  先是判断池是开启状态,且空闲对象要大于0,不然不需要驱逐。然后循环了设置的numTests的次数,一次驱逐就检查这么多个对象。后面一段是并发被干扰的一些操作,主要是保证被干扰后仍检查这么些对象。最后就是根据驱逐策略来驱逐对象。上面配置项说过是怎么回事,具体见DefaultEvictionPolicy。如果判断是驱逐,就调用destory方法销毁对象。否则,判断testWhileIdle配置项,决定是否校验对象是否仍可用,先激活对象activateObject,有异常直接销毁。否则开始校验对象的可用性,validateObject。失败销毁,成功就钝化变成原样子。钝化失败也直接销毁。最后是一个遗弃对象的设置,就是说有些对象借出去了由于种种原因,比如写法上的问题,导致对象很久没有还回来,这个设置就是用于清理这类对象的。这类对象不再被池借出,但又暂用了资源。一般而言该配置很少用到,因为写方式通常都将return操作放在finally模块,不会出现此类情况。

  最后我们看下借对象和还对象都做了哪些操作吧。

    public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        assertOpen();

        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            removeAbandoned(ac);
        }

        PooledObject<T> p = null;

        // Get local copy of current config so it is consistent for entire
        // method execution
        final boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        final long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
            p = idleObjects.pollFirst();
            if (p == null) {
                p = create();
                if (p != null) {
                    create = true;
                }
            }
            if (blockWhenExhausted) {
                if (p == null) {
                    if (borrowMaxWaitMillis < 0) {
                        p = idleObjects.takeFirst();
                    } else {
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object");
                }
            } else {
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
            }
            if (!p.allocate()) {
                p = null;
            }

            if (p != null) {
                try {
                    factory.activateObject(p);
                } catch (final Exception e) {
                    try {
                        destroy(p);
                    } catch (final Exception e1) {
                        // Ignore - activation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                        validate = factory.validateObject(p);
                    } catch (final Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    if (!validate) {
                        try {
                            destroy(p);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (final Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            final NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }

        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

        return p.getObject();
    }

  借的操作步骤如下:先确定池是否开启,再根据条件决定是否移除遗弃的对象。开始获取对象:1.从idle中获取一个,没获取到就创建一个,创建的逻辑涉及参数maxTotal,超过这个值不会创建对象,返回null,maxTotal为-1意为创建的数量为无限(整数最大)。2.创建失败,线程阻塞,等待时间为-1就一直等待,不为-1等到指定时间还没等到,就抛出异常。3.不等待直接会在没获取对象的时候直接抛出异常。4.对象状态不对,没有锁定,置为null。5.上述都没问题,获取对象后开始激活对象,失败销毁对象。成功后判断是否borrow和create的时候要校验对象可用性,需要进行校验,校验失败销毁。上诉是一个while(p==null)的循环,所以borrow的结果只有2种,1是借不到对象超时,2是借到对象。其他就是等待获取空闲对象。

  还对象的逻辑也不难:

    public void returnObject(final T obj) {
        final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));

        if (p == null) {
            if (!isAbandonedConfig()) {
                throw new IllegalStateException(
                        "Returned object not currently part of this pool");
            }
            return; // Object was abandoned and removed
        }

        synchronized(p) {
            final PooledObjectState state = p.getState();
            if (state != PooledObjectState.ALLOCATED) {
                throw new IllegalStateException(
                        "Object has already been returned to this pool or is invalid");
            }
            p.markReturning(); // Keep from being marked abandoned
        }

        final long activeTime = p.getActiveTimeMillis();

        if (getTestOnReturn()) {
            if (!factory.validateObject(p)) {
                try {
                    destroy(p);
                } catch (final Exception e) {
                    swallowException(e);
                }
                try {
                    ensureIdle(1, false);
                } catch (final Exception e) {
                    swallowException(e);
                }
                updateStatsReturn(activeTime);
                return;
            }
        }

        try {
            factory.passivateObject(p);
        } catch (final Exception e1) {
            swallowException(e1);
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        if (!p.deallocate()) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }

        final int maxIdleSave = getMaxIdle();
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }

  更新对象状态,判断还的时候是否要校验对象可用性,不可用销毁。之后钝化对象,钝化失败销毁,超过maxIdle也直接销毁。最后根据lifo来确定放回方式。因为涉及销毁对象,所以都要进行确定minidle来决定是否补充对象。

3.结束语

  commons-pool2的主要逻辑就是上述内容了,代码例子也给了一个。这里总结一下对象的一个生命周期:create->activate->invalidate->borrow->invalidate->return->destory。其中validate阶段发生在各个环节,主要通过TestOnXXX进行配置决定。

 

以上是关于commons-pool2的主要内容,如果未能解决你的问题,请参考以下文章

redis连接池 jedis-2.9.0.jar+commons-pool2-2.4.2.jar

jar包下载步骤 commons-pool2

commons-pool2中GenericObjectPoolConfig的maxTotalmaxIdleminIdle属性理解

commons-pool2中GenericObjectPoolConfig的maxTotalmaxIdleminIdle属性理解

使用commons-pool2实现FTP连接池

spring整合hibernate时出错(已经导入commons-dbcp2-2.1.1.jar和commons-pool2-2.1.jar)