[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?

Posted 削尖的螺丝刀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?相关的知识,希望对你有一定的参考价值。

          

                 

                我们在之前分析源码的时候 ,说到了连接池初始化的中非常关键的两个方法 ,分别是  createAndStartCreatorThread()  和 createAndStartDestroyThread(),他们分别代表了连接的创建和销毁逻辑,我们上次从整个流程提炼出来,并对销毁逻辑做了拆解,这次我们再补齐对连接的创建逻辑的内容:


创建连接任务的主要流程如下:

[ 创建链接的核心流程 ]


0.入口



protected void createAndStartCreatorThread() 
    if (createScheduler == null) 
        String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
        createConnectionThread = new CreateConnectionThread(threadName);
        createConnectionThread.start();
        return;
    
    
    initedLatch.countDown();

1.核心创建流程




public void run() 
    initedLatch.countDown();
    
    long lastDiscardCount = 0;
    int errorCount = 0;
    
    
    for (;;) 
        // addLast
        try 
            // 获取锁
            lock.lockInterruptibly();
         catch (InterruptedException e2) 
            break;
        
        
        long discardCount = DruidDataSource.this.discardCount;
        boolean discardChanged = discardCount - lastDiscardCount > 0;
        lastDiscardCount = discardCount;
        
        try 
            boolean emptyWait = true;
            
            // 螺丝刀补充: 创建失败,池中数量为0,抛弃状态false  -- 取消等待状态
            if (createError != null
                && poolingCount == 0
                && !discardChanged) 
                emptyWait = false;
            
            
            // 螺丝刀补充: 异步初始化,链接创建数小于初始化大小  -- 取消等待状态
            if (emptyWait
                && asyncInit && createCount < initialSize) 
                emptyWait = false;
            
            
            // 螺丝刀补充: 等待
            if (emptyWait) 
                // 必须存在线程等待,才创建连接
                if (poolingCount >= notEmptyWaitThreadCount
                    // 未开启保活 , 闲置连接数大于最小闲置连接
                    && (!(keepAlive && activeCount + poolingCount < minIdle))
                    // 没有执行失败的链接
                    && !isFailContinuous()
                   ) 
                    empty.await();
                
                
                // 防止创建超过maxActive数量的连接
                if (activeCount + poolingCount >= maxActive) 
                    empty.await();
                    continue;
                
            
            
         catch (InterruptedException e) 
            lastCreateError = e;
            lastErrorTimeMillis = System.currentTimeMillis();
            
            if ((!closing) && (!closed)) 
                LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
            
            break;
         finally 
            lock.unlock();
        
        
        PhysicalConnectionInfo connection = null;
        
        try 
            // 螺丝刀补充:创建物理链接
            connection = createPhysicalConnection();
         catch (SQLException e) 
            LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                      + ", state " + e.getSQLState(), e);
            
            errorCount++;
            // 螺丝刀补充: 如果失败了 但是 没有超过重试时间,则执行重试
            if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) 
                // fail over retry attempts
                setFailContinuous(true);
                // 快速失败
                if (failFast) 
                    lock.lock();
                    try 
                        // 唤醒所有
                        notEmpty.signalAll();
                     finally 
                        lock.unlock();
                    
                
                
                if (breakAfterAcquireFailure) 
                    break;
                
                
                try 
                    Thread.sleep(timeBetweenConnectErrorMillis);
                 catch (InterruptedException interruptEx) 
                    break;
                
            
         catch (RuntimeException e) 
            LOG.error("create connection RuntimeException", e);
            setFailContinuous(true);
            continue;
         catch (Error e) 
            LOG.error("create connection Error", e);
            setFailContinuous(true);
            break;
        
        
        if (connection == null) 
            continue;
        
        
        // 螺丝刀补充: 把创建的连接放入池子中
        boolean result = put(connection);
        if (!result) 
            JdbcUtils.close(connection.getPhysicalConnection());
            LOG.info("put physical connection to pool failed.");
        
        
        errorCount = 0; // reset errorCount
        
        if (closing || closed) 
            break;
        
    

    


2.物理链接创建




public PhysicalConnectionInfo createPhysicalConnection() throws SQLException 
    
        // 下面是一系列创建连接的基础属性设置
        String url = this.getUrl();
        Properties connectProperties = getConnectProperties();

        String user;
        if (getUserCallback() != null) 
            user = getUserCallback().getName();
         else 
            user = getUsername();
        

        String password = getPassword();
        PasswordCallback passwordCallback = getPasswordCallback();

        if (passwordCallback != null) 
            if (passwordCallback instanceof DruidPasswordCallback) 
                DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;

                druidPasswordCallback.setUrl(url);
                druidPasswordCallback.setProperties(connectProperties);
            

            char[] chars = passwordCallback.getPassword();
            if (chars != null) 
                password = new String(chars);
            
        

        Properties physicalConnectProperties = new Properties();
        if (connectProperties != null) 
            physicalConnectProperties.putAll(connectProperties);
        

        if (user != null && user.length() != 0) 
            physicalConnectProperties.put("user", user);
        

        if (password != null && password.length() != 0) 
            physicalConnectProperties.put("password", password);
        

        Connection conn = null;

        long connectStartNanos = System.nanoTime();
        long connectedNanos, initedNanos, validatedNanos;

        Map<String, Object> variables = initVariants
                ? new HashMap<String, Object>()
                : null;
        Map<String, Object> globalVariables = initGlobalVariants
                ? new HashMap<String, Object>()
                : null;

        createStartNanosUpdater.set(this, connectStartNanos);
        creatingCountUpdater.incrementAndGet(this);
        try 
            
            // 螺丝刀补充: 通过驱动创建物理连接核心逻辑
            conn = createPhysicalConnection(url, physicalConnectProperties);
            connectedNanos = System.nanoTime();

            if (conn == null) 
                throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
            

            // 螺丝刀补充: 初始化物理连接
            initPhysicalConnection(conn, variables, globalVariables);
            initedNanos = System.nanoTime();

            // 螺丝刀补充: 校验链接
            validateConnection(conn);
            validatedNanos = System.nanoTime();

            setFailContinuous(false);
            setCreateError(null);
         catch (SQLException ex) 
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
         catch (RuntimeException ex) 
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
         catch (Error ex) 
            createErrorCountUpdater.incrementAndGet(this);
            setCreateError(ex);
            JdbcUtils.close(conn);
            throw ex;
         finally 
            long nano = System.nanoTime() - connectStartNanos;
            createTimespan += nano;
            creatingCountUpdater.decrementAndGet(this);
        

        return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
    


2.1通过JDBC驱动创建连接




public Connection createPhysicalConnection(String url, Properties info) throws SQLException 
        Connection conn;

        // 螺丝刀补充:JDBC驱动创建完链接并且记录( 先判断过滤器是否为空,不为空则先走过滤器 )
        if (getProxyFilters().isEmpty()) 
            conn = getDriver().connect(url, info);
         else 
            conn = new FilterChainImpl(this).connection_connect(info);
        

        createCountUpdater.incrementAndGet(this);

        return conn;
    


2.2 创建连接如果失败的话,进行快速重试 以及 空连接判断和重试




        try 

                    // 螺丝刀补充:创建物理链接
                    connection = createPhysicalConnection();
                 catch (SQLException e) 
                    LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                              + ", state " + e.getSQLState(), e);

                    errorCount++;
                    // 螺丝刀补充: 如果失败了 但是 没有超过重试时间,则执行重试
                    if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) 
                        // fail over retry attempts
                        setFailContinuous(true);
                        // 快速失败
                        if (failFast) 
                            lock.lock();
                            try 
                                // 唤醒所有
                                notEmpty.signalAll();
                             finally 
                                lock.unlock();
                            
                        

                        if (breakAfterAcquireFailure) 
                            break;
                        

                        try 
                            Thread.sleep(timeBetweenConnectErrorMillis);
                         catch (InterruptedException interruptEx) 
                            break;
                        
                    
                 catch (RuntimeException e) 
                    LOG.error("create connection RuntimeException", e);
                    setFailContinuous(true);
                    continue;
                 catch (Error e) 
                    LOG.error("create connection Error", e);
                    setFailContinuous(true);
                    break;
                

                if (connection == null) 
                    continue;
                


3.放入连接池(数组)




protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) 
        DruidConnectionHolder holder = null;
        try 
            // 螺丝刀补充: 创建 holder
            holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
         catch (SQLException ex) 
            // 螺丝刀补充: 如果创建过程发生了异常则清除创建记录
            lock.lock();
            try 
                if (createScheduler != null) 
                    clearCreateTask(physicalConnectionInfo.createTaskId);
                
             finally 
                lock.unlock();
            
            LOG.error("create connection holder error", ex);
            return false;
        

        return put(holder, physicalConnectionInfo.createTaskId, false);
    



3.1 put() 放入连接池最终的核心逻辑




private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) 
        lock.lock();
        try 

            // 螺丝刀补充: 下面三个目的都是判断是否要把链接记录清除
            if (this.closing || this.closed) 
                return false;
            

            if (poolingCount >= maxActive) 
                if (createScheduler != null) 
                    clearCreateTask(createTaskId);
                
                return false;
            

            if (checkExists) 
                for (int i = 0; i < poolingCount; i++) 
                    if (connections[i] == holder) 
                        return false;
                    
                
            

            // 螺丝刀补充: 关键点,holder放入池子中,也就是所谓的数组,并且累加记录
            connections[poolingCount] = holder;
            incrementPoolingCount();

            if (poolingCount > poolingPeak) 
                poolingPeak = poolingCount;
                poolingPeakTime = System.currentTimeMillis();
            

            // 唤醒notEmpty锁
            notEmpty.signal();
            notEmptySignalCount++;

            // 螺丝刀补充: 这里判断,如果创建链接的任务不为空,则清除当前任务(因为已经完成了创建)
            if (createScheduler != null) 
                clearCreateTask(createTaskId);

                // 螺丝刀补充: 这里又是一堆判断, 相当于doubleCheck,然后唤醒empty锁,创建链接
                if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
                    && activeCount + poolingCount + createTaskCount < maxActive) 
                    emptySignal();
                
            
         finally 
            lock.unlock();
        
        return true;
    


以上是关于[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?的主要内容,如果未能解决你的问题,请参考以下文章

[ Druid ] 源码拆解 —— 1. 初始化过程的全局概览

[ Druid ] 源码拆解 —— 1. 初始化过程的全局概览

[ Druid ] 源码拆解 —— 3. 连接池到底是如何做到 收缩的 ?

Druid连接池源码解析(2)DruidDataSource-2

真难!通过源码告诉你阿里的数据库连接池Druid为啥如此牛逼!

Druid数据库连接池源码分析