Mybatis 源码学习(13)-DataSource

Posted 凉茶方便面

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mybatis 源码学习(13)-DataSource相关的知识,希望对你有一定的参考价值。

历史文章:
Mybatis 源码学习(12)-资源加载


数据源模块位于 org.apache.ibatis.datasource 包内,其中的核心类包括:DataSourceFactory 和 DataSource。数据源模块是一系列代码,它符合抽象工厂的构建方式,首先它提供了 DataSourceFactory 接口和 DataSource 接口,分别代表抽象工厂的工厂接口和工厂产品实现,并分别提供 UnpooledDataSourceFactory 和 PooledDataSourceFactory 两个工厂实现,UnpooledDataSource 和 PooledDataSource 两个工厂产品的实现。

DataSourceFactory

DataSourceFactory 是数据源的模块的抽象工厂系列,它包含 DataSourceFactory接口以及 UnpooledDataSourceFactory 和 PooledDataSourceFactory 以及 JndiDataSourceFactory。DataSourceFactory 规定了抽象工厂需要实现的方法:

public interface DataSourceFactory 

  // 设置数据源的相关属性,内部一般具有解析 Properties 的方法
  void setProperties(Properties props);

  // 获取已配置好的数据源
  DataSource getDataSource();

UnpooledDataSourceFactory 的构造方法中创建了 UnpooledDataSource,创建完成 UnpooledDataSource 后,需要调用 UnpooledDataSourceFactory.setProperties 方法为 UnpooledDataSource 提供连接数据库的必要配置。

// 为 UnpooledDataSource 设置必要的数据库连接配置
public void setProperties(Properties properties) 
  Properties driverProperties = new Properties();
  // 获取 DataSource 对应的 MetaObject,方便后续通过字符串设置属性
  MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
  // 遍历所有的外部配置
  for (Object key : properties.keySet()) 
    String propertyName = (String) key;
    // 属性名以 driver. 开头,则认为是 DataSource 的 deriver 配置项,记录到 driverProperties 中保存
    if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) 
      String value = properties.getProperty(propertyName);
      driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
     else if (metaDataSource.hasSetter(propertyName)) 
      // 如果 DataSource 有对应的属性,则直接将值设置到对应字段上
      String value = (String) properties.get(propertyName);
      // 解析对应字段的值类型(根据 setter 方法的参数值类型,转化成 Integer、Long、Boolean)
      Object convertedValue = convertValue(metaDataSource, propertyName, value);
      metaDataSource.setValue(propertyName, convertedValue);
     else 
      throw new DataSourceException(“… ");
    
  
  if (driverProperties.size() > 0)  // 设置 DataSource 的 driverProperties 属性
    metaDataSource.setValue("driverProperties", driverProperties);
  

PooledDataSourceFactory 继承自 UnpooledDataSourceFactory,它仅覆盖了构造函数,在构造函数中创建 PooledDataSource。JndiDataSourceFactory 则直接使用 InitialContext 读取对应已经在容器中配置好的数据源并返回。

UnpooledDataSource

UnpooledDataSource 和 PooledDataSource 都是 javax.sql.DataSource 的实现类,它们内部会记录已经在 DriverManager 中注册过的 Driver,以及 Driver 的基本配置。对于 UnpooledDataSource 而言,每次获取 Connection 对象时,都会调用 DriverManager 去获取数据库连接。
UnpooledDataSource 内部记录了数据源相关的配置,并在实际获取 Connection 时将配置转交给 DriverManager。

private ClassLoader driverClassLoader; // 加载 Driver 的类加载器
private Properties driverProperties; // Driver 对应的相关数据库配置

// 缓存已注册过的所有 Driver
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

private String driver; // 数据库驱动名称
private String url; // 数据库连接
private String username; // 数据库用户名
private String password; // 数据库密码

private Boolean autoCommit; // 是否自动提交事务
private Integer defaultTransactionIsolationLevel; // 默认事务隔离级别

需要说明的是 registeredDrivers,它记录的是所有在 DriverManager 中已注册的 Driver,另外如果当前加载的 Driver 没有在 DriverManager 中注册过时,在创建 Connection 时也会将新加载的 Driver 注册至 registeredDrivers 和 DriverManager。

这里需要简单说明下 DriverManager 通过 SPI 机制加载 Driver 的过程,详细的过程可以参考《源码分析- Java SPI 机制》。DriverManager 的加载过程简化来说分为三个阶段:DriverManager 静态代码块触发 META-INF/services/java.sql.Driver 文件的读取并加载文件内对应的 Driver 实现类;实现类的静态代码块向 DriverManager 注册自己;业务代码使用 DriverManager 遍历 Driver 获取数据库连接。

这里看下 DriverManager.registerDriver 方法:

private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();

public static synchronized void registerDriver(java.sql.Driver driver, DriverAction da) throws SQLException 

    if(driver != null) 
        registeredDrivers.addIfAbsent(new DriverInfo(driver, da)); // 将 Deriver 记录到已加载列表
     else 
        throw new NullPointerException();
    

UnpooledDataSource 中的静态代码会将 DriverManager 中已加载的 Driver 复制一份到自己的 registeredDrivers 中。

static 
  Enumeration<Driver> drivers = DriverManager.getDrivers();
  while (drivers.hasMoreElements()) 
    Driver driver = drivers.nextElement();
    registeredDrivers.put(driver.getClass().getName(), driver); // 复制
  

UnpooledDataSource 有一系列 doGetConnection 方法,这些方法最终会调用 doGetConnection(Properties properties) 方法。doGetConnection 包括两个方法:initializeDriver 负责初始化 Driver 以及解决类加载器相关问题;configureConnection 负责完成数据库配置。

private Connection doGetConnection(Properties properties) throws SQLException 
  initializeDriver(); // 初始化数据库驱动
  Connection connection = DriverManager.getConnection(url, properties); // 获取数据库连接
  configureConnection(connection); // 配置数据库的 autoCommit 和隔离级别
  return connection;


private synchronized void initializeDriver() throws SQLException 
  if (!registeredDrivers.containsKey(driver))  // 检测驱动是否已加载
    Class<?> driverType;
    try 
      if (driverClassLoader != null) 
        // 自定义 ClassLoader 加载对应的 Driver 类
        driverType = Class.forName(driver, true, driverClassLoader);
       else 
        driverType = Resources.classForName(driver);
      
      Driver driverInstance = (Driver) driverType.newInstance(); // 创建 Driver 实例
      // 通过引入 DriverProxy 代理,将创建的 Driver 实例注册至 DriverManager
      DriverManager.registerDriver(new DriverProxy(driverInstance));
      // 将驱动添加至 registeredDrivers 集合
      registeredDrivers.put(driver, driverInstance);
     catch (Exception e) 
      throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
    
  


private void configureConnection(Connection conn) throws SQLException 
  if (autoCommit != null && autoCommit != conn.getAutoCommit()) 
    conn.setAutoCommit(autoCommit); // 数值数据库事务是否自动提交
  
  if (defaultTransactionIsolationLevel != null) 
    conn.setTransactionIsolation(defaultTransactionIsolationLevel); // 设置事务的隔离级别
  

PooledDataSource

PooledDataSource 是 Mybatis 对数据库连接池的简易实现,通过数据库连接池可以实现数据库连接的重用、提升数据库响应效率、防止数据库连接过多造成数据库假死、避免数据库连接泄露等问题。

数据库连接池的基本原理是:在连接池初始化时创建一部分连接,当需要时,直接返回空闲的连接,当业务使用完成时,则返还该连接,而不是关闭掉;当连接池达到上限或者空闲连接达到上限,则需要业务等待其他业务释放连接;如果是尚未达到 连接池上线,但是已达到空闲连接上限 ,则需要丢弃当前新建的连接。

数据库连接池的关键属性包括:数据库总连接上下限和空闲连接上下限。数据库总连接上限过大会造成数据库压力,过小则影响整体性能;空闲连接过大会造成资源浪费,过小会无法及时响应洪峰压力。

PooledDataSource 引入 UnpooledDataSource ,通过 UnpooledDataSource 完成数据源的创建。同时,PooledDataSource 不直接管理 java.sql.Connection 而是管理 Connection 的代理对象 PooledConnection,该对象实现了 JDK 的动态代理。PooledDataSource 的连接池维护在 PoolState 对象中,该对象内部维护了当前空闲的连接和当前使用的连接集合,以及会对数据库连接池做对应的统计工作。

PooledConnection

PooledConnection 的核心字段如下:

// 记录当前 PooledConnection 对象所在的 PooledDataSource。
// 该 PooledConnection 是从 PooledDataSource 中获取的,当调用 close 方法时,
// 需要把 PooledConnection 归还至 PooledDataSource 的连接池中
private final PooledDataSource dataSource;
private final Connection realConnection; // 真正的数据库连接
private final Connection proxyConnection; // 数据库连接的代理对象
private long checkoutTimestamp; // 从数据库连接池取出的时间戳
private long createdTimestamp; // 连接创建的时间戳
private long lastUsedTimestamp; // 最近被使用的时间戳

// 由数据库 url+用户名+密码计算的 hash 值,用于标识连接所在的连接池
private int connectionTypeCode;
// 用于标识当前连接是否有效,避免业务调用 close 方法后仍然使用该数据库连接
private boolean valid;

PooledConnection 有多个 get、set 属性的方法,它实现的 InvocationHandler 的 invoke 方法,该方法实现了代理对象的逻辑,它拦截 close 方法,并且检查当前 PooledConnection 是否可用。

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable 
  String methodName = method.getName();
  // 如果调用了 close 方法,不能直接释放,而是需要将其放入到连接池中
  if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) 
    dataSource.pushConnection(this);
    return null;
   else 
    try 
      if (!Object.class.equals(method.getDeclaringClass())) 
        checkConnection(); // 通过 valid 检查连接是否正常,否则抛出 SQLException
      
      return method.invoke(realConnection, args); // 调用实际 Connection 对象的方法
     catch (Throwable t) 
      throw ExceptionUtil.unwrapThrowable(t);
    
  

PoolState

PoolState 可以用来管理 PooledConnection 对象,其内部的 idleConnections 和 activeConnections 分别表示空闲连接和活动连接。PoolState 的内部字段定义如下:

// 空闲连接列表
protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
// 活动连接列表
protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();

// 以下是统计字段
protected long requestCount = 0; // 请求连接池中数据库连接的次数
protected long accumulatedRequestTime = 0; // 获取连接的累计时间
// accumulatedCheckoutTime 是所有连接的 checkoutTime 的总和
// checkoutTime 表示连接被从连接池中取出,到连接被归还的总时长
protected long accumulatedCheckoutTime = 0;
// 长时间未将连接归还至连接池的连接数量
protected long claimedOverdueConnectionCount = 0;
// 累计超时未归还连接池的连接总使用时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
protected long accumulatedWaitTime = 0; // 累计等待时间(连接数不够时会等待)
protected long hadToWaitCount = 0; // 等待次数
protected long badConnectionCount = 0; // 请求时,连接无效的次数

PooledDataSource

PooledDataSource 中真正负责创建数据库连接的对象是 UnpooledDataSource,它在 PooledDataSource 的构造器中被创建。PooledDataSource 内的核心字段如下:

// 通过 PoolState 管理连接池中的连接,以及记录连接池状态
private final PoolState state = new PoolState(this);

// 用于实际生成数据库连接,在构造器内初始化该字段
private final UnpooledDataSource dataSource;

// 以下是可选的配置字段
// 最大活动连接数量
protected int poolMaximumActiveConnections = 10;
// 最大空闲连接数量
protected int poolMaximumIdleConnections = 5;
// 最大 checkoutTime(从连接池中被取出到归还的时间)
protected int poolMaximumCheckoutTime = 20000;
// 无法获取连接时,现成的等待时间
protected int poolTimeToWait = 20000;
// 连接池最大允许无法获取连接的次数,默认如果获取空闲连接的个数
// + poolMaximumLocalBadConnectionTolerance次还无法获取连接时,
// 认为该连接池无法获得正常的连接。其实就是多次获得连接,但是这
// 些连接均无法正常工作时,认为该连接池无法正常提供服务
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 检测数据库连接是否可用时,向数据库发送的测试 SQL
protected String poolPingQuery = "NO PING QUERY SET";
// 是否开启发送测试 SQL 进行探活的功能
protected boolean poolPingEnabled;
// 开启测试 SQL 探活功能时,连接在 poolPingConnectionsNotUsedFor 毫秒内未被
// 使用时,发送一条测试 SQL检测连接是否正常
protected int poolPingConnectionsNotUsedFor;

// 根据 URL + 用户名 + 密码拼接生成的 hash值,用于标识当前连接池所对应的数据库连接
// 该值在构造函数内初始化
private int expectedConnectionTypeCode;

PooledDataSource.getConnection 有多个重载实现,但是它们最终调用的是PooledDataSource.popConnection 方法获取 PooledConnection,然后通过 PooledDataSource.getProxyConnection 获取 Connection 的代理对象:PooledConnection。popConnection 的逻辑较为复杂,这里先列出其执行流程。

PooledConnection 的具体执行逻辑如下(已去除打印日志的逻辑):

private PooledConnection popConnection(String username, String password) throws SQLException 
  boolean countedWait = false;
  PooledConnection conn = null;
  long t = System.currentTimeMillis(); // 当前时间
  int localBadConnectionCount = 0; // 获取无法使用的连接的次数

  while (conn == null) 
    synchronized (state)  // 同步,每次仅允许单个线程进入获取连接
      if (!state.idleConnections.isEmpty())  // 当前连接池存在可用的空闲连接
        conn = state.idleConnections.remove(0); // 获取第一个连接,该连接是最早被创建的连接
       else  // 当前数据库连接池中没有空闲连接
        // 检查当前活动连接数是否超出最大值,没有超出则允许直接创建新的数据库连接
        if (state.activeConnections.size() < poolMaximumActiveConnections) 
          // 创建新的数据库连接,并包装为 PooledConnection
          conn = new PooledConnection(dataSource.getConnection(), this);
         else  // 当前数据库连接池没有空闲链接,并且活动连接总数超出最大值,无法创建新连接
          // 检测头部最早被创建的活动连接的 checkoutTime
          PooledConnection oldestActiveConnection = state.activeConnections.get(0);
          long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
          // 该连接超过最大可被取出并使用的时长,可回收该连接
          if (longestCheckoutTime > poolMaximumCheckoutTime) 
            // 对超时连接的信息进行统计
            state.claimedOverdueConnectionCount++;
            state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
            state.accumulatedCheckoutTime += longestCheckoutTime;
            // 将超时连接从活动连接中取出
            state.activeConnections.remove(oldestActiveConnection);
            // 如果超时连接尚在事务中,则需要回滚事务
            if (!oldestActiveConnection.getRealConnection().getAutoCommit()) 
              try 
                // 回滚超时连接上的事务
                oldestActiveConnection.getRealConnection().rollback();
               catch (SQLException e) 
                // … 日志
                
            
            // 创建新的 PooledConnection,并复用旧的 Connection 对象,
            // 并重置 PooledConnection 上的统计信息
            conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
            conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
            conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
            // 将旧的 PooledConnection 设置为无效,避免旧连接被业务继续使用
            oldestActiveConnection.invalidate();
           else 
            // 无空闲连接、无法创建新连接、无超时连接,只能阻塞等待
            try 
              if (!countedWait)  // 是否已经在等待中
                state.hadToWaitCount++; // 统计等待的次数
                countedWait = true;
              
              long wt = System.currentTimeMillis();
              state.wait(poolTimeToWait); // 阻塞线程,进行等待
              state.accumulatedWaitTime += System.currentTimeMillis() - wt;
             catch (InterruptedException e) 
              break;
            
          
        
      
      // 经过以上流程已经获取到 PooledConnection
      if (conn != null) 
        // 当前 PooledConnection 是否仍然有效
        if (conn.isValid()) 
          if (!conn.getRealConnection().getAutoCommit())  // 连接上依然有活动的事务,则回滚它
            conn.getRealConnection().rollback();
          
          // 更新 PooledConnection 相关属性,设置 ConnectionTypeCode、CheckoutTimestamp、LastUsedTimestamp
          conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
          conn.setCheckoutTimestamp(System.currentTimeMillis());
          conn.setLastUsedTimestamp(System.currentTimeMillis());
          // 统计状态信息
          state.activeConnections.add(conn);
          state.requestCount++;
          state.accumulatedRequestTime += System.currentTimeMillis() - t;
         else  // 如果获取到的 PooledConnection 已经被标记为无效连接
          state.badConnectionCount++; // 增加统计信息
          localBadConnectionCount++;
          conn = null; // 重置 PooledConnection 对象
          // 如果多次尝试获取 PooledConnection,但是仍然无法得到正常状态的 PooledConnection,则认为连接池存在异常
          if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) 
            throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
          
        
      
    
  

  // 目前看来,仅在等待过程中线程被中断,会经过这里,此时还未获取连接,则抛出异常
  if (conn == null) 
    throw new SQLException(“…”);
  
  return conn;

经过以上过程,业务代码能够从数据库连接池中获取可用的数据库连接,并能够使用这些连接与数据库进行交互,当数据库连接被使用完毕时,应该被归还至数据库连接池。正常数据库连接的归还逻辑是调用 Connection.close 方法,但是 PooledDataSource 中返回的是 PooledConnection,调用 Connection 接口的 close 方法实际上会调用 PooledConnection 的 close 方法。由于 PooledConnection 本身是 InvocationHandler 代理对象,因此是所有的方法都会经过它的 invoke 方法,该方法如前所说,会拦截 close 方法,并调用 PooledDataSource.pushConnection 方法将连接归还至连接池。

PooledDataSource.pushConnection 方法的具体执行逻辑如下(已去除打印日志的逻辑):

protected void pushConnection(PooledConnection conn) mybatis专题-----mybatis源码学习

mybatis xml和dao扫描写法

mybatis源码过程学习梳理

mybatis学习笔记(13)-查询缓存之二级缓存

mybatis源码学习mybatis的插件功能

mybatis源码学习mybatis的参数处理