sharding-jdbc-core 源码分析
Posted binarylei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sharding-jdbc-core 源码分析相关的知识,希望对你有一定的参考价值。
Sharding-Jdbc 源码分析
Apache Sharding-Sphere 系列目录(https://www.cnblogs.com/binarylei/p/12217637.html)
在看 Sharding-Jdbc 源码之前,强烈建议先阅读一直官网的文章:
JDBC 调用过程如下:APP -> ORM -> JDBC -> PROXY -> mysql。如果要完成数据的分库分表,可以在这五层任意地方进行,Sharding-Jdbc 是在 JDBC 层进行分库分表,Sharding-Proxy 是在 PROXY 进行分库分表。
Sharding-Jdbc 是一个轻量级的分库分表框架,使用时最关键的是配制分库分表策略,其余的和使用普通的 MySQL 驱动一样,几乎不用改代码。具体使用方法参考:Apache Sharding-Jdbc 使用示例
try(DataSource dataSource = ShardingDataSourceFactory.createDataSource(
createDataSourceMap(), shardingRuleConfig, new Properties()) {
connection Connection = dataSource.getConnection();
...
}
1. Sharding-Jdbc 包结构
sharding-jdbc
├── sharding-jdbc-core 重写DataSource/Connection/Statement/ResultSet四大对象
└── sharding-jdbc-orchestration 配置中心
sharding-core
├── sharding-core-api 接口和配置类
├── sharding-core-common 通用分片策略实现...
├── sharding-core-entry SQL解析、路由、改写,核心类BaseShardingEngine
├── sharding-core-route SQL路由,核心类StatementRoutingEngine
├── sharding-core-rewrite SQL改写,核心类ShardingSQLRewriteEngine
├── sharding-core-execute SQL执行,核心类ShardingExecuteEngine
└── sharding-core-merge 结果合并,核心类MergeEngine
shardingsphere-sql-parser
├── shardingsphere-sql-parser-spi SQLParserEntry,用于初始化SQLParser
├── shardingsphere-sql-parser-engine SQL解析,核心类SQLParseEngine
├── shardingsphere-sql-parser-relation
└── shardingsphere-sql-parser-mysql MySQL解析器,核心类MySQLParserEntry和MySQLParser
shardingsphere-underlying 基础接口和api
├── shardingsphere-rewrite SQLRewriteEngine接口
├── shardingsphere-execute QueryResult查询结果
└── shardingsphere-merge MergeEngine接口
shardingsphere-spi SPI加载工具类
sharding-transaction
├── sharding-transaction-core 接口ShardingTransactionManager,SPI加载
├── sharding-transaction-2pc 实现类XAShardingTransactionManager
└── sharding-transaction-base 实现类SeataATShardingTransactionManager
2. JDBC 四大对象
所有的一切都从 ShardingDataSourceFactory 开始的,创建了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。
public static DataSource createDataSource(
final Map<String, DataSource> dataSourceMap,
final ShardingRuleConfiguration shardingRuleConfig,
final Properties props) throws SQLException {
return new ShardingDataSource(dataSourceMap,
new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}
说明: 本文主要以 ShardingDataSource 为切入点分析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。
2.1 DataSource
DataSource 和 Connection 都比较简单,没有处理过多的逻辑,只是 dataSourceMap, shardingRule 进行简单的封装。
ShardingDataSource 持有对数据源和分片规则,可以通过 getConnection 方法获取 ShardingConnection 连接。
private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext,
TransactionTypeHolder.get());
}
ShardingDataSource 的功能非常简单,就不多说了。
2.2 Connection
ShardingConnection 可以创建 Statement 和 PrepareStatement 两种运行方式:
@Override
public Statement createStatement(final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability) {
return new ShardingStatement(this, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability)
throws SQLException {
return new ShardingPreparedStatement(this, sql, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
说明: ShardingConnection 主要是将创建 ShardingStatement 和 ShardingPreparedStatement 两个对象,主要的执行逻辑都在 Statement 对象中。当然 ShardingConnection 还有两个重要的功能,一个是获取真正的数据库连接,一个是事务提交功能,事务不在本文讨论范围中。
protected Connection createConnection(final String dataSourceName,
final DataSource dataSource) throws SQLException {
return isInShardingTransaction()
? shardingTransactionManager.getConnection(dataSourceName)
: dataSource.getConnection();
}
说明: 如果有事务,则需要通过事务管理器获取连接,关于事务不在本文讨论范围中。
2.3 Statement
Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。
说明: Statement 分为 ShardingStatement 和 ShardingPrepareStatement 两种情况。本文以 ShardingStatement 为例分析 Sharding-Jdbc 执行过程。下一节会重点为分析 ShardingStatement 的执行流程。
2.4 ResultSet
说明: ShardingResultSet 只是对 MergedResult 的简单封装。
private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {
return mergeResultSet.next();
}
3. Sharding-Jdbc 执行流程分析
总结: ShardingStatement 执行过程如下:
- SimpleQueryShardingEngine(或 PreparedQueryShardingEngine):完成 SQL 解析、路由、改写,位于 sharding-jdbc-core 工程中。SimpleQueryShardingEngine 直接将路由的功能委托给 StatementRoutingEngine(或 PreparedQueryShardingEngine),本质是对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine 的封装。
- StatementExecutor(或 PreparedStatementExecutor): 提供 SQL 执行的操作,位于 sharding-jdbc-core 工程中。本质是对 ShardingExecuteEngine 的封装。
- StatementRoutingEngine:SQL 路由引擎,位于 sharding-core-route 工程中。路由引擎包装了 SQL 解析、路由、改写三点。SQL 路由分两步,先进行数据分片路由(ShardingRouter),再进行主从路由(ShardingMasterSlaveRouter)。
- SQLParseEngine:SQL 解析引擎,位于 shardingsphere-sql-parser 工程中。目前有 MySQL和 PostgreSQL 两种。
- ShardingSQLRewriteEngine:SQL 改写引擎,位于 sharding-core-rewrite 工程中。
- ShardingExecuteEngine:执行引擎,位于 sharding-core-execute 工程中。StatementExecutor 对
- MergeEngine:结果合并引擎,位于 sharding-core-merge 工程中。
接下来一下会对 ShardingStatement 深入分析,之后会对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine、ShardingExecuteEngine、MergeEngine 一个引擎进行分析。
4. sharding-jdbc-core 任务执行分析
ShardingStatement 内部有三个核心的类,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最后调用 MergeEngine 对结果进行合并处理。
4.1 ShardingStatement
4.1.2 初始化
private final ShardingConnection connection;
private final StatementExecutor statementExecutor;
public ShardingStatement(final ShardingConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
resultSetHoldability, connection);
}
说明: ShardingStatement 内部执行 SQL 委托给了 statementExecutor。关于 ResultSet.CONCUR_READ_ONLY 等参考这里。
4.1.2 执行
(1)executeQuery 执行过程
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
ResultSet result;
try {
clearPrevious();
// 1. SQL 解析、路由、改写,最终生成 SQLRouteResult
shard(sql);
// 2. 生成执行计划 SQLRouteResult -> StatementExecuteUnit
initStatementExecutor();
// 3. statementExecutor.executeQuery() 执行任务
MergeEngine mergeEngine = MergeEngineFactory.newInstance(
connection.getRuntimeContext().getDatabaseType(),
connection.getRuntimeContext().getRule(), sqlRouteResult,
connection.getRuntimeContext().getMetaData().getRelationMetas(),
statementExecutor.executeQuery());
// 4. 结果合并
result = getResultSet(mergeEngine);
} finally {
currentResultSet = null;
}
currentResultSet = result;
return result;
}
(2)SQL 路由(包括 SQL 解析、路由、改写)
private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
runtimeContext.getRule(), runtimeContext.getProps(),
runtimeContext.getMetaData(), runtimeContext.getParseEngine());
sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}
说明: SimpleQueryShardingEngine 进行 SQL 路由(包括 SQL 解析、路由、改写),生成 SQLRouteResult。之后会有一节专门分析 SQL 路由过程。
当 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的执行任务就全部交给 StatementExecutor 完成。
4.2 StatementExecutor
StatementExecutor 内部封装了 SQL 任务的执行过程,包括:SqlExecutePrepareTemplate 类生成执行计划 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。
4.2.1 类结构
4.2.2 重要属性
AbstractStatementExecutor 类中重要的属性:
// SQLExecutePrepareTemplate用于生成执行计划StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保存生成的执行计划StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
new LinkedList<>();
// SQLExecuteTemplate用于执行StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保存查询结果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
4.2.3 生成执行计划
// 执行前清理状态
private void clearPrevious() throws SQLException {
statementExecutor.clear();
}
// 执行时初始化
private void initStatementExecutor() throws SQLException {
statementExecutor.init(sqlRouteResult);
replayMethodForStatements();
}
说明: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 重新初始化。下面我们看一下 init 主要做了些什么事。
statementExecutor.init() 初始化主要是生成执行计划 StatementExecuteUnit。
public void init(final SQLRouteResult routeResult) throws SQLException {
setSqlStatementContext(routeResult.getSqlStatementContext());
getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
cacheStatements();
}
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
final Collection<RouteUnit> routeUnits) throws SQLException {
return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
routeUnits, new SQLExecutePrepareCallback() {
// 获取连接
@Override
public List<Connection> getConnections(
final ConnectionMode connectionMode,
final String dataSourceName, final int connectionSize)
throws SQLException {
return StatementExecutor.super.getConnection().getConnections(
connectionMode, dataSourceName, connectionSize);
}
// 生成执行计划RouteUnit -> StatementExecuteUnit
@Override
public StatementExecuteUnit createStatementExecuteUnit(
final Connection connection, final RouteUnit routeUnit,
final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(
routeUnit, connection.createStatement(
getResultSetType(), getResultSetConcurrency(),
getResultSetHoldability()), connectionMode);
}
});
}
说明: SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行计划,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在任务执行时我们会看到这个类。
4.2.4 任务执行
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback =
new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement,
final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(sql, statement, connectionMode);
}
};
// 执行StatementExecuteUnit
return executeCallback(executeCallback);
}
// sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)
protected final <T> List<T> executeCallback(
final SQLExecuteCallback<T> executeCallback) throws SQLException {
// 执行所有的任务 StatementExecuteUnit
List<T> result = sqlExecuteTemplate.executeGroup(
(Collection) executeGroups, executeCallback);
refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
return result;
}
说明: SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 方法,最终调用 getQueryResult 方法。
private QueryResult getQueryResult(final String sql, final Statement statement,
final ConnectionMode connectionMode) throws SQLException {
ResultSet resultSet = statement.executeQuery(sql);
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode
? new StreamQueryResult(resultSet)
: new MemoryQueryResult(resultSet);
}
说明: ConnectionMode 有两种模式:内存限制(MEMORY_STRICTLY)和连接限制(CONNECTION_STRICTLY),本质是一种资源隔离,保护服务器资源不被消耗殆尽。
如果一个连接执行多个 StatementExecuteUnit 则为内存限制(MEMORY_STRICTLY),采用流式处理,即 StreamQueryResult ,反之则为连接限制(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特别是在 Sharding-Proxy 中特别有用,避免将代理服务器撑爆,见 Apache Sharding-Proxy 核心原理。
每天用心记录一点点。内容也许不重要,但习惯很重要!
以上是关于sharding-jdbc-core 源码分析的主要内容,如果未能解决你的问题,请参考以下文章