多数据源下Seata分布式事务出现的问题和解决方法

Posted JF Coder

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多数据源下Seata分布式事务出现的问题和解决方法相关的知识,希望对你有一定的参考价值。

什么是Seata

推荐使用AT模式
Seata官方文档

整体机制

两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交

    (本地数据库先保存,并向undo_log表写入日志),释放本地锁和连接资源。

  • 二阶段:

    • 提交异步化,非常快速地完成。
    • 回滚通过一阶段的回滚日志进行反向补偿(回滚时-读取undo_log表回滚回初始状态)

多数据源遇到的问题

在Seata1.3.0版本中,数据源自动代理和手动代理一定不能混合使用,否则会导致多层代理,从而导致以下问题:

单数据源情况下:导致分支事务提交时,undo_log本身也被代理,即为 undo_log 生成了 undo_log, 假设为undo_log2,此时undo_log将被当作分支事务来处理;分支事务回滚时,因为undo_log2生成的有问题,在undo_log对应的事务分支回滚时会将业务表关联的undo_log也一起删除,从而导致业务表对应的事务分支回滚时发现undo_log不存在,从而又多生成一条状态为1的undo_log。
多数据源和逻辑数据源被代理情况下:除了单数据源情况下会出现的问题,还可能会造成死锁问题。死锁的原因就是针对undo_log的操作,本该在一个事务中执行的selectfor update 和 delete 操作,被分散在多个事务中执行,导致一个事务在执行完select for update后一直不提交,一个事务在执行delete时一直等待锁,直到超时

解决方案

手动代理数据源

@Bean
public DataSource druidDataSource() {
    return new DruidDataSource()
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
    return new DataSourceProxy(druidDataSource);
}

Seata内部的自动代理实现

针对DataSource创建一个代理类,在代理类里面基于DataSource获取DataSourceProxy(如果没有就创建),然后调用DataSourceProxy的相关方法。核心逻辑在SeataAutoDataSourceProxyCreator类

类图

ProxyConfig (org.springframework.aop.framework)
    ProxyProcessorSupport (org.springframework.aop.framework)
        AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy)
            SeataAutoDataSourceProxyCreator (io.seata.spring.annotation.datasource)

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
    private final String[] excludes;
    private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

    public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
        this.excludes = excludes;
        setProxyTargetClass(!useJdkProxy);
    }

    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Auto proxy of [{}]", beanName);
        }
        return new Object[]{advisor};
    }

    @Override
    protected boolean shouldSkip(Class<?> beanClass, String beanName) {
        return SeataProxy.class.isAssignableFrom(beanClass) ||
                DataSourceProxy.class.isAssignableFrom(beanClass) ||
                !DataSource.class.isAssignableFrom(beanClass) ||
                Arrays.asList(excludes).contains(beanClass.getName());
    }
}

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
        Method method = invocation.getMethod();
        Object[] args = invocation.getArguments();
        Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
        if (m != null) {
            return m.invoke(dataSourceProxy, args);
        } else {
            return invocation.proceed();
        }
    }

    @Override
    public Class<?>[] getInterfaces() {
        return new Class[]{SeataProxy.class};
    }
}

数据源多层代理(案例)

    @Bean(name = "writeDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.write")
    public DataSource writeDataSource() {
        return DruidDataSourceBuilder.create().build();
    }


    @Bean(name = "readDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.read")
    public DataSource readDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean(name = "writeDataSourceProxy")
    public DataSourceProxy writeDataSourceProxy(@Qualifier("writeDataSource") DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean("readDataSourceProxy")
    public DataSourceProxy readDataSourceProxy(@Qualifier("readDataSource") DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

  1. 首先我们在配置类里面注入了两个DataSource,分别为: DruidDataSource和DataSourceProxy,
    其中DruidDataSource 作为 DataSourceProxy 的 targetDataSource属性,并且DataSourceProxy为使用了@Primary注解声明
  2. 应用默认开启了数据源自动代理,所以在调用DruidDataSource相关方法时,又会为DruidDataSource创建一个对应的数据源代理DataSourceProxy2
    当我们在程序中想获取一个Connection时会发生什么?
  • 先获取一个DataSource,因为DataSourceProxy为Primary,所以此时拿到的是DataSourceProxy
  • 基于DataSource获取一个Connection,即通过DataSourceProxy获取Connection。此时会先调用targetDataSource 即 DruidDataSource 的 getConnection 方法,但因为切面会对DruidDataSource进行拦截,根据步骤2的拦截逻辑可以知道,此时会自动创建一个DataSourceProxy2,然后调用DataSourceProxy2#getConnection,然后再调用DruidDataSource的getConnection方法。最终形成了双层代理, 返回的Connection也是一个双层的ConnectionProxy;

Seata开启动态数据源后,根据上面的代码实例,最终将生成三个代理数据源
1.读写代理数据源两个,主代理数据源
2.根据@Primary指定默认的数据源,默认返回masterSlaveProxy

Seata关于动态数据源的基本类实现

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    /**实例化新的数据源代理,参数: targetDataSource–目标数据源 */
    public DataSourceProxy(DataSource targetDataSource) {
        this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
    }
    /**实例化新的数据源代理
参数: targetDataSource–目标数据源 resourceGroupId–资源组id*/
    public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        this.targetDataSource = targetDataSource;
        init(targetDataSource, resourceGroupId);
    }
}

DataSourceProxy
初始化数据连接,获得数据连接对象

    private void init(DataSource dataSource, String resourceGroupId) {
        this.resourceGroupId = resourceGroupId;
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        DefaultResourceManager.get().registerResource(this);
        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                try (Connection connection = dataSource.getConnection()) {
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        //Set the default branch type to 'AT' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

    /**
获取普通连接。 返回值: 普通连接 抛出: SQLException–sql异常
     */
    public Connection getPlainConnection() throws SQLException {
        return targetDataSource.getConnection();
    }

Seata代理数据源类关系

BaseDataSourceResource (io.seata.rm)
    AbstractDataSourceProxyXA (io.seata.rm.datasource.xa)
        DataSourceProxyXA (io.seata.rm.datasource.xa)
        DataSourceProxyXANative (io.seata.rm.datasource.xa)
AbstractDataSourceProxy (io.seata.rm.datasource)
    DataSourceProxy (io.seata.rm.datasource)

以上是关于多数据源下Seata分布式事务出现的问题和解决方法的主要内容,如果未能解决你的问题,请参考以下文章

多数据源下Seata分布式事务出现的问题和解决方法

多数据源下Seata分布式事务出现的问题和解决方法

架构设计 | 基于Seata中间件,微服务模式下事务管理

Spring Cloud同步场景分布式事务怎样做?试试Seata

Seata分布式事务落地解决方案

Seata分布式事务落地解决方案