(高级)ShardingSphere源码分析如何实现分布式事务

Posted gonghaiyu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(高级)ShardingSphere源码分析如何实现分布式事务相关的知识,希望对你有一定的参考价值。

在分析事务之前,需要先明白事务与链接的关系,一个事务可以包含多个链接(如读写分离的实现),一个链接也可以对应多个事务(如链接池化技术)。
shardingsphere在快速迭代中,包结构和类结构也在不断变化。所以不同版本,可能包结构不同。

ShardingTransactionManagerEngine

shardingsphere与事务相关的工程有以下几个。

从命名上,我们不难看出 shardingsphere-transaction-core 应该包含了分布式事务相关的一些基础核心类,而 shardingsphere-transaction-2pc 和 shardingsphere-transaction-base 分别基于强一致性和最终一致性的两种实现。其中shardingsphere-transaction-xa-atomikos等是基于不同的外部组件时间xa。同理,shardingsphere-transaction-base-seata-at是基于seata的外部组件实现base事务。

通过枚举类TransactionType 也可以知道,Sharding支持以下三种事务。分别代表了本地事务、XA 二阶段提交事务和 BASE 柔性事务

public enum TransactionType 
    
    LOCAL, XA, BASE;
    
    /**
     * 判断是否是分布式事务
     * 
     * @param transactionType transaction type
     * @return is distributed transaction or not
     */
    public static boolean isDistributedTransaction(final TransactionType transactionType) 
        return XA == transactionType || BASE == transactionType;
    



我们在 ShardingRuntimeContext 这个上下文对象中第一次看到这个分布式事务管理器的入口,如下所示:

public final class ShardingRuntimeContext extends AbstractRuntimeContext<ShardingRule> private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;

	public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule rule, final Properties props, final DatabaseType databaseType) throws SQLException //创建分布式事务管理器引擎并初始化
        shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();
        shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
	

在 ShardingTransactionManagerEngine 的构造函数中,调用了如下所示的 loadShardingTransactionManager 方法:

/**
 * Sharding transaction manager engine.
 */
@Slf4j
public final class ShardingTransactionManagerEngine 
    
    private final Map<TransactionType, ShardingTransactionManager> transactionManagerMap = new EnumMap<>(TransactionType.class);
    
    public ShardingTransactionManagerEngine() 
        loadShardingTransactionManager();
    
    
    private void loadShardingTransactionManager() 
        for (ShardingTransactionManager each : ServiceLoader.load(ShardingTransactionManager.class)) 
            if (transactionManagerMap.containsKey(each.getTransactionType())) 
                log.warn("Find more than one  transaction manager implementation class, use `` now",
                    each.getTransactionType(), transactionManagerMap.get(each.getTransactionType()).getClass().getName());
                continue;
            
            transactionManagerMap.put(each.getTransactionType(), each);
        
    

这里直接使用了 JDK 中 ServiceLoader 工具类的 load 方法来加载 ShardingTransactionManager 的实现类,这是使用 SPI 实现微内核架构的最直接的方式,上述方法的作用就是加载类路径上的 ShardingTransactionManager 并缓存在内存中。在 ShardingSphere 中,ShardingTransactionManager 是对分布式事务管理器的一种抽象。我们在后续内容中会具体展开。

当使用时,一般构建完 ShardingTransactionManagerEngine 对象之后,会调用它的 init 方法进行初始化,如下所示:

    /**
     * Initialize sharding transaction managers.
     *
     * @param databaseType database type
     * @param dataSourceMap data source map
     * @param xaTransactionMangerType XA transaction manger type
     */
    public void init(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap, final String xaTransactionMangerType) 
        for (Entry<TransactionType, ShardingTransactionManager> entry : transactionManagerMap.entrySet()) 
            entry.getValue().init(databaseType, getResourceDataSources(dataSourceMap), xaTransactionMangerType);
        
    
    
    private Collection<ResourceDataSource> getResourceDataSources(final Map<String, DataSource> dataSourceMap) 
        return dataSourceMap.entrySet().stream().map(entry -> new ResourceDataSource(entry.getKey(), entry.getValue())).collect(Collectors.toCollection(LinkedList::new));
    
    
   

这部分的代码相当于是执行所获取 ShardingTransactionManager 的 init 方法对其进行初始化。在初始化过程中,我们在这里还看到了一个新的数据源对象 ResourceDataSource,如下所示:

/**
 * Unique resource data source.
 */
@Getter
public final class ResourceDataSource 
    
    private final String originalName;
    
    private final String uniqueResourceName;
    
    private final DataSource dataSource;
    
    public ResourceDataSource(final String originalName, final DataSource dataSource) 
        this.originalName = originalName;
        this.dataSource = dataSource;
        uniqueResourceName = ResourceIDGenerator.getInstance().nextId() + originalName;
    


ResourceDataSource 的作用就是保存数据库名和 DataSource 的信息,并为这个 ResourceDataSource 构建一个唯一的资源名称,构建过程使用了 ResourceIDGenerator 工具类。

这里有个单例的实现方式,可以学一下。这个单例维持了一个进程内的全局ID生成器。

public final class ResourceIDGenerator 
    
    private static final ResourceIDGenerator INSTANCE = new ResourceIDGenerator();
    
    private final AtomicInteger count = new AtomicInteger();
    
    /**
     * Get instance.
     *
     * @return instance
     */
    public static ResourceIDGenerator getInstance() 
        return INSTANCE;
    
    
    /**
     * Next unique resource id.
     *
     * @return next ID
     */
    String nextId() 
        return String.format("resource-%d-", count.incrementAndGet());
    


再回到ShardingTransactionManagerEngine类,看下怎么获取事务管理器,从下面的代码中可以看到,这个与spring容器的管理类似,同样也是通过Map进行处理。下面还有个close方法,通过这个方法,可以关闭当前进程的分布式事务管理器。AutoCloseable 接口在try结束的时候,会自动将这些资源关闭(调用close方法)。

 /**
     * Get sharding transaction manager.
     *
     * @param transactionType transaction type
     * @return sharding transaction manager
     */
    public ShardingTransactionManager getTransactionManager(final TransactionType transactionType) 
        ShardingTransactionManager result = transactionManagerMap.get(transactionType);
        if (TransactionType.LOCAL != transactionType) 
            Preconditions.checkNotNull(result, "Cannot find transaction manager of [%s]", transactionType);
        
        return result;
    
    
    /**
     * Close sharding transaction managers.
     * 
     * @throws Exception exception
     */
    public void close() throws Exception 
        for (Entry<TransactionType, ShardingTransactionManager> entry : transactionManagerMap.entrySet()) 
            entry.getValue().close();
        
    

因不同的分布式事务实现方式不一样,所以我们来看看事务管理器接口ShardingTransactionManager。事务管理器与

public interface ShardingTransactionManager extends AutoCloseable 

    //根据数据库类型和 ResourceDataSource 进行初始化
    void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);

    //获取 TransactionType
    TransactionType getTransactionType();

    //判断是否在事务中
    boolean isInTransaction();

    //获取支持事务的 Connection
    Connection getConnection(String dataSourceName) throws SQLException;

    //开始事务
    void begin();

    //提交事务
    void commit();

    //回滚事务
    void rollback();


对比后面要介绍的 JTA 中的 TransactionManager 接口,我们同样找到了作为一个事务管理器而言所必需的 begin、commit 和 rollback 这三个基本操作。ShardingSphere 还为这些基本操作专门提供了一个枚举 TransactionOperationType。

我们通过查看 ShardingSphere 中 ShardingTransactionManager 的类层结构,发现存在两个真正的实现类(其他都没有具体实现),即 XAShardingTransactionManager 和 SeataATShardingTransactionManager 类。

XAShardingTransactionManager实现

XA 是由 X/Open 组织提出的两阶段提交协议,是一种分布式事务的规范,XA 规范主要定义了面向全局的事务管理器 TransactionManager(TM)和面向局部的资源管理器 ResourceManager(RM)之间的接口。
XA 接口是双向的系统接口,在 TransactionManager,以及一个或多个 ResourceManager 之间形成通信桥梁。通过这样的设计,TransactionManager 控制着全局事务,管理事务生命周期,并协调资源,而 ResourceManager 负责控制和管理包括数据库相关的各种实际资源。

XA 的整体结构以及 TransactionManager 和 ResourceManager 之间的交互过程参考下图:


所有关于分布式事务的介绍中都必然会讲到两阶段提交,因为它是实现 XA 分布式事务的关键。我们知道在两阶段提交过程中,存在协调者和参与者两种角色。在上图中,XA 引入的 TransactionManager 充当着全局事务中的“协调者”角色,而图中的 ResourceManager 相当于“参与者”角色,对自身内部的资源进行统一管理。

和很多其他的 Java 规范一样,JTA(Java Transaction API) 仅仅定义了接口,具体的实现则是由供应商负责提供。目前JTA 的实现分成两大类,其中一类是直接集成在应用服务器中,例如 JBoss;另一类则是独立的实现,例如 ShardingSphere 中所采用的 Atomikos 和 Bitronix,这些实现可以应用在那些不使用 J2EE 应用服务器的环境里(例如普通的 Java 应用)用以提供分布式事务保证。另一方面,JTA 接口里的 ResourceManager 同样需要数据库厂商提供 XA 的驱动实现。

在 JTA 中,提供了以下几个核心接口:

  1. UserTransaction,该接口是面向开发人员的接口,能够编程控制事务处理。
  2. TransactionManager,通过该接口允许应用程序服务器来控制分布式事务。
  3. Transaction,代表正在管理应用程序的事务。
  4. XAResource,这是一个面向提供商的实现接口,是一个基于 XA 协议的 Java 映射,各个数据库提供商在提供访问自己资源的驱动时,必须实现这样的接口。

在 javax.sql 包中还存在几个与 XA 相关的核心类,即代表连接的 XAConnection、代表数据源的 XADataSource,以及代表事务编号的 Xid。

接下来我们看下源码测试类关于实现XA的伪代码,基于XA的分布式事务的实现。

// XAShardingTransactionManagerTest类
@Test
    public void assertGetConnection() throws SQLException 
        xaShardingTransactionManager.begin();
        Connection actual1 = xaShardingTransactionManager.getConnection("ds1");
        Connection actual2 = xaShardingTransactionManager.getConnection("ds2");
        Connection actual3 = xaShardingTransactionManager.getConnection("ds3");
        assertThat(actual1, instanceOf(Connection.class));
        assertThat(actual2, instanceOf(Connection.class));
        assertThat(actual3, instanceOf(Connection.class));
        xaShardingTransactionManager.commit();
    

要想上述代码发挥作用,这里的连接对象 Connection 就得支持 XAResource 接口,也就涉及一系列关于 XADataSource 和 XAConnection 的处理过程。

在ShardingSphere中实现XA,主要看 XAShardingTransactionManager 类,它主要负责对实际的 DataSource 进行管理和适配,并且将接入端事务的 begin/commit/rollback 操作委托给具体的 XA 事务管理器,例如 XAShardingTransactionManager 就会使用 XATransactionManager 中的 TransactionManager 完成 commit 操作:

@Override
public void commit() 
    xaTransactionManager.getTransactionManager().commit();


这里的 XATransactionManager 就是对各种第三方 XA 事务管理器的一种抽象,封装了对Atomikos、Bitronix 等第三方工具的实现方式。

SeataATShardingTransactionManager

Seata 框架中一个分布式事务包含三种角色,除了 XA 中同样具备的 TransactionManager(TM)和 ResourceManager(RM) 之外,还存在一个事务协调器 TransactionCoordinator (TC),维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。

其中,TM 是一个分布式事务的发起者和终结者,TC 负责维护分布式事务的运行状态,而 RM 则负责本地事务的运行。

基于Seata 框架,一个分布式事务的执行流程包含如下五个步骤:


先来了解下XATransactionManager 实例的加载,发现XATransactionManagerLoader 仍然是采用了 JDK 中的 ServiceLoader 类,如下所示:


/**
 * XA transaction manager loader.
 */
@Slf4j
public final class XATransactionManagerLoader 
    
    private static final XATransactionManagerLoader INSTANCE = new XATransactionManagerLoader();
    
    private XATransactionManagerLoader() 
    
    
    
    /**
     * Get instance of XA transaction manager SPI loader.
     *
     * @return instance of XA transaction manager SPI loader
     */
    public static XATransactionManagerLoader getInstance() 
        return INSTANCE;
    
    
    /**
     * Get xa transaction manager.
     *
     * @param type type
     * @return xa transaction manager
     */
    public XATransactionManager getXATransactionManager(final String type) 
        Iterator<XATransactionManager> xaTransactionManagers = ServiceLoader.load(XATransactionManager.class).iterator();
        if (!xaTransactionManagers.hasNext()) 
            return new AtomikosTransactionManager();
        
        while (xaTransactionManagers.hasNext()) 
            XATransactionManager result = xaTransactionManagers.next();
            if (result.getType().equalsIgnoreCase(type)) 
                return result;
            
        
        return new AtomikosTransactionManager();
    

XATransactionManager 就是对各种第三方 XA 事务管理器的一种抽象,通过上述代码,可以看到在找不到合适的 XATransactionManager 的情况下,系统默认会创建一个 AtomikosTransactionManager。而这个 XATransactionManager 的定义实际上是位于单独的一个代码工程中,即 sharding-transaction-xa-spi 工程。我们重点关注下它的一个实现。

public final class SingleXAResource implements XAResource 

    private final String resourceName;

    private final XAResource delegate;

    @Override
    public void start(final Xid xid, final int i) throws XAException 
        delegate.start(xid, i);
    
 
    @Override
    public void commit(final Xid xid, final boolean b) throws XAException 
        delegate.commit(xid, b);
    

	@Override
    public void rollback(final Xid xid) throws XAException 
        delegate.rollback(xid);
    
 
    @Override
    public boolean isSameRM(final XAResource xaResource) 
        SingleXAResource singleXAResource = (SingleXAResource) xaResource;
        return resourceName.equals(singleXAResource.getResourceName());
	

可以看到 SingleXAResource 虽然实现了 JTA 的 XAResource 接口,但更像是一个代理类,具体的操作方法还是委托给了内部的 XAResource 进行实现。

XADataSource

XADataSource接口属于JDBC规范的范畴,通过接口内定义的方法可以看出,该接口的作用就是获取 XAConnection。但先得说明怎么得到XADataSource,这个其实与JDBC中获取DataSource一样,通过DataSourceFactory来进行创建。

public final class XADataSourceFactory 
    
    /**
     * Create XA data source through general data source.
     *
     * @param databaseType database type
     * @param dataSource data source
     * @return XA data source
     */
    public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) 
        return new DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);
    

这里首先用到了一个 XADataSourceDefinition 接口,从源码可以看到,这个接口主要是获取不同驱动类的名称和全路径类名。

public interface XADataSourceDefinition extends DatabaseTypeAwareSPI 
    
    /**
     * Get XA driver class names.
     * 
     * @return XA driver class names
     */
    Collection<String> getXADriverClassName();

这个类集成了DatabaseTypeAwareSPI,在 ShardingSphere 中,继承 DatabaseTypeAwareSPI 接口的就只有 XADataSourceDefinition 接口,而后者存在一批实现类。我们以mysqlXADataSourceDefinition为例,该类分别实现了 DatabaseTypeAwareSPI 和 XADataSourceDefinition 这两个接口中所定义的三个方法:

public final class MySQLXADataSourceDefinition implements XADataSourceDefinition 
    
    @Override
    public String getDatabaseType() 
        return "MySQL";
    
    
    @Override
    public Collection<String> getXADriverClassName() 
        return Arrays.asList("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "com.mysql.cj.jdbc.MysqlXADataSource");
    

我们从这里得知,作为数据库供应商,MySQL 提供了两个 XADataSource 的驱动程序。而在 getXAProperties 中,我们发现 URL、Username 和 Password 等信息是通过 DatabaseAccessConfiguration 对象进行获取的,该对象在本文后面会介绍到。

另一方面,我们用于获取 XADataSourceDefinition 的工厂类 XADataSourceDefinitionFactory,也是基于 SPI 机制进行加载的。


/**
 * XA data source definition factory.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class XADataSourceDefinitionFactory 
    
    private static final Map<DatabaseType, XADataSourceDefinition> XA_DATA_SOURCE_DEFINITIONS = new HashMap<>();
    
    static 
        for (XADataSourceDefinition each : ServiceLoader.load(XADataSourceDefinition.class)) 
            XA_DATA_SOURCE_DEFINITIONS.put(DatabaseTypeRegistry.getActualDatabaseType(each.getDatabaseType()), each);
        
    
    
    /**
     * Get XA data source definition.
     * 
     * @param databaseType database type
     * @return XA data source definition
     */
    public static XADataSourceDefinition getXADataSourceDefinition(final DatabaseType databaseType) 
        return XA_DATA_SOURCE_DEFINITIONS.get(databaseType);
    


同样,在 sharding-transaction-core 工程中,我们也发现了如下所示的 SPI 配置信息,并采用了默认的事务处理器。

当根据数据库类型获取了对应的 XADataSourceDefinition 之后,我们就可以根据 XADriverClassName 来创建具体的 XADataSource:

//DataSourceSwapper类
    private XADataSource loadXADataSource(final String xaDataSourceClassName) 
        Class<?> xaDataSourceClass;
        try 
            xaDataSourceClass = Thread.currentThread().getContextClassLoader().loadClass(xaDataSourceClassName);
         catch (final ClassNotFoundException ignored) 
            try 
                xaDataSourceClass = Class.forName(xaDataSourceClassName);
             catch (final ClassNotFoundException ex) (高级)ShardingSphere源码解析-SQL路由

shardingsphere源码分析(六)-路由引擎

漏洞寻踪Apache ShardingSphere RCE漏洞分析

工程设计-shardingsphere源码解读

全链路压测之影子库及ShardingSphere实现影子库源码剖析

shardingsphere分析