Mybatis 源码学习(13)-DataSource
Posted 凉茶方便面
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mybatis 源码学习(13)-DataSource相关的知识,希望对你有一定的参考价值。
历史文章:
Mybatis 源码学习(12)-资源加载
数据源模块位于 org.apache.ibatis.datasource 包内,其中的核心类包括:DataSourceFactory 和 DataSource。数据源模块是一系列代码,它符合抽象工厂的构建方式,首先它提供了 DataSourceFactory 接口和 DataSource 接口,分别代表抽象工厂的工厂接口和工厂产品实现,并分别提供 UnpooledDataSourceFactory 和 PooledDataSourceFactory 两个工厂实现,UnpooledDataSource 和 PooledDataSource 两个工厂产品的实现。
DataSourceFactory
DataSourceFactory 是数据源的模块的抽象工厂系列,它包含 DataSourceFactory接口以及 UnpooledDataSourceFactory 和 PooledDataSourceFactory 以及 JndiDataSourceFactory。DataSourceFactory 规定了抽象工厂需要实现的方法:
public interface DataSourceFactory
// 设置数据源的相关属性,内部一般具有解析 Properties 的方法
void setProperties(Properties props);
// 获取已配置好的数据源
DataSource getDataSource();
UnpooledDataSourceFactory 的构造方法中创建了 UnpooledDataSource,创建完成 UnpooledDataSource 后,需要调用 UnpooledDataSourceFactory.setProperties 方法为 UnpooledDataSource 提供连接数据库的必要配置。
// 为 UnpooledDataSource 设置必要的数据库连接配置
public void setProperties(Properties properties)
Properties driverProperties = new Properties();
// 获取 DataSource 对应的 MetaObject,方便后续通过字符串设置属性
MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
// 遍历所有的外部配置
for (Object key : properties.keySet())
String propertyName = (String) key;
// 属性名以 driver. 开头,则认为是 DataSource 的 deriver 配置项,记录到 driverProperties 中保存
if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX))
String value = properties.getProperty(propertyName);
driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
else if (metaDataSource.hasSetter(propertyName))
// 如果 DataSource 有对应的属性,则直接将值设置到对应字段上
String value = (String) properties.get(propertyName);
// 解析对应字段的值类型(根据 setter 方法的参数值类型,转化成 Integer、Long、Boolean)
Object convertedValue = convertValue(metaDataSource, propertyName, value);
metaDataSource.setValue(propertyName, convertedValue);
else
throw new DataSourceException(“… ");
if (driverProperties.size() > 0) // 设置 DataSource 的 driverProperties 属性
metaDataSource.setValue("driverProperties", driverProperties);
PooledDataSourceFactory 继承自 UnpooledDataSourceFactory,它仅覆盖了构造函数,在构造函数中创建 PooledDataSource。JndiDataSourceFactory 则直接使用 InitialContext 读取对应已经在容器中配置好的数据源并返回。
UnpooledDataSource
UnpooledDataSource 和 PooledDataSource 都是 javax.sql.DataSource 的实现类,它们内部会记录已经在 DriverManager 中注册过的 Driver,以及 Driver 的基本配置。对于 UnpooledDataSource 而言,每次获取 Connection 对象时,都会调用 DriverManager 去获取数据库连接。
UnpooledDataSource 内部记录了数据源相关的配置,并在实际获取 Connection 时将配置转交给 DriverManager。
private ClassLoader driverClassLoader; // 加载 Driver 的类加载器
private Properties driverProperties; // Driver 对应的相关数据库配置
// 缓存已注册过的所有 Driver
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();
private String driver; // 数据库驱动名称
private String url; // 数据库连接
private String username; // 数据库用户名
private String password; // 数据库密码
private Boolean autoCommit; // 是否自动提交事务
private Integer defaultTransactionIsolationLevel; // 默认事务隔离级别
需要说明的是 registeredDrivers,它记录的是所有在 DriverManager 中已注册的 Driver,另外如果当前加载的 Driver 没有在 DriverManager 中注册过时,在创建 Connection 时也会将新加载的 Driver 注册至 registeredDrivers 和 DriverManager。
这里需要简单说明下 DriverManager 通过 SPI 机制加载 Driver 的过程,详细的过程可以参考《源码分析- Java SPI 机制》。DriverManager 的加载过程简化来说分为三个阶段:DriverManager 静态代码块触发 META-INF/services/java.sql.Driver 文件的读取并加载文件内对应的 Driver 实现类;实现类的静态代码块向 DriverManager 注册自己;业务代码使用 DriverManager 遍历 Driver 获取数据库连接。
这里看下 DriverManager.registerDriver 方法:
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
public static synchronized void registerDriver(java.sql.Driver driver, DriverAction da) throws SQLException
if(driver != null)
registeredDrivers.addIfAbsent(new DriverInfo(driver, da)); // 将 Deriver 记录到已加载列表
else
throw new NullPointerException();
UnpooledDataSource 中的静态代码会将 DriverManager 中已加载的 Driver 复制一份到自己的 registeredDrivers 中。
static
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements())
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver); // 复制
UnpooledDataSource 有一系列 doGetConnection 方法,这些方法最终会调用 doGetConnection(Properties properties) 方法。doGetConnection 包括两个方法:initializeDriver 负责初始化 Driver 以及解决类加载器相关问题;configureConnection 负责完成数据库配置。
private Connection doGetConnection(Properties properties) throws SQLException
initializeDriver(); // 初始化数据库驱动
Connection connection = DriverManager.getConnection(url, properties); // 获取数据库连接
configureConnection(connection); // 配置数据库的 autoCommit 和隔离级别
return connection;
private synchronized void initializeDriver() throws SQLException
if (!registeredDrivers.containsKey(driver)) // 检测驱动是否已加载
Class<?> driverType;
try
if (driverClassLoader != null)
// 自定义 ClassLoader 加载对应的 Driver 类
driverType = Class.forName(driver, true, driverClassLoader);
else
driverType = Resources.classForName(driver);
Driver driverInstance = (Driver) driverType.newInstance(); // 创建 Driver 实例
// 通过引入 DriverProxy 代理,将创建的 Driver 实例注册至 DriverManager
DriverManager.registerDriver(new DriverProxy(driverInstance));
// 将驱动添加至 registeredDrivers 集合
registeredDrivers.put(driver, driverInstance);
catch (Exception e)
throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
private void configureConnection(Connection conn) throws SQLException
if (autoCommit != null && autoCommit != conn.getAutoCommit())
conn.setAutoCommit(autoCommit); // 数值数据库事务是否自动提交
if (defaultTransactionIsolationLevel != null)
conn.setTransactionIsolation(defaultTransactionIsolationLevel); // 设置事务的隔离级别
PooledDataSource
PooledDataSource 是 Mybatis 对数据库连接池的简易实现,通过数据库连接池可以实现数据库连接的重用、提升数据库响应效率、防止数据库连接过多造成数据库假死、避免数据库连接泄露等问题。
数据库连接池的基本原理是:在连接池初始化时创建一部分连接,当需要时,直接返回空闲的连接,当业务使用完成时,则返还该连接,而不是关闭掉;当连接池达到上限或者空闲连接达到上限,则需要业务等待其他业务释放连接;如果是尚未达到 连接池上线,但是已达到空闲连接上限 ,则需要丢弃当前新建的连接。
数据库连接池的关键属性包括:数据库总连接上下限和空闲连接上下限。数据库总连接上限过大会造成数据库压力,过小则影响整体性能;空闲连接过大会造成资源浪费,过小会无法及时响应洪峰压力。
PooledDataSource 引入 UnpooledDataSource ,通过 UnpooledDataSource 完成数据源的创建。同时,PooledDataSource 不直接管理 java.sql.Connection 而是管理 Connection 的代理对象 PooledConnection,该对象实现了 JDK 的动态代理。PooledDataSource 的连接池维护在 PoolState 对象中,该对象内部维护了当前空闲的连接和当前使用的连接集合,以及会对数据库连接池做对应的统计工作。
PooledConnection
PooledConnection 的核心字段如下:
// 记录当前 PooledConnection 对象所在的 PooledDataSource。
// 该 PooledConnection 是从 PooledDataSource 中获取的,当调用 close 方法时,
// 需要把 PooledConnection 归还至 PooledDataSource 的连接池中
private final PooledDataSource dataSource;
private final Connection realConnection; // 真正的数据库连接
private final Connection proxyConnection; // 数据库连接的代理对象
private long checkoutTimestamp; // 从数据库连接池取出的时间戳
private long createdTimestamp; // 连接创建的时间戳
private long lastUsedTimestamp; // 最近被使用的时间戳
// 由数据库 url+用户名+密码计算的 hash 值,用于标识连接所在的连接池
private int connectionTypeCode;
// 用于标识当前连接是否有效,避免业务调用 close 方法后仍然使用该数据库连接
private boolean valid;
PooledConnection 有多个 get、set 属性的方法,它实现的 InvocationHandler 的 invoke 方法,该方法实现了代理对象的逻辑,它拦截 close 方法,并且检查当前 PooledConnection 是否可用。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
String methodName = method.getName();
// 如果调用了 close 方法,不能直接释放,而是需要将其放入到连接池中
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName))
dataSource.pushConnection(this);
return null;
else
try
if (!Object.class.equals(method.getDeclaringClass()))
checkConnection(); // 通过 valid 检查连接是否正常,否则抛出 SQLException
return method.invoke(realConnection, args); // 调用实际 Connection 对象的方法
catch (Throwable t)
throw ExceptionUtil.unwrapThrowable(t);
PoolState
PoolState 可以用来管理 PooledConnection 对象,其内部的 idleConnections 和 activeConnections 分别表示空闲连接和活动连接。PoolState 的内部字段定义如下:
// 空闲连接列表
protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
// 活动连接列表
protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
// 以下是统计字段
protected long requestCount = 0; // 请求连接池中数据库连接的次数
protected long accumulatedRequestTime = 0; // 获取连接的累计时间
// accumulatedCheckoutTime 是所有连接的 checkoutTime 的总和
// checkoutTime 表示连接被从连接池中取出,到连接被归还的总时长
protected long accumulatedCheckoutTime = 0;
// 长时间未将连接归还至连接池的连接数量
protected long claimedOverdueConnectionCount = 0;
// 累计超时未归还连接池的连接总使用时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
protected long accumulatedWaitTime = 0; // 累计等待时间(连接数不够时会等待)
protected long hadToWaitCount = 0; // 等待次数
protected long badConnectionCount = 0; // 请求时,连接无效的次数
PooledDataSource
PooledDataSource 中真正负责创建数据库连接的对象是 UnpooledDataSource,它在 PooledDataSource 的构造器中被创建。PooledDataSource 内的核心字段如下:
// 通过 PoolState 管理连接池中的连接,以及记录连接池状态
private final PoolState state = new PoolState(this);
// 用于实际生成数据库连接,在构造器内初始化该字段
private final UnpooledDataSource dataSource;
// 以下是可选的配置字段
// 最大活动连接数量
protected int poolMaximumActiveConnections = 10;
// 最大空闲连接数量
protected int poolMaximumIdleConnections = 5;
// 最大 checkoutTime(从连接池中被取出到归还的时间)
protected int poolMaximumCheckoutTime = 20000;
// 无法获取连接时,现成的等待时间
protected int poolTimeToWait = 20000;
// 连接池最大允许无法获取连接的次数,默认如果获取空闲连接的个数
// + poolMaximumLocalBadConnectionTolerance次还无法获取连接时,
// 认为该连接池无法获得正常的连接。其实就是多次获得连接,但是这
// 些连接均无法正常工作时,认为该连接池无法正常提供服务
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 检测数据库连接是否可用时,向数据库发送的测试 SQL
protected String poolPingQuery = "NO PING QUERY SET";
// 是否开启发送测试 SQL 进行探活的功能
protected boolean poolPingEnabled;
// 开启测试 SQL 探活功能时,连接在 poolPingConnectionsNotUsedFor 毫秒内未被
// 使用时,发送一条测试 SQL检测连接是否正常
protected int poolPingConnectionsNotUsedFor;
// 根据 URL + 用户名 + 密码拼接生成的 hash值,用于标识当前连接池所对应的数据库连接
// 该值在构造函数内初始化
private int expectedConnectionTypeCode;
PooledDataSource.getConnection 有多个重载实现,但是它们最终调用的是PooledDataSource.popConnection 方法获取 PooledConnection,然后通过 PooledDataSource.getProxyConnection 获取 Connection 的代理对象:PooledConnection。popConnection 的逻辑较为复杂,这里先列出其执行流程。
PooledConnection 的具体执行逻辑如下(已去除打印日志的逻辑):
private PooledConnection popConnection(String username, String password) throws SQLException
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis(); // 当前时间
int localBadConnectionCount = 0; // 获取无法使用的连接的次数
while (conn == null)
synchronized (state) // 同步,每次仅允许单个线程进入获取连接
if (!state.idleConnections.isEmpty()) // 当前连接池存在可用的空闲连接
conn = state.idleConnections.remove(0); // 获取第一个连接,该连接是最早被创建的连接
else // 当前数据库连接池中没有空闲连接
// 检查当前活动连接数是否超出最大值,没有超出则允许直接创建新的数据库连接
if (state.activeConnections.size() < poolMaximumActiveConnections)
// 创建新的数据库连接,并包装为 PooledConnection
conn = new PooledConnection(dataSource.getConnection(), this);
else // 当前数据库连接池没有空闲链接,并且活动连接总数超出最大值,无法创建新连接
// 检测头部最早被创建的活动连接的 checkoutTime
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
// 该连接超过最大可被取出并使用的时长,可回收该连接
if (longestCheckoutTime > poolMaximumCheckoutTime)
// 对超时连接的信息进行统计
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
// 将超时连接从活动连接中取出
state.activeConnections.remove(oldestActiveConnection);
// 如果超时连接尚在事务中,则需要回滚事务
if (!oldestActiveConnection.getRealConnection().getAutoCommit())
try
// 回滚超时连接上的事务
oldestActiveConnection.getRealConnection().rollback();
catch (SQLException e)
// … 日志
// 创建新的 PooledConnection,并复用旧的 Connection 对象,
// 并重置 PooledConnection 上的统计信息
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
// 将旧的 PooledConnection 设置为无效,避免旧连接被业务继续使用
oldestActiveConnection.invalidate();
else
// 无空闲连接、无法创建新连接、无超时连接,只能阻塞等待
try
if (!countedWait) // 是否已经在等待中
state.hadToWaitCount++; // 统计等待的次数
countedWait = true;
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait); // 阻塞线程,进行等待
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
catch (InterruptedException e)
break;
// 经过以上流程已经获取到 PooledConnection
if (conn != null)
// 当前 PooledConnection 是否仍然有效
if (conn.isValid())
if (!conn.getRealConnection().getAutoCommit()) // 连接上依然有活动的事务,则回滚它
conn.getRealConnection().rollback();
// 更新 PooledConnection 相关属性,设置 ConnectionTypeCode、CheckoutTimestamp、LastUsedTimestamp
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
// 统计状态信息
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
else // 如果获取到的 PooledConnection 已经被标记为无效连接
state.badConnectionCount++; // 增加统计信息
localBadConnectionCount++;
conn = null; // 重置 PooledConnection 对象
// 如果多次尝试获取 PooledConnection,但是仍然无法得到正常状态的 PooledConnection,则认为连接池存在异常
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance))
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
// 目前看来,仅在等待过程中线程被中断,会经过这里,此时还未获取连接,则抛出异常
if (conn == null)
throw new SQLException(“…”);
return conn;
经过以上过程,业务代码能够从数据库连接池中获取可用的数据库连接,并能够使用这些连接与数据库进行交互,当数据库连接被使用完毕时,应该被归还至数据库连接池。正常数据库连接的归还逻辑是调用 Connection.close 方法,但是 PooledDataSource 中返回的是 PooledConnection,调用 Connection 接口的 close 方法实际上会调用 PooledConnection 的 close 方法。由于 PooledConnection 本身是 InvocationHandler 代理对象,因此是所有的方法都会经过它的 invoke 方法,该方法如前所说,会拦截 close 方法,并调用 PooledDataSource.pushConnection 方法将连接归还至连接池。
PooledDataSource.pushConnection 方法的具体执行逻辑如下(已去除打印日志的逻辑):
protected void pushConnection(PooledConnection conn) mybatis专题-----mybatis源码学习