springboot整合动态多数据源+分布式事务(亲测可用)
Posted itliyh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot整合动态多数据源+分布式事务(亲测可用)相关的知识,希望对你有一定的参考价值。
1.导入相关的依赖
<!-- mysql驱动包 这里请使用6.0.6版本的mysql,版本高了会报错--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>6.0.6</version> </dependency> <!--atomikos分布式事务--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!--阿里数据库连接池 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version> </dependency> <!--常用工具类 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency>
2.相关的yml配置
datasource: type: com.alibaba.druid.pool.xa.DruidXADataSource driverClassName: com.mysql.cj.jdbc.Driver druid: # 主库数据源 master: url: jdbc:mysql://localhost:3306/saas_master?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false username: root password: # 从库数据源 slave: # 从数据源开关/默认关闭 open: true type: com.alibaba.druid.pool.xa.DruidXADataSource driverClassName: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/saas_slave?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false username: root password:
3利用aop原理来动态 切换数据源,相关数据源的处理网上的基本差不多,本人也是参考了很多博主得来的,至于哪些博主得就不太记得了,将就着看吧
package com.zt.common.annotation; import com.zt.common.enums.DataSourceType; import java.lang.annotation.*; /** * 自定义多数据源切换注解 * * @author lyh */ @Target({ ElementType.METHOD, ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface DataSource { /** * 切换数据源名称 */ public DataSourceType value() default DataSourceType.MASTER; }
package com.zt.common.aspect; import com.zt.common.annotation.DataSource; import com.zt.common.datasource.DynamicDataSourceContextHolder; import com.zt.common.utils.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.lang.reflect.Method; /** * 多数据源处理 * * @author lyh */ @Aspect @Order(1) @Component public class DataSourceAspect { protected Logger logger = LoggerFactory.getLogger(getClass()); @Pointcut("@annotation(com.zt.common.annotation.DataSource)" + "|| @within(com.zt.common.annotation.DataSource)") public void dsPointCut() { } @Around("dsPointCut()") public Object around(ProceedingJoinPoint point) throws Throwable { DataSource dataSource = getDataSource(point); if (StringUtils.isNotNull(dataSource)) { DynamicDataSourceContextHolder.setDataSourceType(dataSource.value().name()); } try { return point.proceed(); } finally { // 销毁数据源 在执行方法之后 DynamicDataSourceContextHolder.clearDataSourceType(); } } /** * 获取需要切换的数据源 */ public DataSource getDataSource(ProceedingJoinPoint point) { MethodSignature signature = (MethodSignature) point.getSignature(); Class<? extends Object> targetClass = point.getTarget().getClass(); DataSource targetDataSource = targetClass.getAnnotation(DataSource.class); if (StringUtils.isNotNull(targetDataSource)) { return targetDataSource; } else { Method method = signature.getMethod(); DataSource dataSource = method.getAnnotation(DataSource.class); return dataSource; } } }
package com.zt.common.datasource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; /** * 数据源切换处理 * * @author */ public class DynamicDataSourceContextHolder { public static final Logger log = LoggerFactory.getLogger(DynamicDataSourceContextHolder.class); /** * 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本, * 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。 */ private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>(); /** * 也就是所谓的数据库别名 * 管理所有的数据源id; * 主要是为了判断数据源是否存在; */ public static List<String> dataSourceIds = new ArrayList<String>(); /** * 设置数据源的变量 */ public static void setDataSourceType(String dsType) { log.info("切换到{}数据源", dsType); CONTEXT_HOLDER.set(dsType); } /** * 获得数据源的变量 */ public static String getDateSoureType() { return CONTEXT_HOLDER.get(); } /** * 清空数据源变量 */ public static void clearDataSourceType() { CONTEXT_HOLDER.remove(); } /** * 判断指定DataSrouce当前是否存在 * * @param dataSourceId * @return */ public static boolean containsDataSource(String dataSourceId){ return dataSourceIds.contains(dataSourceId); } }
package com.zt.common.datasource; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import javax.sql.DataSource; import java.util.Map; /** * 动态数据源 * * @author */ public class DynamicDataSource extends AbstractRoutingDataSource { public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) { super.setDefaultTargetDataSource(defaultTargetDataSource); super.setTargetDataSources(targetDataSources); super.afterPropertiesSet(); } /** * 主要是实现本方法: * 而此方法只需要返回一个数据库的名称即可,所以我们核心的是有一个类来管理数据源的线程池,这个类才是动态数据源的核心处理类。 * @return */ @Override protected Object determineCurrentLookupKey() { /** * DynamicDataSourceContextHolder代码中使用setDataSourceType * 设置当前的数据源,在路由类中使用getDataSourceType进行获取, * 交给AbstractRoutingDataSource进行注入使用. */ return DynamicDataSourceContextHolder.getDateSoureType(); } }
package com.zt.common.config; import com.zt.common.datasource.DynamicDataSource; import com.zt.common.enums.DataSourceType; import com.zt.common.interceptor.PrepareInterceptor; import com.zt.common.transaction.MultiDataSourceTransactionFactory; import com.zt.common.transaction.PackagesSqlSessionFactoryBean; import org.apache.ibatis.plugin.Interceptor; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.boot.autoconfigure.SpringBootVFS; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; import org.springframework.core.env.Environment; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.lang.Nullable; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * druid 配置多数据源 * * @author lyh */ @Configuration @EnableTransactionManagement //开启事务 //@MapperScan("com.zt.*.mapper") @Import({PrepareInterceptor.class}) public class DruidMutilConfig { @Autowired PrepareInterceptor prepareInterceptor; @Bean(name = "masterDataSource") public DataSource masterDataSource(Environment env) { String sourceName = "master"; Properties prop = build(env, "spring.datasource.druid.master."); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); //druid的数据库驱动换成xa的 xaDataSource.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); xaDataSource.setUniqueResourceName(sourceName); xaDataSource.setPoolSize(5); xaDataSource.setXaProperties(prop); return xaDataSource; } @Bean(name = "slaveDataSource") public DataSource slaveDataSource(Environment env) { String sourceName = "slave"; Properties prop = build(env, "spring.datasource.druid.slave."); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); //druid的数据库驱动换成xa的 xaDataSource.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); xaDataSource.setUniqueResourceName(sourceName); xaDataSource.setPoolSize(5); xaDataSource.setXaProperties(prop); return xaDataSource; } private Properties build(Environment env, String prefix) { Properties prop = new Properties(); prop.put("url", env.getProperty(prefix + "url")); prop.put("username", env.getProperty(prefix + "username")); prop.put("password", env.getProperty(prefix + "password")); prop.put("driverClassName", env.getProperty(prefix + "driverClassName", "")); //这里只设置了简单的几个属性,如果想做更多的配置可以继续往下添加即可 return prop; } /** * 动态数据源,在这继续添加 DataSource Bean */ @Bean(name = "dynamicDataSource") @Primary public DynamicDataSource dataSource(@Qualifier("masterDataSource") DataSource masterDataSource, @Nullable @Qualifier("slaveDataSource") DataSource slaveDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put(DataSourceType.MASTER.name(), masterDataSource); if (slaveDataSource != null){ targetDataSources.put(DataSourceType.SLAVE.name(), slaveDataSource); } // 还有数据源,在targetDataSources中继续添加 return new DynamicDataSource(masterDataSource, targetDataSources); } @Bean(name = "sqlSessionFactory") @Primary public SqlSessionFactory sqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dataSource) throws Exception { //参照的别人的代码说需要将会话工厂改成mybatis-plus的sql会话工厂, //经测试发现使用mybatis的会话工厂也可以运行,不会报错 // MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean(); //使用了PackagesSqlSessionFactoryBean继承SqlSessionFactoryBean,重写了配置别名的方法 PackagesSqlSessionFactoryBean bean = new PackagesSqlSessionFactoryBean(); bean.setPlugins(new Interceptor[]{prepareInterceptor}); bean.setDataSource(dataSource); //设置多数据源分布式事务 bean.setTransactionFactory(new MultiDataSourceTransactionFactory()); bean.setVfs(SpringBootVFS.class); bean.setTypeAliasesPackage("com.zt");//通配符设置包别名 bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/**/*Mapper.xml"));// 扫描指定目录的xml return bean.getObject(); } @Bean(name = "sqlSessionTemplate") @Primary public SqlSessionTemplate sqlSessionTemplate( @Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory); } }
4.事务相关的处理
package com.zt.common.transaction; import com.alibaba.druid.support.logging.Log; import com.alibaba.druid.support.logging.LogFactory; import com.zt.common.datasource.DynamicDataSourceContextHolder; import org.apache.ibatis.transaction.Transaction; import org.springframework.jdbc.CannotGetJdbcConnectionException; import org.springframework.jdbc.datasource.DataSourceUtils; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static org.apache.commons.lang3.Validate.notNull; /** * <P>多数据源切换,支持事务</P> * <P>多数据源事务管理器是:根据数据源的不同类型,动态获取数据库连接,而不是从原来的缓存中获取导致数据源没法切换</P> * @author lyh */ public class MultiDataSourceTransaction implements Transaction { private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class); private final DataSource dataSource; private Connection mainConnection; private String mainDatabaseIdentification; private ConcurrentMap<String, Connection> otherConnectionMap; private boolean isConnectionTransactional; private boolean autoCommit; public MultiDataSourceTransaction(DataSource dataSource) { notNull(dataSource, "No DataSource specified"); this.dataSource = dataSource; otherConnectionMap = new ConcurrentHashMap<>(); mainDatabaseIdentification= DynamicDataSourceContextHolder.getDateSoureType(); } /** * 开启事务处理方法 */ @Override public Connection getConnection() throws SQLException { String databaseIdentification = DynamicDataSourceContextHolder.getDateSoureType(); if (null==databaseIdentification||databaseIdentification.equals(mainDatabaseIdentification)) { if (mainConnection != null) return mainConnection; else { openMainConnection(); mainDatabaseIdentification =databaseIdentification; return mainConnection; } } else { if (!otherConnectionMap.containsKey(databaseIdentification)) { try { Connection conn = dataSource.getConnection(); otherConnectionMap.put(databaseIdentification, conn); } catch (SQLException ex) { throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex); } } return otherConnectionMap.get(databaseIdentification); } } private void openMainConnection() throws SQLException { this.mainConnection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.mainConnection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource); if (LOGGER.isDebugEnabled()) { LOGGER.debug( "JDBC Connection [" + this.mainConnection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"); } } /** * 提交处理方法 */ @Override public void commit() throws SQLException { if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]"); } this.mainConnection.commit(); for (Connection connection : otherConnectionMap.values()) { connection.commit(); } } } /** * 回滚处理方法 */ @Override public void rollback() throws SQLException { if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]"); } this.mainConnection.rollback(); for (Connection connection : otherConnectionMap.values()) { connection.rollback(); } } } /** * 关闭处理方法 */ @Override public void close() throws SQLException { DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource); for (Connection connection : otherConnectionMap.values()) { DataSourceUtils.releaseConnection(connection, this.dataSource); } } @Override public Integer getTimeout() throws SQLException { return null; } }
package com.zt.common.transaction; import org.apache.ibatis.session.TransactionIsolationLevel; import org.apache.ibatis.transaction.Transaction; import org.mybatis.spring.transaction.SpringManagedTransactionFactory; import javax.sql.DataSource; /** * <P>支持Service内多数据源切换的Factory</P> * * @author lyh */ public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory { @Override public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) { return new MultiDataSourceTransaction(dataSource); } }
package com.zt.common.transaction; import org.apache.commons.lang3.StringUtils; import org.mybatis.spring.SqlSessionFactoryBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.core.type.classreading.CachingMetadataReaderFactory; import org.springframework.core.type.classreading.MetadataReader; import org.springframework.core.type.classreading.MetadataReaderFactory; import org.springframework.util.ClassUtils; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** 配置typeAliasesPackage支持通配符包路径扫描 * 通过继承重写包路径读取方式来实现支持通配符配置,以前的SqlSessionFactoryBean * 不支持通配符设置包别名,所以重写该方法 * Create by lyh */ public class PackagesSqlSessionFactoryBean extends SqlSessionFactoryBean { private static final Logger logger = LoggerFactory.getLogger(PackagesSqlSessionFactoryBean.class); static final String DEFAULT_RESOURCE_PATTERN = "**/*.class"; @Override public void setTypeAliasesPackage(String typeAliasesPackage) { ResourcePatternResolver resolver = (ResourcePatternResolver) new PathMatchingResourcePatternResolver(); MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resolver); typeAliasesPackage = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + ClassUtils.convertClassNameToResourcePath(typeAliasesPackage) + "/" + DEFAULT_RESOURCE_PATTERN; //将加载多个绝对匹配的所有Resource //将首先通过ClassLoader.getResource("META-INF")加载非模式路径部分 //然后进行遍历模式匹配 try { List<String> result = new ArrayList<String>(); Resource[] resources = resolver.getResources(typeAliasesPackage); if(resources != null && resources.length > 0){ MetadataReader metadataReader = null; for(Resource resource : resources){ if(resource.isReadable()){ metadataReader = metadataReaderFactory.getMetadataReader(resource); try { result.add(Class.forName(metadataReader.getClassMetadata().getClassName()).getPackage().getName()); } catch (ClassNotFoundException e) { e.printStackTrace(); } } } } if(result.size() > 0) { super.setTypeAliasesPackage(StringUtils.join(result.toArray(), ",")); }else{ logger.warn("参数typeAliasesPackage:"+typeAliasesPackage+",未找到任何包"); } } catch (IOException e) { e.printStackTrace(); } } }
package com.zt.common.transaction; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.jta.JtaTransactionManager; import javax.transaction.TransactionManager; import javax.transaction.UserTransaction; /** 分布式事务管理器 * 多数据源操作发生异常时,让多数据源的事务进行同步回滚 * Create by lyh */ @Configuration public class XATransactionManagerConfig { @Bean(name = "userTransaction") public UserTransaction userTransaction() throws Throwable { UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(10000); return userTransactionImp; } @Bean(name = "atomikosTransactionManager") public TransactionManager atomikosTransactionManager() throws Throwable { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; } @Bean(name = "transactionManager") @DependsOn({ "userTransaction", "atomikosTransactionManager" }) public PlatformTransactionManager transactionManager() throws Throwable { return new JtaTransactionManager(userTransaction(),atomikosTransactionManager()); } }
package com.zt.common.transaction;
import com.alibaba.druid.support.logging.Log;
import com.alibaba.druid.support.logging.LogFactory;
import com.zt.common.datasource.DynamicDataSourceContextHolder;
import org.apache.ibatis.transaction.Transaction;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.datasource.DataSourceUtils;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.apache.commons.lang3.Validate.notNull;
/**
* <P>多数据源切换,支持事务</P>
* <P>多数据源事务管理器是:根据数据源的不同类型,动态获取数据库连接,而不是从原来的缓存中获取导致数据源没法切换</P>
* @author lyh
*/
public class MultiDataSourceTransaction implements Transaction {
private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class);
private final DataSource dataSource;
private Connection mainConnection;
private String mainDatabaseIdentification;
private ConcurrentMap<String, Connection> otherConnectionMap;
private boolean isConnectionTransactional;
private boolean autoCommit;
public MultiDataSourceTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
otherConnectionMap = new ConcurrentHashMap<>();
mainDatabaseIdentification= DynamicDataSourceContextHolder.getDateSoureType();
}
/**
* 开启事务处理方法
*/
@Override
public Connection getConnection() throws SQLException {
String databaseIdentification = DynamicDataSourceContextHolder.getDateSoureType();
if (null==databaseIdentification||databaseIdentification.equals(mainDatabaseIdentification)) {
if (mainConnection != null) return mainConnection;
else {
openMainConnection();
mainDatabaseIdentification =databaseIdentification;
return mainConnection;
}
} else {
if (!otherConnectionMap.containsKey(databaseIdentification)) {
try {
Connection conn = dataSource.getConnection();
otherConnectionMap.put(databaseIdentification, conn);
} catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
}
}
return otherConnectionMap.get(databaseIdentification);
}
}
private void openMainConnection() throws SQLException {
this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.mainConnection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"JDBC Connection ["
+ this.mainConnection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}
/**
* 提交处理方法
*/
@Override
public void commit() throws SQLException {
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.commit();
for (Connection connection : otherConnectionMap.values()) {
connection.commit();
}
}
}
/**
* 回滚处理方法
*/
@Override
public void rollback() throws SQLException {
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.rollback();
for (Connection connection : otherConnectionMap.values()) {
connection.rollback();
}
}
}
/**
* 关闭处理方法
*/
@Override
public void close() throws SQLException {
DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
for (Connection connection : otherConnectionMap.values()) {
DataSourceUtils.releaseConnection(connection, this.dataSource);
}
}
@Override
public Integer getTimeout() throws SQLException {
return null;
}
}
以上是关于springboot整合动态多数据源+分布式事务(亲测可用)的主要内容,如果未能解决你的问题,请参考以下文章