Druid连接池源码解析(8)DruidPooledStatement
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Druid连接池源码解析(8)DruidPooledStatement相关的知识,希望对你有一定的参考价值。
参考技术A 本来以为pool包收尾了,扫了一下发现还有statement,这块比较简单,是对JDBC的Statement进行了一些封装,实现了与连接池对应的功能,看下类图:DruidPooledStatement 中最重要的就是execute开头的一系列方法了,是执行SQL的本体,都是对JDBC的Statement的增强,主要多做了以下:
DruidPooledPreparedStatement实现了JDBC的PreparedStatement,是执行预编译的SQL用,跟Connection相同,引入了PreparedStatementHolder这样一个Holder类,同样定义了一些统计属性,其中有个属性pooling,是否池化,引出了PreparedStatementPool,里面用LinkedHashMap实现了LRU,一个cache类用来缓存预编译过的statement;
cache的key定义了一个类,PreparedStatementKey,重写了hashcode和equals方法,很有意思地参考了String的hashcode,用了31这个质数作进制来计算hash,且不仅仅是sql语句相同,需要其他属性都相同才算相同,贴一下源码:
CallableStatement是用来执行函数或者存储过程的,在当下的大环境下,用得真的很少就略过了吧。
druid在JDBC的statement基础上,封装了一些类来扩展自己的属性和功能,一般在工程环境下,基本就用AOP或者装饰器模式来实现(感觉这个Holder有点像装饰器,又有点像单例的Holder)的,写框架的时候,这种方式还是很值得借鉴的,很内聚、高效,也没有额外的引用
cache的使用很巧妙,但是在刷文章的时候看到过一个问题,虽然在后续版本中修复了,但还是分享一下:
探究Druid连接池“违反协议”异常
这个问题抛的是SQL异常,问题却出在缓存上,对与缓的状态和刷新策略,有了新的认识
Druid核心源码解析--DruidDataSource
配置读取
druid连接池支持的所有连接参数可在类com.alibaba.druid.pool.DruidDataSourceFactory
中查看。
配置读取代码:
public void configFromPropety(Properties properties)
//这方法太长,自己看源码去吧,就是读读属性。。。。
整体代码比较简单,就是把配置内容,读取到dataSource。
连接池初始化
首先是简单的判断,加锁:
if (inited)
//已经被初始化好了,直接return
return;
// bug fixed for dead lock, for issue #2980
DruidDriver.getInstance();
/**控制创建移除连接的锁,并且通过条件去控制一个连接的生成消费**/
// public DruidAbstractDataSource(boolean lockFair)
// lock = new ReentrantLock(lockFair);
//
// notEmpty = lock.newCondition();
// empty = lock.newCondition();
//
final ReentrantLock lock = this.lock;
try
lock.lockInterruptibly();
catch (InterruptedException e)
throw new SQLException("interrupt", e);
之后会更新一些JMX的监控指标:
//一些jmx监控指标
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
druid的监控指标都是通过jmx实现的。
解析连接串:
if (this.jdbcUrl != null)
//解析连接串
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
initFromWrapDriverUrl
方法,除了从jdbc url中解析出连接和驱动信息,后面还把filters的名字,解析成了对应的filter类。
private void initFromWrapDriverUrl() throws SQLException
if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX))
return;
DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
this.driverClass = config.getRawDriverClassName();
LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'");
this.jdbcUrl = config.getRawUrl();
if (this.name == null)
this.name = config.getName();
for (Filter filter : config.getFilters())
addFilter(filter);
之后在init方法里面,会进行filters的初始化:
//初始化filter 属性
for (Filter filter : filters)
filter.init(this);
之后解析数据库类型:
if (this.dbTypeName == null || this.dbTypeName.length() == 0)
this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
注意枚举值: com.alibaba.druid.DbType
,这个里面包含了目前durid连接池支持的所有数据源 类型,另外,druid还额外提供了一些驱动类,例如:
elastic_search (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver
clickhouse还提供了负载均衡的驱动类:
com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver
。
在回到init方法,之后是一堆参数解析,不再说,跳过了。
之后是通过SPI加载自定义的filter:
private void initFromSPIServiceLoader()
if (loadSpifilterSkip)
return;
if (autoFilters == null)
List<Filter> filters = new ArrayList<Filter>();
ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
for (Filter filter : autoFilterLoader)
AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
if (autoLoad != null && autoLoad.value())
filters.add(filter);
autoFilters = filters;
for (Filter filter : autoFilters)
if (LOG.isInfoEnabled())
LOG.info("load filter from spi :" + filter.getClass().getName());
addFilter(filter);
注意自定义的filter,要使用com.alibaba.druid.filter.AutoLoad
。
解析驱动:
protected void resolveDriver() throws SQLException
if (this.driver == null)
if (this.driverClass == null || this.driverClass.isEmpty())
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
if (MockDriver.class.getName().equals(driverClass))
driver = MockDriver.instance;
else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass))
Properties info = new Properties();
info.put("user", username);
info.put("password", password);
info.putAll(connectProperties);
driver = new BalancedClickhouseDriver(jdbcUrl, info);
else
if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0))
throw new SQLException("url not set");
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
else
if (this.driverClass == null)
this.driverClass = driver.getClass().getName();
其中durid自己的mock驱动和clickhouse的负载均衡的驱动,特殊判断了下,其他走的都是class forname.
之后是exception sorter和checker的一些东西,跟主线剧情关系不大,skip.
之后是一些初始化JdbcDataSourceStat
,没啥东西。
之后是核心:
connections = new DruidConnectionHolder[maxActive]; //连接数组
evictConnections = new DruidConnectionHolder[maxActive]; //销毁的连接数组
keepAliveConnections = new DruidConnectionHolder[maxActive]; //保持活跃可用的数组
dataSource的连接,都被包装在类DruidConnectionHolder
中,之后是一个同步去初始化连接还是异步去初始化的连接,总之,是去初始化 连接的过程:
if (createScheduler != null && asyncInit)
for (int i = 0; i < initialSize; ++i)
submitCreateTask(true);
else if (!asyncInit)
// init connections
while (poolingCount < initialSize)
try
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
catch (SQLException ex)
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow)
connectError = ex;
break;
else
Thread.sleep(3000);
if (poolingCount > 0)
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
初始化的连接个数为连接串里面配置的initialSize
.
核心初始化方法com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection()
,在这方法里面,会拿用户名密码,之后执行真正的获取connection:
public Connection createPhysicalConnection(String url, Properties info) throws SQLException
Connection conn;
if (getProxyFilters().size() == 0)
conn = getDriver().connect(url, info);
else
conn = new FilterChainImpl(this).connection_connect(info);
createCountUpdater.incrementAndGet(this);
return conn;
注意,如果配置了filters,则所有操作,都会在操作前执行filter处理链。
public ConnectionProxy connection_connect(Properties info) throws SQLException
if (this.pos < filterSize)
return nextFilter()
.connection_connect(this, info);
Driver driver = dataSource.getRawDriver();
String url = dataSource.getRawJdbcUrl();
Connection nativeConnection = driver.connect(url, info);
if (nativeConnection == null)
return null;
return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId());
再回到主流程init方法,connections
数组初始化完成之后,
开启额外线程:
createAndLogThread(); //打印连接信息
createAndStartCreatorThread(); //创建连接线程
createAndStartDestroyThread(); //销毁连接线程
先看注释,具体里面的内容后面单独拉出来讲。
之后:
initedLatch.await(); //初始化 latch -1
init = true; //标记已经初始化完成
initedTime = new Date(); //时间
registerMbean(); //为datasource 注册jmx监控指标
最后的最后,如果配置了keepAlive:
if (keepAlive)
// async fill to minIdle
if (createScheduler != null)
for (int i = 0; i < minIdle; ++i)
submitCreateTask(true);
else
this.emptySignal();
这时候,会根据配置的活跃连接数minIdle
,去给datasource的连接,做个保持活跃连接个数,具体后面再说。
连接池使用的核心逻辑
首先,使用数组作为连接的容器,对于真实连接的加入和移除,使用lock就行同步,另外,在加入和移除连接时候,对比生产消费模型,通过lock上的条件,来通知是否可以获取或者加入连接。
public DruidAbstractDataSource(boolean lockFair)
lock = new ReentrantLock(lockFair);
notEmpty = lock.newCondition(); //非空,有连接
empty = lock.newCondition(); //空的
另外,默认的fairlock为false
public DruidDataSource()
this(false);
public DruidDataSource(boolean fairLock)
super(fairLock);
configFromPropety(System.getProperties());
创建连接
在线程com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread
中:
if (emptyWait)
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
)
empty.await();
// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive)
empty.await();
continue;
必须存在线程等待获取连接时候,才能创建连接,并且要保持总的连接数,不能超过配置的最大连接。
创建完连接之后,执行notEmpty.signalAll();
通知消费者。
获取连接
外层代码:
@Override
public DruidPooledConnection getConnection() throws SQLException
return getConnection(maxWait);
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException
init();
if (filters.size() > 0)
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
else
return getConnectionDirect(maxWaitMillis);
忽略掉filter chain,其实最后执行的还是com.alibaba.druid.pool.DruidDataSource#getConnectionDirect
:
方法内部:
poolableConnection = getConnectionInternal(maxWaitMillis);
- 1 , 连接不足,需要直接去创建新的,跟我们初始化一样
- 2,从connections里面拿
if (maxWait > 0)
holder = pollLast(nanos);
else
holder = takeLast();
其中,maxWait默认为-1,配置在init里面:
String property = properties.getProperty("druid.maxWait");
if (property != null && property.length() > 0)
try
int value = Integer.parseInt(property);
this.setMaxWait(value);
catch (NumberFormatException e)
LOG.error("illegal property 'druid.maxWait'", e);
这个用于配置拿连接时候,是否在这个时间上进行等待,默认是否,即一直等到拿到连接为止。
直接看下阻塞拿的过程:
DruidConnectionHolder takeLast() throws InterruptedException, SQLException
try
//没连接了
while (poolingCount == 0)
//暗示下创建线程没连接了
emptySignal(); // send signal to CreateThread create connection
if (failFast && isFailContinuous())
throw new DataSourceNotAvailableException(createError);
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak)
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
try
//傻等着创建或者回收,能给整出来点儿连接
notEmpty.await(); // signal by recycle or creator
finally
notEmptyWaitThreadCount--;
notEmptyWaitCount++;
if (!enable)
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null)
throw disableException;
throw new DataSourceDisableException();
catch (InterruptedException ie)
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
//拿数组的最后一个连接
decrementPoolingCount();
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
连接回收
protected void createAndStartDestroyThread()
destroyTask = new DestroyTask();
//自定义配置销毁 ,适用于连接数非常多的 情况
if (destroyScheduler != null)
long period = timeBetweenEvictionRunsMillis;
if (period <= 0)
period = 1000;
destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
TimeUnit.MILLISECONDS);
initedLatch.countDown();
return;
String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
//单线程销毁
destroyConnectionThread = new DestroyConnectionThread(threadName);
destroyConnectionThread.start();
实际的销毁:
public class DestroyTask implements Runnable
public DestroyTask()
@Override
public void run()
shrink(true, keepAlive);
if (isRemoveAbandoned())
removeAbandoned();
最终 执行的还是 shrink
方法。
public void shrink(boolean checkTime, boolean keepAlive)
try
lock.lockInterruptibly();
catch (InterruptedException e)
return;
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try
if (!inited)
return;
final int checkCount = poolingCount - minIdle; //需要检测连接的数量
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) //检测目前connections数组中的连接
DruidConnectionHolder connection = connections[i];
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis))
keepAliveConnections[keepAliveCount++] = connection;
continue;
if (checkTime)
//是否设置了物理连接的超时时间phyTimoutMills。假如设置了该时间,
// 判断连接时间存活时间是否已经超过phyTimeoutMills,是则放入evictConnections中
if (phyTimeoutMillis > 0)
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis)
evictConnections[evictCount++] = connection;
continue;
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;//获取连接空闲时间
//如果某条连接空闲时间小于minEvictableIdleTimeMillis,则不用继续检查剩下的连接了
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
)
break;
if (idleMillis >= minEvictableIdleTimeMillis)
// check checkTime is silly code
//检测检查了几个连接了
if (checkTime && i < checkCount)
//超时了
evictConnections[evictCount++] = connection;
continue;
else if (idleMillis > maxEvictableIdleTimeMillis)
//超时了
evictConnections[evictCount++] = connection;
continue;
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis)
//配置了keepAlive,并且在存活时间内,放到keepAlive数组
keepAliveConnections[keepAliveCount++] = connection;
else
//不需要检查时间的,直接移除
if (i < checkCount)
evictConnections[evictCount++] = connection;
else
break;
int removeCount = evictCount + keepAliveCount; //移除了几个
//由于使用connections连接时候,都是取后面的,后面 的是最新的连接,只考虑前面过期就行,所以只需要挪动前面的连接
if (removeCount > 0)
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
keepAliveCheckCount += keepAliveCount;
if (keepAlive && poolingCount + activeCount < minIdle)
//不够核心的活跃连接时候,需要去创建啦
needFill = true;
finally
lock.unlock();
if (evictCount > 0)
for (int i = 0; i < evictCount; ++i)
//销毁连接
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
Arrays.fill(evictConnections, null);
if (keepAliveCount > 0)
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i)
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try
this.validateConnection(connection);
validate = true;
catch (Throwable error)
if (LOG.isDebugEnabled())
LOG.debug("keepAliveErr", error);
// skip
boolean discard = !validate; //没通过validate
if (validate)
//通过keep alive检查,更新时间
holer.lastKeepTimeMillis = System.currentTimeMillis();
//这里还会尝试放回connections数组
boolean putOk = put(holer, 0L, true);
if (!putOk)
//没放入,标记要丢弃了
discard = true;
if (discard)
try
connection.close();
catch (Exception e)
// skip
lock.lock();
try
discardCount++;
if (activeCount + poolingCount <= minIdle)
//发信号让创建线程去创建
emptySignal();
finally
lock.unlock();
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
if (needFill)
//又要去创建了
lock.lock();
try
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i)
emptySignal();
finally
lock.unlock();
else if (onFatalError || fatalErrorIncrement > 0)
lock.lock();
try
emptySignal();
finally
lock.unlock();
工具数组evictConnections
,keepAliveConnections
用完即被置空,老工具人了。
一波操作下来,完成了对connections数组的大清洗。
小结
- 只写了核心逻辑,很多validate,checker,filter省略了。
- druid连接池源码里面还有很多好用的工具,比如数据库驱动工具,jdbc工具,解析SQL的语法树,ibatis的支持,wall过滤,多数据源…
- 最新的代码我看还有支持配套ZK的高可用方案,用到的话后期我会继续更新源码解析。
以上是关于Druid连接池源码解析(8)DruidPooledStatement的主要内容,如果未能解决你的问题,请参考以下文章