微服务 - 分布式事务
Posted cjunn
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务 - 分布式事务相关的知识,希望对你有一定的参考价值。
分布式事务是什么
事务是什么
是数据库操作的最小工作单元,是作为单个逻辑工作单元执行的一系列操作;这些操作作为一个整体一起向系统提交,要么都执行、要么都不执行;事务是一组不可再分割的操作集合,简而言之就是当事务正常执行那么该组操作全部执行,如果事务未正常执行则该组操作全部回退到初始状态。
分布式是什么
将复杂的业务拆分个多个子业务分别部署在不同机器上,多个子系统相互协作完成业务系统的流程工程,该整体的系统则称为分布式系统。
分布式事务
根据以上两点可以知道分布式事务即是在一组子系统协作操作这些操作则做为一个整体,要么都执行,要么全都不执行。
举个例子:假设生成订单分别由机器A上的库存服务和机器B上的费用服务以及机器C上的订单服务共同协调完成,当费用服务出现异常时那么库存服务和费用服务及订单服务对数据的操作需要还原到先前的状态。
分布式事务一致性
为了解决这种分布式事务一致性问题,提出了很多典型的协议和算法,比较著名的是二阶段提交协议,三阶段提交协议。
在分布式系统中,各个节点之间在物理上相互独立,通过网络进行沟通和协调。由于存在事务机制,可以保证每个独立节点上的数据操作可以满足ACID。但是,相互独立的节点之间无法准确地知道其他节点的事务执行情况。所以从理论上来讲,两台机器无法达到一致的状态。如果想让分布式部署的多台机器中的数据保持一致性,那么就要保证在所有节点数据的写操作,要么全部都执行,要么全部都不执行。但是,一台机器在执行本地事务的时候无法知道其他机器中的本地事务的执行结果,所以它也就不知道本次事务到底应该commit还是rollback。所以,常规的解决办法就是引入一个"协调者"的组件来统一调度所有分布式节点的执行。
2PC(Two-phaseCommit)
二阶段提交
二阶段提交的算法思路可以概括为: 参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。
二阶段是指: 第一阶段 - 请求阶段(表决阶段) 第二阶段 - 提交阶段(执行阶段)
(1) 请求阶段(表决):
事务协调者通知每个参与者准备提交或取消事务,然后进入表决过程,参与者要么在本地执行事务,写本地的redo和undo日志,但不提交,到达一种"万事俱备,只欠东风"的状态。请求阶段,参与者将告知协调者自己的决策: 同意(事务参与者本地作业执行成功)或取消(本地作业执行故障)
(2) 提交阶段(执行):
在该阶段,写调整将基于第一个阶段的投票结果进行决策: 提交或取消
当且仅当所有的参与者同意提交事务,协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者取消事务
参与者在接收到协调者发来的消息后将执行响应的操作。
两阶段提交的缺点
- 同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。
当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。 - 单点故障。由于协调者的重要性,一旦协调者发生故障。
参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题) - 数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。
而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。
两阶段提交无法解决的问题
当协调者出错,同时参与者也出错时,两阶段无法保证事务执行的完整性。
考虑协调者在发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。
那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。
XA是啥?
XA是由X/Open组织提出的分布式事务的架构(或者叫协议)。XA架构主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。也就是说,在基于XA的一个事务中,我们可以针对多个资源进行事务管理,例如一个系统访问多个数据库,或即访问数据库、又访问像消息中间件这样的资源。这样我们就能够实现在多个数据库和消息中间件直接实现全部提交、或全部取消的事务。XA规范不是java的规范,而是一种通用的规范,目前各种数据库、以及很多消息中间件都支持XA规范。
JTA是满足XA规范的、用于Java开发的规范。所以,当我们说,使用JTA实现分布式事务的时候,其实就是说,使用JTA规范,实现系统内多个数据库、消息中间件等资源的事务。
JTA(Java Transaction API),是J2EE的编程接口规范,它是XA协议的JAVA实现。它主要定义了:
- 一个事务管理器的接口javax.transaction.TransactionManager,定义了有关事务的开始、提交、撤回等>操作。
- 一个满足XA规范的资源定义接口javax.transaction.xa.XAResource,一种资源如果要支持JTA事务,就需要让它的资源实现该XAResource接口,并实现该接口定义的两阶段提交相关的接口。如果我们有一个应用,它使用JTA接口实现事务,应用在运行的时候,就需要一个实现JTA的容器,一般情况下,这是一个J2EE容器,像JBoss,Websphere等应用服务器。但是,也有一些独立的框架实现了JTA,例如 Atomikos, bitronix 都提供了jar包方式的JTA实现框架。这样我们就能够在Tomcat或者Jetty之类的服务器上运行使用JTA实现事务的应用系统。在上面的本地事务和外部事务的区别中说到,JTA事务是外部事务,可以用来实现对多个资源的事务性。它正是通过每个资源实现的XAResource来进行两阶段提交的控制。感兴趣的同学可以看看这个接口的方法,除了commit, rollback等方法以外,还有end(), forget(), isSameRM(), prepare()等等。光从这些接口就能够想象JTA在实现两阶段事务的复杂性。
JTA与JDBC:
JTA事务比JDBC事务更强大。一个JTA事务可以有多个参与者,而一个JDBC事务则被限定在一个单一的数据库连接。下列任一个Java平台的组件都可以参与到一个JTA事务中:JDBC连接、JDO PersistenceManager 对象、JMS 队列、JMS 主题、企业JavaBeans(EJB)、一个用J2EE Connector Architecture 规范编译的资源分配器。
atomikos
Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器。
底层实现了:
- xa_open,xa_close:建立和关闭与资源管理器的连接。
- xa_start,xa_end:开始和结束一个本地事务。
- xa_prepare,xa_commit,xa_rollback:预提交、提交和回滚一个本地事务。
- xa_recover:回滚一个已进行预提交的事务。
- ax_开头的函数使资源管理器可以动态地在事务管理器中进行注册,并可以对XID(TRANSACTION IDS)进行操作。
- ax_reg,ax_unreg;允许一个资源管理器在一个TMS(TRANSACTION MANAGER SERVER)中动态注册或撤消注册。
关键在于第三点,只有第一阶段所有事务xa_prepare成功后,第二阶段才允许所有事务进行xa_commit,否则则将进行xa_rollback,
atomikos+druid实战
maven.pom
<dependencies>
<!-- spring boot begin -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
<exclusions>
<!-- 排除spring boot默认使用的tomcat,使用jetty -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- spring boot end -->
<!-- alibaba database pool begin -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<!-- alibaba database pool end -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.42</version>
</dependency>
</dependencies>
spring boot yml文件
#日志配置,此处使用默认日志
#logging:
#config: classpath:log4j2.yml
# spring
spring:
profiles:
active:
- test
datasource:
type: com.alibaba.druid.pool.xa.DruidXADataSource
druid:
# WebStatFilter配置,说明请参考Druid Wiki,配置_配置WebStatFilter
web-stat-filter:
enabled: true
urlPattern:
exclusions:
sessionStatMaxCount:
sessionStatEnable:
principalSessionName:
principalCookieName:
profileEnable:
# StatViewServlet配置,说明请参考Druid Wiki,配置_StatViewServlet配置
stat-view-servlet:
enabled: true
urlPattern:
resetEnable: false
loginUsername: admin
loginPassword: 123456
allow: 127.0.0.1
deny:
# Spring监控配置,说明请参考Druid Github Wiki,配置_Druid和Spring关联监控配置
# Spring监控AOP切入点,如x.y.z.service.*,配置多个英文逗号分隔
aop-patterns:
- com.chint.springboot.atomikos.*.service.**
systemDB:
name: systemDB
url: jdbc:mysql://localhost:3306/test_sys
username: root
password: root
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initialSize: 5
minIdle: 5
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 30
validationQuery: SELECT 1
validationQueryTimeout: 10000
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
useGlobalDataSourceStat: true
businessDB:
name: businessDB
url: jdbc:mysql://localhost:3306/test_bus
username: root
password: root
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initialSize: 5
minIdle: 5
maxActive: 20
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 30
validationQuery: SELECT 1
validationQueryTimeout: 10000
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
useGlobalDataSourceStat: true
#jta相关参数配置
jta:
log-dir: classpath:tx-logs
transaction-manager-id: txManager
Data Source Config
package com.zzmx.springboot.atomikos.config;
import java.util.Properties;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.JdbcTemplate;
@Configuration
public class DataSourceConfig {
@Bean(name = "systemDataSource")
@Primary
@Autowired
public DataSource systemDataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.systemDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("systemDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;
}
@Autowired
@Bean(name = "businessDataSource")
public AtomikosDataSourceBean businessDataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build(env, "spring.datasource.druid.businessDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("businessDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;
}
@Bean("sysJdbcTemplate")
public JdbcTemplate sysJdbcTemplate(@Qualifier("systemDataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
@Bean("busJdbcTemplate")
public JdbcTemplate busJdbcTemplate(@Qualifier("businessDataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
private Properties build(Environment env, String prefix) {
Properties prop = new Properties();
prop.put("url", env.getProperty(prefix + "url"));
prop.put("username", env.getProperty(prefix + "username"));
prop.put("password", env.getProperty(prefix + "password"));
prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));
prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));
prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));
prop.put("maxPoolPreparedStatementPerConnectionSize",
env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
prop.put("maxPoolPreparedStatementPerConnectionSize",
env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
prop.put("timeBetweenEvictionRunsMillis",
env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));
prop.put("filters", env.getProperty(prefix + "filters"));
return prop;
}
}
Transaction Manager Config
package com.zzmx.springboot.atomikos.config;
import java.util.Properties;
import javax.transaction.UserTransaction;
import org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.transaction.jta.JtaTransactionManager;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
@Configuration
@ComponentScan
@EnableTransactionManagement
public class TransactionManagerConfig {
@Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(10000);
return userTransactionImp;
}
@Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public UserTransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
return userTransactionManager;
}
@Bean(name = "transactionManager")
@DependsOn({ "userTransaction", "atomikosTransactionManager" })
public JtaTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = userTransaction();
JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());
return manager;
}
@Bean(name = "transactionInterceptor")
public TransactionInterceptor transactionInterceptor(PlatformTransactionManager platformTransactionManager) {
TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
// 事物管理器
transactionInterceptor.setTransactionManager(platformTransactionManager);
Properties transactionAttributes = new Properties();
// test
transactionAttributes.setProperty("test*", "PROPAGATION_REQUIRED,-Throwable");
// 新增
transactionAttributes.setProperty("insert*", "PROPAGATION_REQUIRED,-Throwable");
// 修改
transactionAttributes.setProperty("update*", "PROPAGATION_REQUIRED,-Throwable");
// 删除
transactionAttributes.setProperty("delete*", "PROPAGATION_REQUIRED,-Throwable");
// 查询
transactionAttributes.setProperty("select*", "PROPAGATION_REQUIRED,-Throwable,readOnly");
transactionInterceptor.setTransactionAttributes(transactionAttributes);
return transactionInterceptor;
}
// 代理到ServiceImpl的Bean
@Bean
public BeanNameAutoProxyCreator transactionAutoProxy() {
BeanNameAutoProxyCreator transactionAutoProxy = new BeanNameAutoProxyCreator();
transactionAutoProxy.setProxyTargetClass(true);
transactionAutoProxy.setBeanNames("*ServiceImpl");
transactionAutoProxy.setInterceptorNames("transactionInterceptor");
return transactionAutoProxy;
}
}
Druid Monitor Config
package com.zzmx.springboot.atomikos.config;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
@Configuration
public class DruidMonitorConfig {
@Bean
public ServletRegistrationBean druidServlet() {
ServletRegistrationBean reg = new ServletRegistrationBean();
reg.setServlet(new StatViewServlet());
reg.addUrlMappings("/druid/*");
reg.addInitParameter("loginUsername", "admin");
reg.addInitParameter("loginPassword", "123456");
return reg;
}
@Bean
public FilterRegistrationBean filterRegistrationBean() {
FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
filterRegistrationBean.setFilter(new WebStatFilter());
filterRegistrationBean.addUrlPatterns("/*");
filterRegistrationBean.addInitParameter("exclusions",
"*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
filterRegistrationBean.addInitParameter("profileEnable", "true");
filterRegistrationBean.addInitParameter("principalCookieName", "USER_COOKIE");
filterRegistrationBean.addInitParameter("principalSessionName", "USER_SESSION");
return filterRegistrationBean;
}
}
测试运行
如下结果会自动回滚
@Transactional
@RequestMapping("")
public void test() {
System.out.println("begin.....");
sysJdbcTemplate.execute("insert into sys_a(id) values(1)");
int i=10/0;
busJdbcTemplate.execute("insert into bus_b(id) values(2)");
System.out.println("end.....");
}
以上是关于微服务 - 分布式事务的主要内容,如果未能解决你的问题,请参考以下文章