[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?
Posted 削尖的螺丝刀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?相关的知识,希望对你有一定的参考价值。
我们在之前分析源码的时候 ,说到了连接池初始化的中非常关键的两个方法 ,分别是 createAndStartCreatorThread() 和 createAndStartDestroyThread(),他们分别代表了连接的创建和销毁逻辑,我们上次从整个流程提炼出来,并对销毁逻辑做了拆解,这次我们再补齐对连接的创建逻辑的内容:
创建连接任务的主要流程如下:
[ 创建链接的核心流程 ]
0.入口
protected void createAndStartCreatorThread()
if (createScheduler == null)
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.start();
return;
initedLatch.countDown();
1.核心创建流程
public void run()
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
for (;;)
// addLast
try
// 获取锁
lock.lockInterruptibly();
catch (InterruptedException e2)
break;
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try
boolean emptyWait = true;
// 螺丝刀补充: 创建失败,池中数量为0,抛弃状态false -- 取消等待状态
if (createError != null
&& poolingCount == 0
&& !discardChanged)
emptyWait = false;
// 螺丝刀补充: 异步初始化,链接创建数小于初始化大小 -- 取消等待状态
if (emptyWait
&& asyncInit && createCount < initialSize)
emptyWait = false;
// 螺丝刀补充: 等待
if (emptyWait)
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount
// 未开启保活 , 闲置连接数大于最小闲置连接
&& (!(keepAlive && activeCount + poolingCount < minIdle))
// 没有执行失败的链接
&& !isFailContinuous()
)
empty.await();
// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive)
empty.await();
continue;
catch (InterruptedException e)
lastCreateError = e;
lastErrorTimeMillis = System.currentTimeMillis();
if ((!closing) && (!closed))
LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
break;
finally
lock.unlock();
PhysicalConnectionInfo connection = null;
try
// 螺丝刀补充:创建物理链接
connection = createPhysicalConnection();
catch (SQLException e)
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
// 螺丝刀补充: 如果失败了 但是 没有超过重试时间,则执行重试
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0)
// fail over retry attempts
setFailContinuous(true);
// 快速失败
if (failFast)
lock.lock();
try
// 唤醒所有
notEmpty.signalAll();
finally
lock.unlock();
if (breakAfterAcquireFailure)
break;
try
Thread.sleep(timeBetweenConnectErrorMillis);
catch (InterruptedException interruptEx)
break;
catch (RuntimeException e)
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
catch (Error e)
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
if (connection == null)
continue;
// 螺丝刀补充: 把创建的连接放入池子中
boolean result = put(connection);
if (!result)
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
errorCount = 0; // reset errorCount
if (closing || closed)
break;
2.物理链接创建
public PhysicalConnectionInfo createPhysicalConnection() throws SQLException
// 下面是一系列创建连接的基础属性设置
String url = this.getUrl();
Properties connectProperties = getConnectProperties();
String user;
if (getUserCallback() != null)
user = getUserCallback().getName();
else
user = getUsername();
String password = getPassword();
PasswordCallback passwordCallback = getPasswordCallback();
if (passwordCallback != null)
if (passwordCallback instanceof DruidPasswordCallback)
DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
druidPasswordCallback.setUrl(url);
druidPasswordCallback.setProperties(connectProperties);
char[] chars = passwordCallback.getPassword();
if (chars != null)
password = new String(chars);
Properties physicalConnectProperties = new Properties();
if (connectProperties != null)
physicalConnectProperties.putAll(connectProperties);
if (user != null && user.length() != 0)
physicalConnectProperties.put("user", user);
if (password != null && password.length() != 0)
physicalConnectProperties.put("password", password);
Connection conn = null;
long connectStartNanos = System.nanoTime();
long connectedNanos, initedNanos, validatedNanos;
Map<String, Object> variables = initVariants
? new HashMap<String, Object>()
: null;
Map<String, Object> globalVariables = initGlobalVariants
? new HashMap<String, Object>()
: null;
createStartNanosUpdater.set(this, connectStartNanos);
creatingCountUpdater.incrementAndGet(this);
try
// 螺丝刀补充: 通过驱动创建物理连接核心逻辑
conn = createPhysicalConnection(url, physicalConnectProperties);
connectedNanos = System.nanoTime();
if (conn == null)
throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
// 螺丝刀补充: 初始化物理连接
initPhysicalConnection(conn, variables, globalVariables);
initedNanos = System.nanoTime();
// 螺丝刀补充: 校验链接
validateConnection(conn);
validatedNanos = System.nanoTime();
setFailContinuous(false);
setCreateError(null);
catch (SQLException ex)
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
catch (RuntimeException ex)
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
catch (Error ex)
createErrorCountUpdater.incrementAndGet(this);
setCreateError(ex);
JdbcUtils.close(conn);
throw ex;
finally
long nano = System.nanoTime() - connectStartNanos;
createTimespan += nano;
creatingCountUpdater.decrementAndGet(this);
return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos, variables, globalVariables);
2.1通过JDBC驱动创建连接
public Connection createPhysicalConnection(String url, Properties info) throws SQLException
Connection conn;
// 螺丝刀补充:JDBC驱动创建完链接并且记录( 先判断过滤器是否为空,不为空则先走过滤器 )
if (getProxyFilters().isEmpty())
conn = getDriver().connect(url, info);
else
conn = new FilterChainImpl(this).connection_connect(info);
createCountUpdater.incrementAndGet(this);
return conn;
2.2 创建连接如果失败的话,进行快速重试 以及 空连接判断和重试
try
// 螺丝刀补充:创建物理链接
connection = createPhysicalConnection();
catch (SQLException e)
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
// 螺丝刀补充: 如果失败了 但是 没有超过重试时间,则执行重试
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0)
// fail over retry attempts
setFailContinuous(true);
// 快速失败
if (failFast)
lock.lock();
try
// 唤醒所有
notEmpty.signalAll();
finally
lock.unlock();
if (breakAfterAcquireFailure)
break;
try
Thread.sleep(timeBetweenConnectErrorMillis);
catch (InterruptedException interruptEx)
break;
catch (RuntimeException e)
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
catch (Error e)
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
if (connection == null)
continue;
3.放入连接池(数组)
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo)
DruidConnectionHolder holder = null;
try
// 螺丝刀补充: 创建 holder
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
catch (SQLException ex)
// 螺丝刀补充: 如果创建过程发生了异常则清除创建记录
lock.lock();
try
if (createScheduler != null)
clearCreateTask(physicalConnectionInfo.createTaskId);
finally
lock.unlock();
LOG.error("create connection holder error", ex);
return false;
return put(holder, physicalConnectionInfo.createTaskId, false);
3.1 put() 放入连接池最终的核心逻辑
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists)
lock.lock();
try
// 螺丝刀补充: 下面三个目的都是判断是否要把链接记录清除
if (this.closing || this.closed)
return false;
if (poolingCount >= maxActive)
if (createScheduler != null)
clearCreateTask(createTaskId);
return false;
if (checkExists)
for (int i = 0; i < poolingCount; i++)
if (connections[i] == holder)
return false;
// 螺丝刀补充: 关键点,holder放入池子中,也就是所谓的数组,并且累加记录
connections[poolingCount] = holder;
incrementPoolingCount();
if (poolingCount > poolingPeak)
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
// 唤醒notEmpty锁
notEmpty.signal();
notEmptySignalCount++;
// 螺丝刀补充: 这里判断,如果创建链接的任务不为空,则清除当前任务(因为已经完成了创建)
if (createScheduler != null)
clearCreateTask(createTaskId);
// 螺丝刀补充: 这里又是一堆判断, 相当于doubleCheck,然后唤醒empty锁,创建链接
if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive)
emptySignal();
finally
lock.unlock();
return true;
以上是关于[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?的主要内容,如果未能解决你的问题,请参考以下文章
[ Druid ] 源码拆解 —— 1. 初始化过程的全局概览
[ Druid ] 源码拆解 —— 1. 初始化过程的全局概览
[ Druid ] 源码拆解 —— 3. 连接池到底是如何做到 收缩的 ?
Druid连接池源码解析(2)DruidDataSource-2