Druid源码分析(四) 从连接池获取数据库链接getConnectionDirect()

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Druid源码分析(四) 从连接池获取数据库链接getConnectionDirect()相关的知识,希望对你有一定的参考价值。

参考技术A public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException
// 超时重试次数,达到这个值就报错
int notFullTimeoutRetryCnt = 0;
for (; ; )
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try
// 核心逻辑
poolableConnection = getConnectionInternal(maxWaitMillis);
catch (GetConnectionTimeoutException ex)
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull())
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled())
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);

continue;

throw ex;

[ Druid ] 源码拆解 —— 3. 连接池到底是如何做到 收缩的 ?

        开头的文章我们从 Druid 的入口了解了它从初始化到创建获取链接,然后到最终销毁的过程,但还有一块没有细说,那就是 Shrink 它的英文本意有一层缩水的意思,没错就像你新买的牛仔裤洗完就缩水了。但是这里的缩水明显更加智能,它是池化的一项必备技能,你可以在各种池化工具中看到它的身影 

同时文中还提到各种参数的初始化,我们这里再从 Druid 官方文档 来看看,主要的配置参数都有哪些:

配置

缺省值

说明

name

配置这个属性的意义在于,如果存在多个数据源,监控的时候可以通过名字来区分开来。如果没有配置,将会生成一个名字,格式是:"DataSource-" + System.identityHashCode(this). 另外配置此属性至少在1.0.5版本中是不起作用的,强行设置name会出错。详情-点此处

url

连接数据库的url,不同数据库不一样。例如:
mysql : jdbc:mysql://10.20.153.104:3306/druid2
oracle : jdbc:oracle:thin:@10.20.149.85:1521:ocnauto

username

连接数据库的用户名

password

连接数据库的密码。如果你不希望密码直接写在配置文件中,可以使用ConfigFilter。详细看这里

driverClassName

根据url自动识别

这一项可配可不配,如果不配置druid会根据url自动识别dbType,然后选择相应的driverClassName

initialSize

0

初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时

maxActive

8

最大连接池数量

maxIdle

8

已经不再使用,配置了也没效果

minIdle

最小连接池数量

maxWait

获取连接时最大等待时间,单位毫秒。配置了maxWait之后,缺省启用公平锁,并发效率会有所下降,如果需要可以通过配置useUnfairLock属性为true使用非公平锁。

poolPreparedStatements

false

是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle。在mysql下建议关闭。

maxPoolPreparedStatementPerConnectionSize

-1

要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。在Druid中,不会存在Oracle下PSCache占用内存过多的问题,可以把这个数值配置大一些,比如说100

validationQuery

用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。

validationQueryTimeout

单位:秒,检测连接是否有效的超时时间。底层调用jdbc Statement对象的void setQueryTimeout(int seconds)方法

testOnBorrow

true

申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。

testOnReturn

false

归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。

testWhileIdle

false

建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。

keepAlive

false
(1.0.28)

连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作。

timeBetweenEvictionRunsMillis

1分钟(1.0.14)

有两个含义:
1) Destroy线程会检测连接的间隔时间,如果连接空闲时间大于等于minEvictableIdleTimeMillis则关闭物理连接。
2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明

numTestsPerEvictionRun

30分钟(1.0.14)

不再使用,一个DruidDataSource只支持一个EvictionRun

minEvictableIdleTimeMillis

连接保持空闲而不被驱逐的最小时间

connectionInitSqls

物理连接初始化的时候执行的sql

exceptionSorter

根据dbType自动识别

当数据库抛出一些不可恢复的异常时,抛弃连接

filters

属性类型是字符串,通过别名的方式配置扩展插件,常用的插件有:
监控统计用的filter:stat
日志用的filter:log4j
防御sql注入的filter:wall

proxyFilters

类型是List<com.alibaba.druid.filter.Filter>,如果同时配置了filters和proxyFilters,是组合关系,并非替换关系


        在上篇文章中提到过,初始化时有一个  CountdownLatch  等待两个守护线程的逻辑, 而其中一个关键守护线程就和本次要讲的内容有关 —— <createAndStartDestroyThread>。我们再来看看上面的参数列表:

  • minIdle
  • keepAlive
  • timeBetweenEvictionRunsMillis
  • minEvictableIdleTimeMillis
  • validationQuery

        等等,这些关键参数都参与其中,组成了该方法中对闲置连接的剔除,达到收缩的目的。同时还涵盖了对保活连接的校验,如果失效,同样会剔除,下面我们就来看看该守护线程要完成的任务细节把 ~~


0.入口

 protected void createAndStartDestroyThread() 
        destroyTask = new DestroyTask();

        if (destroyScheduler != null) 

            long period = timeBetweenEvictionRunsMillis;
            if (period <= 0) 
                period = 1000;
            
            destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                                                                          TimeUnit.MILLISECONDS);
            initedLatch.countDown();
            return;
        

        String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
        destroyConnectionThread = new DestroyConnectionThread(threadName);
        destroyConnectionThread.start();
    

1.心跳时长开启校验

 public class DestroyConnectionThread extends Thread 

        public DestroyConnectionThread(String name)
            super(name);
            this.setDaemon(true);
        

        public void run() 
            initedLatch.countDown();

            for (;;) 
                // 从前面开始删除
                try 
                    if (closed || closing) 
                        break;
                    
                    //timeBetweenEvictionRunsMillis是触发心跳间隔时间 ,如果有设置则休眠指定时间段后开始心跳检查 (默认1分钟)

                    if (timeBetweenEvictionRunsMillis > 0) 
                        Thread.sleep(timeBetweenEvictionRunsMillis);
                     else 
                        Thread.sleep(1000); //
                    

                    if (Thread.interrupted()) 
                        break;
                    

                    destroyTask.run();
                 catch (InterruptedException e) 
                    break;
                
            
        

    

2.动态收缩核心逻辑

public void shrink(boolean checkTime, boolean keepAlive) 
        try 
            // 螺丝刀补充: 获取重入锁
            lock.lockInterruptibly();
         catch (InterruptedException e) 
            return;
        

        boolean needFill = false;
        // 待剔除连接数
        int evictCount = 0;
        // 保活连接数
        int keepAliveCount = 0;
        int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
        fatalErrorCountLastShrink = fatalErrorCount;

        try 
            if (!inited) 
                return;
            

            // 螺丝刀补充:通过池中连接数 减去 最小连接池数量, 获取需要检测连接的数量
            final int checkCount = poolingCount - minIdle;
            final long currentTimeMillis = System.currentTimeMillis();
            for (int i = 0; i < poolingCount; ++i) 
                DruidConnectionHolder connection = connections[i];

                // 螺丝刀补充: 如果连接发生了致命性异常,则会加入保活连接数组,接下来校验有效性
                if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis))  
                    keepAliveConnections[keepAliveCount++] = connection;
                    continue;
                

                if (checkTime) 
                    // 螺丝刀补充: 是否设置了物理连接的超时时间phyTimoutMills
                    if (phyTimeoutMillis > 0) 
                       // 当前时间 减去 连接时长  —— 根据判断连接时间存活时间是否已经超过phyTimeoutMills,是则把该连接放入准备剔除的数组中。
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        if (phyConnectTimeMillis > phyTimeoutMillis) 
                            evictConnections[evictCount++] = connection;
                            continue;
                        
                    

                    // 当前时间 减去 最近一次活跃时间 算出闲置时长
                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                    if (idleMillis < minEvictableIdleTimeMillis
                            && idleMillis < keepAliveBetweenTimeMillis
                    ) 
                        break;
                    

                    // 闲置时间大于minEvictableIdleTimeMillis
                    if (idleMillis >= minEvictableIdleTimeMillis) 

                        // 并且 索引(在连接池中的index)小于checkCount的连接 , 放入准备剔除的数组中
                        if (checkTime && i < checkCount) 
                            evictConnections[evictCount++] = connection;
                            continue;

                            // 闲置时长 大于 maxEvictableIdleTimeMillis , 放入准备剔除的数组中
                         else if (idleMillis > maxEvictableIdleTimeMillis) 
                            evictConnections[evictCount++] = connection;
                            continue;
                        
                    

                    // 如果开启保活机制并且空闲时间大于等于保活间隔时间,则加入保活连接数组
                    if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) 
                        keepAliveConnections[keepAliveCount++] = connection;
                    
                 else 
                    if (i < checkCount) 
                        evictConnections[evictCount++] = connection;
                     else 
                        break;
                    
                
            

            int removeCount = evictCount + keepAliveCount;
            if (removeCount > 0) 
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            
            keepAliveCheckCount += keepAliveCount;

            // 螺丝刀补充: 这里判断连接数是否小于最小连接数,如果是的话后面会重新提交新的链接
            if (keepAlive && poolingCount + activeCount < minIdle) 
                needFill = true;
            
         finally 
            lock.unlock();
        

        // 螺丝刀补充: 待剔除连接数大于0 ,遍历准备剔除的连接,逐个关闭, 并记录数量
        if (evictCount > 0) 
            for (int i = 0; i < evictCount; ++i) 
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            
            Arrays.fill(evictConnections, null);
        

        // 螺丝刀补充:保活连接数大于0 , 倒序校验连接的有效性,如果有效则重新加入队列,反之则会关闭连接
        if (keepAliveCount > 0) 
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) 
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                boolean validate = false;
                try 

                    // 螺丝刀补充: 保活关键校验位置,如果通过了,则证明有效,反之内部会直接抛出异常,在这里被捕获
                    this.validateConnection(connection);
                    validate = true;
                 catch (Throwable error) 
                    if (LOG.isDebugEnabled()) 
                        LOG.debug("keepAliveErr", error);
                    
                    // skip
                

                // 螺丝刀补充:如果上面的校验抛出了异常,这里的discard(抛弃)的则会为true
                boolean discard = !validate;
                // 螺丝刀补充: 有效则重新放回池子中
                if (validate) 
                    
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    // 放入细节
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) 
                        discard = true;
                    
                    
                

                // 螺丝刀补充: 校验过为失效连接,直接关闭,并做记录
                if (discard) 
                    try 
                        connection.close();
                     catch (Exception e) 
                        // skip
                    

                    lock.lock();
                    try 
                        discardCount++;

                        if (activeCount + poolingCount <= minIdle) 
                            emptySignal();
                        
                     finally 
                        lock.unlock();
                    
                
            
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        

        // 螺丝刀补充: 这里就是上面走闲置连接校验时判断的是否需要补充线程逻辑
        if (needFill) 
            lock.lock();
            try 
                int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                for (int i = 0; i < fillCount; ++i) 
                    emptySignal();
                
             finally 
                lock.unlock();
            
         else if (onFatalError || fatalErrorIncrement > 0) 
            lock.lock();
            try 
                emptySignal();
             finally 
                lock.unlock();
            
        
    

2.1 收缩细节

        上方代码中说明了: 筛选出的闲置连接会放入待剔除的数组中,而最终的连接剔除细节如下:

   // 螺丝刀补充: 待剔除连接数大于0 ,遍历准备剔除的链接,逐个关闭, 并记录数量
        if (evictCount > 0) 
            for (int i = 0; i < evictCount; ++i) 
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            
            Arrays.fill(evictConnections, null);
        

2.2 保活校验

 // 螺丝刀补充:保活连接数大于0 , 倒序校验连接的有效性,如果有效则重新加入队列,反之则会关闭连接
        if (keepAliveCount > 0) 
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) 
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                boolean validate = false;
                try 

                    // 螺丝刀补充: 保活关键校验位置,如果通过了,则证明有效,反之内部会直接抛出异常,在这里被捕获
                    this.validateConnection(connection);
                    validate = true;
                 catch (Throwable error) 
                    if (LOG.isDebugEnabled()) 
                        LOG.debug("keepAliveErr", error);
                    
                    // skip
                

                // 螺丝刀补充:如果上面的校验抛出了异常,这里的discard(抛弃)的则会为true
                boolean discard = !validate;
                // 螺丝刀补充: 有效则重新放回池子中
                if (validate) 
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) 
                        discard = true;
                        
                    
                
                
            

2.2.1 保活校验细节

public void validateConnection(Connection conn) throws SQLException 
    
        // 螺丝刀补充, 获取我们设置的保活校验sql
        String query = getValidationQuery();
        if (conn.isClosed()) 
            throw new SQLException("validateConnection: connection closed");
        

        if (validConnectionChecker != null) 
            boolean result;
            Exception error = null;
            try 
                result = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);

                if (result && onFatalError) 
                    lock.lock();
                    try 
                        if (onFatalError) 
                            onFatalError = false;
                        
                     finally 
                        lock.unlock();
                    
                
             catch (SQLException ex) 
                throw ex;
             catch (Exception ex) 
                result = false;
                error = ex;
            

            // 有效连接校验失效则抛出异常
            if (!result) 
                SQLException sqlError = error != null ? //
                    new SQLException("validateConnection false", error) //
                    : new SQLException("validateConnection false");
                throw sqlError;
            
            return;
        

        // 存在校验SQL,则执行校验
        if (null != query) 
            Statement stmt = null;
            ResultSet rs = null;
            try 
                stmt = conn.createStatement();
                if (getValidationQueryTimeout() > 0) 
                    stmt.setQueryTimeout(getValidationQueryTimeout());
                
                rs = stmt.executeQuery(query);

                // 是否得到结果,无则抛出
                if (!rs.next()) 
                    throw new SQLException("validationQuery didn't return a row");
                

                if (onFatalError) 
                    lock.lock();
                    try 
                        if (onFatalError) 
                            onFatalError = false;
                        
                    
                    finally 
                        lock.unlock();
                    
                
             finally 
                JdbcUtils.close(rs);
                JdbcUtils.close(stmt);
            
        
    

2.3 连接补充细节

// 螺丝刀补充:如果上面的校验抛出了异常,这里的discard(抛弃)的则会为true
                boolean discard = !validate;
                // 螺丝刀补充: 有效则重新放回池子中
                if (validate) 
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) 
                        discard = true;
                    
                

                // 螺丝刀补充: 校验过为失效连接,直接关闭,并做记录
                if (discard) 
                    try 
                        connection.close();
                     catch (Exception e) 
                        // skip
                    

                    lock.lock();
                    try 
                        discardCount++;

                        if (activeCount + poolingCount <= minIdle) 
                            emptySignal();
                        
                     finally 
                        lock.unlock();
                    
                
            
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);


                以上就是连接动态收缩 涵 保活校验 的核心逻辑,因为他是守护线程,所以该逻辑会不断周期性的循环,伴随程序走完一生,直至程序被关闭为止。而其核心目的也只不过是围绕着两个关键数组做文章 , 分别是 一个待剔除的连接数组 和 一个保活连接的校验数组,概括总结如下:

  • 进入保活连接数组的条件:
    • 连接发生了致命性异常
    • 开启保活机制并且空闲时间大于等于保活间隔时间

  • 进入待剔除连接数组的条件:
    • 连接的空闲时间大于设置的物理连接超时时间
    • 连接的空闲时间大于最小驱逐空闲时间,并且轮询索引小于合并计数器
    • 空闲时间大于最大驱逐空闲时间

—— 当然过程中还涵盖一个补充连接的校验,这也离不开小连接数minIdle参数的控制

以上是关于Druid源码分析(四) 从连接池获取数据库链接getConnectionDirect()的主要内容,如果未能解决你的问题,请参考以下文章

聊聊Druid?从底层的分析Druid是如何管理数据库连接?

Druid数据库连接池源码分析

[ Druid ] 源码拆解 —— 3. 连接池到底是如何做到 收缩的 ?

Druid连接池源码解析(1)DruidDataSource

真难!通过源码告诉你阿里的数据库连接池Druid为啥如此牛逼!

Druid源码分析(八) DestroyThread扫描回收连接