多数据源下Seata分布式事务出现的问题和解决方法
Posted JF Coder
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多数据源下Seata分布式事务出现的问题和解决方法相关的知识,希望对你有一定的参考价值。
什么是Seata
推荐使用AT模式
Seata官方文档
整体机制
两阶段提交协议的演变:
-
一阶段:业务数据和回滚日志记录在同一个本地事务中提交
(本地数据库先保存,并向undo_log表写入日志),释放本地锁和连接资源。
-
二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿(回滚时-读取undo_log表回滚回初始状态)
多数据源遇到的问题
在Seata1.3.0版本中,数据源自动代理和手动代理一定不能混合使用,否则会导致多层代理,从而导致以下问题:
单数据源情况下:导致分支事务提交时,undo_log本身也被代理,即为 undo_log 生成了 undo_log, 假设为undo_log2,此时undo_log将被当作分支事务来处理;分支事务回滚时,因为undo_log2生成的有问题,在undo_log对应的事务分支回滚时会将业务表关联的undo_log也一起删除,从而导致业务表对应的事务分支回滚时发现undo_log不存在,从而又多生成一条状态为1的undo_log。
多数据源和逻辑数据源被代理情况下:除了单数据源情况下会出现的问题,还可能会造成死锁问题。死锁的原因就是针对undo_log的操作,本该在一个事务中执行的selectfor update 和 delete 操作,被分散在多个事务中执行,导致一个事务在执行完select for update后一直不提交,一个事务在执行delete时一直等待锁,直到超时
解决方案
手动代理数据源
@Bean
public DataSource druidDataSource() {
return new DruidDataSource()
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
Seata内部的自动代理实现
针对DataSource创建一个代理类,在代理类里面基于DataSource获取DataSourceProxy(如果没有就创建),然后调用DataSourceProxy的相关方法。核心逻辑在SeataAutoDataSourceProxyCreator类
类图
ProxyConfig (org.springframework.aop.framework)
ProxyProcessorSupport (org.springframework.aop.framework)
AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy)
SeataAutoDataSourceProxyCreator (io.seata.spring.annotation.datasource)
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final String[] excludes;
private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());
public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
this.excludes = excludes;
setProxyTargetClass(!useJdkProxy);
}
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [{}]", beanName);
}
return new Object[]{advisor};
}
@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return SeataProxy.class.isAssignableFrom(beanClass) ||
DataSourceProxy.class.isAssignableFrom(beanClass) ||
!DataSource.class.isAssignableFrom(beanClass) ||
Arrays.asList(excludes).contains(beanClass.getName());
}
}
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
Method method = invocation.getMethod();
Object[] args = invocation.getArguments();
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (m != null) {
return m.invoke(dataSourceProxy, args);
} else {
return invocation.proceed();
}
}
@Override
public Class<?>[] getInterfaces() {
return new Class[]{SeataProxy.class};
}
}
数据源多层代理(案例)
@Bean(name = "writeDataSource")
@ConfigurationProperties(prefix = "spring.datasource.write")
public DataSource writeDataSource() {
return DruidDataSourceBuilder.create().build();
}
@Bean(name = "readDataSource")
@ConfigurationProperties(prefix = "spring.datasource.read")
public DataSource readDataSource() {
return DruidDataSourceBuilder.create().build();
}
@Bean(name = "writeDataSourceProxy")
public DataSourceProxy writeDataSourceProxy(@Qualifier("writeDataSource") DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean("readDataSourceProxy")
public DataSourceProxy readDataSourceProxy(@Qualifier("readDataSource") DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
- 首先我们在配置类里面注入了两个DataSource,分别为: DruidDataSource和DataSourceProxy,
其中DruidDataSource 作为 DataSourceProxy 的 targetDataSource属性,并且DataSourceProxy为使用了@Primary注解声明- 应用默认开启了数据源自动代理,所以在调用DruidDataSource相关方法时,又会为DruidDataSource创建一个对应的数据源代理DataSourceProxy2
当我们在程序中想获取一个Connection时会发生什么?
- 先获取一个DataSource,因为DataSourceProxy为Primary,所以此时拿到的是DataSourceProxy
- 基于DataSource获取一个Connection,即通过DataSourceProxy获取Connection。此时会先调用targetDataSource 即 DruidDataSource 的 getConnection 方法,但因为切面会对DruidDataSource进行拦截,根据步骤2的拦截逻辑可以知道,此时会自动创建一个DataSourceProxy2,然后调用DataSourceProxy2#getConnection,然后再调用DruidDataSource的getConnection方法。最终形成了双层代理, 返回的Connection也是一个双层的ConnectionProxy;
Seata开启动态数据源后,根据上面的代码实例,最终将生成三个代理数据源
1.读写代理数据源两个,主代理数据源
2.根据@Primary指定默认的数据源,默认返回masterSlaveProxy
Seata关于动态数据源的基本类实现
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
/**实例化新的数据源代理,参数: targetDataSource–目标数据源 */
public DataSourceProxy(DataSource targetDataSource) {
this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}
/**实例化新的数据源代理
参数: targetDataSource–目标数据源 resourceGroupId–资源组id*/
public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
if (targetDataSource instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
}
this.targetDataSource = targetDataSource;
init(targetDataSource, resourceGroupId);
}
}
DataSourceProxy
初始化数据连接,获得数据连接对象
private void init(DataSource dataSource, String resourceGroupId) {
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
tableMetaExcutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
//Set the default branch type to 'AT' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
/**
获取普通连接。 返回值: 普通连接 抛出: SQLException–sql异常
*/
public Connection getPlainConnection() throws SQLException {
return targetDataSource.getConnection();
}
Seata代理数据源类关系
BaseDataSourceResource (io.seata.rm)
AbstractDataSourceProxyXA (io.seata.rm.datasource.xa)
DataSourceProxyXA (io.seata.rm.datasource.xa)
DataSourceProxyXANative (io.seata.rm.datasource.xa)
AbstractDataSourceProxy (io.seata.rm.datasource)
DataSourceProxy (io.seata.rm.datasource)
以上是关于多数据源下Seata分布式事务出现的问题和解决方法的主要内容,如果未能解决你的问题,请参考以下文章