(高级)ShardingSphere源码分析如何实现分布式事务
Posted gonghaiyu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(高级)ShardingSphere源码分析如何实现分布式事务相关的知识,希望对你有一定的参考价值。
在分析事务之前,需要先明白事务与链接的关系,一个事务可以包含多个链接(如读写分离的实现),一个链接也可以对应多个事务(如链接池化技术)。
shardingsphere在快速迭代中,包结构和类结构也在不断变化。所以不同版本,可能包结构不同。
ShardingTransactionManagerEngine
shardingsphere与事务相关的工程有以下几个。
从命名上,我们不难看出 shardingsphere-transaction-core 应该包含了分布式事务相关的一些基础核心类,而 shardingsphere-transaction-2pc 和 shardingsphere-transaction-base 分别基于强一致性和最终一致性的两种实现。其中shardingsphere-transaction-xa-atomikos等是基于不同的外部组件时间xa。同理,shardingsphere-transaction-base-seata-at是基于seata的外部组件实现base事务。
通过枚举类TransactionType 也可以知道,Sharding支持以下三种事务。分别代表了本地事务、XA 二阶段提交事务和 BASE 柔性事务
public enum TransactionType {
LOCAL, XA, BASE;
/**
* 判断是否是分布式事务
*
* @param transactionType transaction type
* @return is distributed transaction or not
*/
public static boolean isDistributedTransaction(final TransactionType transactionType) {
return XA == transactionType || BASE == transactionType;
}
}
我们在 ShardingRuntimeContext 这个上下文对象中第一次看到这个分布式事务管理器的入口,如下所示:
public final class ShardingRuntimeContext extends AbstractRuntimeContext<ShardingRule> {
…
private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule rule, final Properties props, final DatabaseType databaseType) throws SQLException {
…
//创建分布式事务管理器引擎并初始化
shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();
shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
}
…
}
在 ShardingTransactionManagerEngine 的构造函数中,调用了如下所示的 loadShardingTransactionManager 方法:
/**
* Sharding transaction manager engine.
*/
@Slf4j
public final class ShardingTransactionManagerEngine {
private final Map<TransactionType, ShardingTransactionManager> transactionManagerMap = new EnumMap<>(TransactionType.class);
public ShardingTransactionManagerEngine() {
loadShardingTransactionManager();
}
private void loadShardingTransactionManager() {
for (ShardingTransactionManager each : ServiceLoader.load(ShardingTransactionManager.class)) {
if (transactionManagerMap.containsKey(each.getTransactionType())) {
log.warn("Find more than one {} transaction manager implementation class, use `{}` now",
each.getTransactionType(), transactionManagerMap.get(each.getTransactionType()).getClass().getName());
continue;
}
transactionManagerMap.put(each.getTransactionType(), each);
}
}
这里直接使用了 JDK 中 ServiceLoader 工具类的 load 方法来加载 ShardingTransactionManager 的实现类,这是使用 SPI 实现微内核架构的最直接的方式,上述方法的作用就是加载类路径上的 ShardingTransactionManager 并缓存在内存中。在 ShardingSphere 中,ShardingTransactionManager 是对分布式事务管理器的一种抽象。我们在后续内容中会具体展开。
当使用时,一般构建完 ShardingTransactionManagerEngine 对象之后,会调用它的 init 方法进行初始化,如下所示:
/**
* Initialize sharding transaction managers.
*
* @param databaseType database type
* @param dataSourceMap data source map
* @param xaTransactionMangerType XA transaction manger type
*/
public void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap, final String xaTransactionMangerType) {
for (Entry<TransactionType, ShardingTransactionManager> entry : transactionManagerMap.entrySet()) {
entry.getValue().init(databaseType, getResourceDataSources(dataSourceMap), xaTransactionMangerType);
}
}
private Collection<ResourceDataSource> getResourceDataSources(final Map<String, DataSource> dataSourceMap) {
return dataSourceMap.entrySet().stream().map(entry -> new ResourceDataSource(entry.getKey(), entry.getValue())).collect(Collectors.toCollection(LinkedList::new));
}
这部分的代码相当于是执行所获取 ShardingTransactionManager 的 init 方法对其进行初始化。在初始化过程中,我们在这里还看到了一个新的数据源对象 ResourceDataSource,如下所示:
/**
* Unique resource data source.
*/
@Getter
public final class ResourceDataSource {
private final String originalName;
private final String uniqueResourceName;
private final DataSource dataSource;
public ResourceDataSource(final String originalName, final DataSource dataSource) {
this.originalName = originalName;
this.dataSource = dataSource;
uniqueResourceName = ResourceIDGenerator.getInstance().nextId() + originalName;
}
}
ResourceDataSource 的作用就是保存数据库名和 DataSource 的信息,并为这个 ResourceDataSource 构建一个唯一的资源名称,构建过程使用了 ResourceIDGenerator 工具类。
这里有个单例的实现方式,可以学一下。这个单例维持了一个进程内的全局ID生成器。
public final class ResourceIDGenerator {
private static final ResourceIDGenerator INSTANCE = new ResourceIDGenerator();
private final AtomicInteger count = new AtomicInteger();
/**
* Get instance.
*
* @return instance
*/
public static ResourceIDGenerator getInstance() {
return INSTANCE;
}
/**
* Next unique resource id.
*
* @return next ID
*/
String nextId() {
return String.format("resource-%d-", count.incrementAndGet());
}
}
再回到ShardingTransactionManagerEngine类,看下怎么获取事务管理器,从下面的代码中可以看到,这个与spring容器的管理类似,同样也是通过Map进行处理。下面还有个close方法,通过这个方法,可以关闭当前进程的分布式事务管理器。AutoCloseable 接口在try结束的时候,会自动将这些资源关闭(调用close方法)。
/**
* Get sharding transaction manager.
*
* @param transactionType transaction type
* @return sharding transaction manager
*/
public ShardingTransactionManager getTransactionManager(final TransactionType transactionType) {
ShardingTransactionManager result = transactionManagerMap.get(transactionType);
if (TransactionType.LOCAL != transactionType) {
Preconditions.checkNotNull(result, "Cannot find transaction manager of [%s]", transactionType);
}
return result;
}
/**
* Close sharding transaction managers.
*
* @throws Exception exception
*/
public void close() throws Exception {
for (Entry<TransactionType, ShardingTransactionManager> entry : transactionManagerMap.entrySet()) {
entry.getValue().close();
}
}
}
因不同的分布式事务实现方式不一样,所以我们来看看事务管理器接口ShardingTransactionManager。事务管理器与
public interface ShardingTransactionManager extends AutoCloseable {
//根据数据库类型和 ResourceDataSource 进行初始化
void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);
//获取 TransactionType
TransactionType getTransactionType();
//判断是否在事务中
boolean isInTransaction();
//获取支持事务的 Connection
Connection getConnection(String dataSourceName) throws SQLException;
//开始事务
void begin();
//提交事务
void commit();
//回滚事务
void rollback();
}
对比后面要介绍的 JTA 中的 TransactionManager 接口,我们同样找到了作为一个事务管理器而言所必需的 begin、commit 和 rollback 这三个基本操作。ShardingSphere 还为这些基本操作专门提供了一个枚举 TransactionOperationType。
我们通过查看 ShardingSphere 中 ShardingTransactionManager 的类层结构,发现存在两个真正的实现类(其他都没有具体实现),即 XAShardingTransactionManager 和 SeataATShardingTransactionManager 类。
XAShardingTransactionManager实现
XA 是由 X/Open 组织提出的两阶段提交协议,是一种分布式事务的规范,XA 规范主要定义了面向全局的事务管理器 TransactionManager(TM)和面向局部的资源管理器 ResourceManager(RM)之间的接口。
XA 接口是双向的系统接口,在 TransactionManager,以及一个或多个 ResourceManager 之间形成通信桥梁。通过这样的设计,TransactionManager 控制着全局事务,管理事务生命周期,并协调资源,而 ResourceManager 负责控制和管理包括数据库相关的各种实际资源。
XA 的整体结构以及 TransactionManager 和 ResourceManager 之间的交互过程参考下图:
所有关于分布式事务的介绍中都必然会讲到两阶段提交,因为它是实现 XA 分布式事务的关键。我们知道在两阶段提交过程中,存在协调者和参与者两种角色。在上图中,XA 引入的 TransactionManager 充当着全局事务中的“协调者”角色,而图中的 ResourceManager 相当于“参与者”角色,对自身内部的资源进行统一管理。
和很多其他的 Java 规范一样,JTA(Java Transaction API) 仅仅定义了接口,具体的实现则是由供应商负责提供。目前JTA 的实现分成两大类,其中一类是直接集成在应用服务器中,例如 JBoss;另一类则是独立的实现,例如 ShardingSphere 中所采用的 Atomikos 和 Bitronix,这些实现可以应用在那些不使用 J2EE 应用服务器的环境里(例如普通的 Java 应用)用以提供分布式事务保证。另一方面,JTA 接口里的 ResourceManager 同样需要数据库厂商提供 XA 的驱动实现。
在 JTA 中,提供了以下几个核心接口:
- UserTransaction,该接口是面向开发人员的接口,能够编程控制事务处理。
- TransactionManager,通过该接口允许应用程序服务器来控制分布式事务。
- Transaction,代表正在管理应用程序的事务。
- XAResource,这是一个面向提供商的实现接口,是一个基于 XA 协议的 Java 映射,各个数据库提供商在提供访问自己资源的驱动时,必须实现这样的接口。
在 javax.sql 包中还存在几个与 XA 相关的核心类,即代表连接的 XAConnection、代表数据源的 XADataSource,以及代表事务编号的 Xid。
接下来我们看下源码测试类关于实现XA的伪代码,基于XA的分布式事务的实现。
// XAShardingTransactionManagerTest类
@Test
public void assertGetConnection() throws SQLException {
xaShardingTransactionManager.begin();
Connection actual1 = xaShardingTransactionManager.getConnection("ds1");
Connection actual2 = xaShardingTransactionManager.getConnection("ds2");
Connection actual3 = xaShardingTransactionManager.getConnection("ds3");
assertThat(actual1, instanceOf(Connection.class));
assertThat(actual2, instanceOf(Connection.class));
assertThat(actual3, instanceOf(Connection.class));
xaShardingTransactionManager.commit();
}
要想上述代码发挥作用,这里的连接对象 Connection 就得支持 XAResource 接口,也就涉及一系列关于 XADataSource 和 XAConnection 的处理过程。
在ShardingSphere中实现XA,主要看 XAShardingTransactionManager 类,它主要负责对实际的 DataSource 进行管理和适配,并且将接入端事务的 begin/commit/rollback 操作委托给具体的 XA 事务管理器,例如 XAShardingTransactionManager 就会使用 XATransactionManager 中的 TransactionManager 完成 commit 操作:
@Override
public void commit() {
xaTransactionManager.getTransactionManager().commit();
}
这里的 XATransactionManager 就是对各种第三方 XA 事务管理器的一种抽象,封装了对Atomikos、Bitronix 等第三方工具的实现方式。
SeataATShardingTransactionManager
Seata 框架中一个分布式事务包含三种角色,除了 XA 中同样具备的 TransactionManager(TM)和 ResourceManager(RM) 之外,还存在一个事务协调器 TransactionCoordinator (TC),维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
其中,TM 是一个分布式事务的发起者和终结者,TC 负责维护分布式事务的运行状态,而 RM 则负责本地事务的运行。
基于Seata 框架,一个分布式事务的执行流程包含如下五个步骤:
先来了解下XATransactionManager 实例的加载,发现XATransactionManagerLoader 仍然是采用了 JDK 中的 ServiceLoader 类,如下所示:
/**
* XA transaction manager loader.
*/
@Slf4j
public final class XATransactionManagerLoader {
private static final XATransactionManagerLoader INSTANCE = new XATransactionManagerLoader();
private XATransactionManagerLoader() {
}
/**
* Get instance of XA transaction manager SPI loader.
*
* @return instance of XA transaction manager SPI loader
*/
public static XATransactionManagerLoader getInstance() {
return INSTANCE;
}
/**
* Get xa transaction manager.
*
* @param type type
* @return xa transaction manager
*/
public XATransactionManager getXATransactionManager(final String type) {
Iterator<XATransactionManager> xaTransactionManagers = ServiceLoader.load(XATransactionManager.class).iterator();
if (!xaTransactionManagers.hasNext()) {
return new AtomikosTransactionManager();
}
while (xaTransactionManagers.hasNext()) {
XATransactionManager result = xaTransactionManagers.next();
if (result.getType().equalsIgnoreCase(type)) {
return result;
}
}
return new AtomikosTransactionManager();
}
}
XATransactionManager 就是对各种第三方 XA 事务管理器的一种抽象,通过上述代码,可以看到在找不到合适的 XATransactionManager 的情况下,系统默认会创建一个 AtomikosTransactionManager。而这个 XATransactionManager 的定义实际上是位于单独的一个代码工程中,即 sharding-transaction-xa-spi 工程。我们重点关注下它的一个实现。
public final class SingleXAResource implements XAResource {
private final String resourceName;
private final XAResource delegate;
@Override
public void start(final Xid xid, final int i) throws XAException {
delegate.start(xid, i);
}
@Override
public void commit(final Xid xid, final boolean b) throws XAException {
delegate.commit(xid, b);
}
@Override
public void rollback(final Xid xid) throws XAException {
delegate.rollback(xid);
}
@Override
public boolean isSameRM(final XAResource xaResource) {
SingleXAResource singleXAResource = (SingleXAResource) xaResource;
return resourceName.equals(singleXAResource.getResourceName());
}
…
}
可以看到 SingleXAResource 虽然实现了 JTA 的 XAResource 接口,但更像是一个代理类,具体的操作方法还是委托给了内部的 XAResource 进行实现。
XADataSource
XADataSource接口属于JDBC规范的范畴,通过接口内定义的方法可以看出,该接口的作用就是获取 XAConnection。但先得说明怎么得到XADataSource,这个其实与JDBC中获取DataSource一样,通过DataSourceFactory来进行创建。
public final class XADataSourceFactory {
/**
* Create XA data source through general data source.
*
* @param databaseType database type
* @param dataSource data source
* @return XA data source
*/
public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
return new DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);
}
}
这里首先用到了一个 XADataSourceDefinition 接口,从源码可以看到,这个接口主要是获取不同驱动类的名称和全路径类名。
public interface XADataSourceDefinition extends DatabaseTypeAwareSPI {
/**
* Get XA driver class names.
*
* @return XA driver class names
*/
Collection<String> getXADriverClassName();
}
这个类集成了DatabaseTypeAwareSPI,在 ShardingSphere 中,继承 DatabaseTypeAwareSPI 接口的就只有 XADataSourceDefinition 接口,而后者存在一批实现类。我们以mysqlXADataSourceDefinition为例,该类分别实现了 DatabaseTypeAwareSPI 和 XADataSourceDefinition 这两个接口中所定义的三个方法:
public final class MySQLXADataSourceDefinition implements XADataSourceDefinition {
@Override
public String getDatabaseType() {
return "MySQL";
}
@Override
public Collection<String> getXADriverClassName() {
return Arrays.asList("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "com.mysql.cj.jdbc.MysqlXADataSource");
}
}
我们从这里得知,作为数据库供应商,MySQL 提供了两个 XADataSource 的驱动程序。而在 getXAProperties 中,我们发现 URL、Username 和 Password 等信息是通过 DatabaseAccessConfiguration 对象进行获取的,该对象在本文后面会介绍到。
另一方面,我们用于获取 XADataSourceDefinition 的工厂类 XADataSourceDefinitionFactory,也是基于 SPI 机制进行加载的。
/**
* XA data source definition factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class XADataSourceDefinitionFactory {
private static final Map<DatabaseType, XADataSourceDefinition> XA_DATA_SOURCE_DEFINITIONS = new HashMap<>();
static {
for (XADataSourceDefinition each : ServiceLoader.load(XADataSourceDefinition.class)) {
XA_DATA_SOURCE_DEFINITIONS.put(DatabaseTypeRegistry.getActualDatabaseType(each.getDatabaseType()), each);
}
}
/**
* Get XA data source definition.
*
* @param databaseType database type
* @return XA data source definition
*/
public static XADataSourceDefinition getXADataSourceDefinition(final DatabaseType databaseType) {
return XA_DATA_SOURCE_DEFINITIONS.get(databaseType);
}
}
同样,在 sharding-transaction-core 工程中,我们也发现了如下所示的 SPI 配置信息,并采用了默认的事务处理器。
当根据数据库类型获取了对应的 XADataSourceDefinition 之后,我们就可以根据 XADriverClassName 来创建具体的 XADataSource:
//DataSourceSwapper类
private XADataSource loadXADataSource(final String xaDataSourceClassName) {
Class<?> xaDataSourceClass;
try {
xaDataSourceClass = Thread.currentThread().getContextClassLoader().loadClass(xaDataSourceClassName);
} catch (final ClassNotFoundException ignored) {
try {
xaDataSourceClass = Class.forName(xaDataSourceClassName);
} catch (高级)ShardingSphere源码解析-SQL路由
漏洞寻踪Apache ShardingSphere RCE漏洞分析