[ Druid ] 源码拆解 —— 1. 初始化过程的全局概览
Posted 削尖的螺丝刀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[ Druid ] 源码拆解 —— 1. 初始化过程的全局概览相关的知识,希望对你有一定的参考价值。
说到数据库连接池,一定绕不过的坎就是 JDBC, 本文对源码的拆解阅读,默认读者已经了解JDBC,此处只做简单概括,略微回忆,如果已经掌握请直接跳至 Druid 源码(基于1.2.9) 的拆解部分:
JDBC (全称: Java Database Connectivity)
—— 它代表了 Java数据库连接,是你代码逻辑到一切数据库的直接桥梁,我们可以简单鸟瞰一下JDBC的架构和对应的API核心组件
JDBC API提供以下接口和类 :
- DriverManager:此类管理数据库驱动程序列表。 使用通信子协议将来自java应用程序的连接请求与适当的数据库驱动程序进行匹配。在JDBC下识别某个子协议的第一个驱动程序将用于建立数据库连接。
- Driver:此接口处理与数据库服务器的通信。我们很少会直接与Driver对象进行交互。 但会使用DriverManager对象来管理这种类型的对象。 它还提取与使用Driver对象相关的信息。
- Connection:此接口具有用于联系数据库的所有方法。 连接(Connection)对象表示通信上下文,即,与数据库的所有通信仅通过连接对象。
- Statement:使用从此接口创建的对象将
- SQL语句提交到数据库。 除了执行存储过程之外,一些派生接口还接受参数。
- ResultSet:在使用Statement对象执行SQL查询后,这些对象保存从数据库检索的数据。 它作为一个迭代器并可移动ResultSet对象查询的数据。
- SQLException:此类处理数据库应用程序中发生的任何错误。
我们先简单看一个通过JDBC 连接 mysql 的 demo,了解一下在没有任何框架的加持下,大致的原生逻辑:
拆解 - Druid
通过对 JDBC 的了解,我们可以看出,它的优点和不足(或者说待改善之处),不足之处就是,通过直接使用JDBC来链接数据库,比如难处就有:
- 代码繁琐,编写SQL困难,重复的事情上要花大量的时间
- 要手动维护链接,用完还得关闭
- 链接的频繁开关是个无法忽视的消耗
- 不同DML语句方法之间没有足够高的通用性,且取值方法相对复杂。
- 事物的复杂维护 以及 异常的处理等
而最大的好处则是在将JDBC作为地基的角度上,提供了极高的改造空间,是很多主流框架的底层基石
- 分布式事务
- 数据源对象
- 连接池技术
而Druid 也不例外, 也是基于这一基础上开发而来的。如果说要参观一座花园,少不了的一定是寻找它的大门口,而在Druid这座花园中的大门,毫无意外就是,DruidDataSource。通过下面这张图我们可以看出他的“花园脉络”
[ Druid 关系图 ]
从上图我们可以看到两个关键点, 一个是蓝色实现直接继承的抽象类 DruidAbstractDataSource 和 以虚线 CommonDataSource 作为顶级接口一路继承实现下来的过程。
//STEP 1. Import required packages
import java.sql.*;
public class FirstExample
// JDBC driver name and database URL
static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
static final String DB_URL = "jdbc:mysql://localhost/emp";
// Database credentials
static final String USER = "root";
static final String PASS = "123456";
public static void main(String[] args)
Connection conn = null;
Statement stmt = null;
try
//STEP 2: Register JDBC driver
Class.forName("com.mysql.jdbc.Driver");
//STEP 3: Open a connection
System.out.println("Connecting to database...");
conn = DriverManager.getConnection(DB_URL,USER,PASS);
//STEP 4: Execute a query
System.out.println("Creating statement...");
stmt = conn.createStatement();
String sql;
sql = "SELECT id, first, last, age FROM Employees";
ResultSet rs = stmt.executeQuery(sql);
//STEP 5: Extract data from result set
while(rs.next())
//Retrieve by column name
int id = rs.getInt("id");
int age = rs.getInt("age");
String first = rs.getString("first");
String last = rs.getString("last");
//Display values
System.out.print("ID: " + id);
System.out.print(", Age: " + age);
System.out.print(", First: " + first);
System.out.println(", Last: " + last);
//STEP 6: Clean-up environment
rs.close();
stmt.close();
conn.close();
catch(SQLException se)
//Handle errors for JDBC
se.printStackTrace();
catch(Exception e)
//Handle errors for Class.forName
e.printStackTrace();
finally
//finally block used to close resources
try
if(stmt!=null)
stmt.close();
catch(SQLException se2)
// nothing we can do
try
if(conn!=null)
conn.close();
catch(SQLException se)
se.printStackTrace();
//end finally try
//end try
System.out.println("There are so thing wrong!");
//end main
//end
我们先从 【DruidDataSource 】获取连接的核心逻辑开始,作为这次拆解 Druid 的学习入口
1. 关键方法getConnection ,从创建到获取链接的最外层逻辑
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException
// 螺丝刀补充:内部完成了校验并初始化一堆配置,同时做信息记录
init();
// 螺丝刀补充: 这下面的判断逻辑核心目的都是为了获取链接,前者是看是否有过滤链要继续执行,后者在最大等待时间内则直接获取
if (filters.size() > 0)
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
else
return getConnectionDirect(maxWaitMillis);
2.初始化细节描述
(关键部分已添加注释,记住核心要点,后期会细化讲解,未标注的其余地方概览即可)
public void init() throws SQLException
if (inited)
return;
// bug fixed for dead lock, for issue #2980
DruidDriver.getInstance();
// 螺丝刀补充: 这里创建一把可重入锁
final ReentrantLock lock = this.lock;
try
// 螺丝刀补充: 尝试获取锁,如果未获取到直接返回
lock.lockInterruptibly();
catch (InterruptedException e)
throw new SQLException("interrupt", e);
boolean init = false;
try
// 螺丝刀补充:小细节 double check 一下是否完成初始化步骤
if (inited)
return;
// 螺丝刀补充: 下面都是 【一堆参数的校验、记录 以及 设置】
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
this.id = DruidDriver.createDataSourceId();
if (this.id > 1)
long delta = (this.id - 1) * 100000;
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
// 螺丝刀补充:这里对配置好的链接地址做空白去除后,内部对这个地址进行一系列的case判断和初始化
if (this.jdbcUrl != null)
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
for (Filter filter : filters)
filter.init(this);
if (this.dbTypeName == null || this.dbTypeName.length() == 0)
this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
DbType dbType = DbType.of(this.dbTypeName);
if (JdbcUtils.isMysqlDbType(dbType))
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration"))
cacheServerConfigurationSet = true;
else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1)
cacheServerConfigurationSet = true;
if (cacheServerConfigurationSet)
this.connectProperties.put("cacheServerConfiguration", "true");
if (maxActive <= 0)
throw new IllegalArgumentException("illegal maxActive " + maxActive);
if (maxActive < minIdle)
throw new IllegalArgumentException("illegal maxActive " + maxActive);
if (getInitialSize() > maxActive)
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat)
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis)
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
if (keepAlive && keepAliveBetweenTimeMillis <= timeBetweenEvictionRunsMillis)
throw new SQLException("keepAliveBetweenTimeMillis must be grater than timeBetweenEvictionRunsMillis");
if (this.driverClass != null)
this.driverClass = driverClass.trim();
// 螺丝刀补充:尝试通过SPI机制加载
initFromSPIServiceLoader();
// 螺丝刀补充:解析驱动并赋值
resolveDriver();
initCheck();
initExceptionSorter();
initValidConnectionChecker();
validationQueryCheck();
if (isUseGlobalDataSourceStat())
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null)
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
JdbcDataSourceStat.setGlobal(dataSourceStat);
if (dataSourceStat.getDbType() == null)
dataSourceStat.setDbType(this.dbTypeName);
else
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
dataSourceStat.setResetStatEnable(this.resetStatEnable);
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
if (createScheduler != null && asyncInit)
for (int i = 0; i < initialSize; ++i)
submitCreateTask(true);
else if (!asyncInit)
// init connections
while (poolingCount < initialSize)
try
// 螺丝刀补充:创建物理链接
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
// 螺丝刀补充:链接存入数组
connections[poolingCount++] = holder;
catch (SQLException ex)
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow)
connectError = ex;
break;
else
Thread.sleep(3000);
if (poolingCount > 0)
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
createAndLogThread();
// 螺丝刀补充: 可以看到全局变量有一个值为2的初始化Latch,其目的就是为了等待这下面两个方法中(分别是初始化创建者线程和销毁者线程)守护线程的创建,
createAndStartCreatorThread();
createAndStartDestroyThread();
initedLatch.await();
init = true;
initedTime = new Date();
registerMbean();
if (connectError != null && poolingCount == 0)
throw connectError;
// 螺丝刀补充:是否建立心跳任务
if (keepAlive)
// async fill to minIdle
if (createScheduler != null)
for (int i = 0; i < minIdle; ++i)
submitCreateTask(true);
else
this.emptySignal();
catch (SQLException e)
LOG.error("dataSource-" + this.getID() + " init error", e);
throw e;
catch (InterruptedException e)
throw new SQLException(e.getMessage(), e);
catch (RuntimeException e)
LOG.error("dataSource-" + this.getID() + " init error", e);
throw e;
catch (Error e)
LOG.error("dataSource-" + this.getID() + " init error", e);
throw e;
finally
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled())
String msg = "dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty())
msg += ",";
msg += this.name;
msg += " inited";
LOG.info(msg);
3.获取链接过程
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException
int notFullTimeoutRetryCnt = 0;
// 螺丝刀补充: 上自旋锁
for (;;)
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try
// 螺丝刀补充: 获取链接的关键方法
poolableConnection = getConnectionInternal(maxWaitMillis);
catch (GetConnectionTimeoutException ex)
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull())
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled())
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
continue;
throw ex;
if (testOnBorrow)
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate)
if (LOG.isDebugEnabled())
LOG.debug("skip not validate connection.");
discardConnection(poolableConnection.holder);
continue;
else
if (poolableConnection.conn.isClosed())
discardConnection(poolableConnection.holder); // 传入null,避免重复关闭
continue;
if (testWhileIdle)
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis)
lastActiveTimeMillis = lastExecTimeMillis;
if (lastKeepTimeMillis > lastActiveTimeMillis)
lastActiveTimeMillis = lastKeepTimeMillis;
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0)
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
)
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate)
if (LOG.isDebugEnabled())
LOG.debug("skip not validate connection.");
discardConnection(poolableConnection.holder);
continue;
if (removeAbandoned)
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try
activeConnections.put(poolableConnection, PRESENT);
finally
activeConnectionLock.unlock();
if (!this.defaultAutoCommit)
poolableConnection.setAutoCommit(false);
return poolableConnection;
4.回收链接过程
/**
* close datasource
* 螺丝刀补充: 回收线程
*/
public void close()
if (LOG.isInfoEnabled())
LOG.info("dataSource-" + this.getID() + " closing ...");
// 螺丝刀补充: 上可重入锁
lock.lock();
try
if (this.closed)
return;
if (!this.inited)
return;
this.closing = true;
if (logStatsThread != null)
logStatsThread.interrupt();
if (createConnectionThread != null)
createConnectionThread.interrupt();
if (destroyConnectionThread != null)
destroyConnectionThread.interrupt();
if (createSchedulerFuture != null)
createSchedulerFuture.cancel(true);
if (destroySchedulerFuture != null)
destroySchedulerFuture.cancel(true);
// 螺丝刀补充:遍历池子,关闭对应链接
for (int i = 0; i < poolingCount; ++i)
DruidConnectionHolder connHolder = connections[i];
for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values())
connHolder.getStatementPool().closeRemovedStatement(stmtHolder);
connHolder.getStatementPool().getMap().clear();
Connection physicalConnection = connHolder.getConnection();
try
physicalConnection.close();
catch (Exception ex)
LOG.warn("close connection error", ex);
connections[i] = null;
destroyCountUpdater.incrementAndGet(this);
poolingCount = 0;
unregisterMbean();
enable = false;
notEmpty.signalAll();
notEmptySignalCount++;
this.closed = true;
this.closeTimeMillis = System.currentTimeMillis();
disableException = new DataSourceDisableException();
for (Filter filter : filters)
filter.destroy();
finally
this.closing = false;
lock.unlock();
if (LOG.isInfoEnabled())
LOG.info("dataSource-" + this.getID() + " closed");
以上是关于[ Druid ] 源码拆解 —— 1. 初始化过程的全局概览的主要内容,如果未能解决你的问题,请参考以下文章
[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?
[ Druid ] 源码拆解 —— 2. 连接是如何创建的 ?
[ Druid ] 源码拆解 —— 3. 连接池到底是如何做到 收缩的 ?
Druid连接池源码解析(1)DruidDataSource
同宗同源——Redis线程模型源码拆解,以及和Netty的对比(下)
[Druid-1.2.9_preview_01源码系列]-4-DruidDataSource初始化时的神操作之自动解析DbType与驱动类