分布式事务压缩储备粮

Posted 魏凡缤Java技术文章

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务压缩储备粮相关的知识,希望对你有一定的参考价值。

        开篇先讲了这么个笑话,想阐述的就是,能直接整,最好直接整,说实在的,奉劝各位,分布式事务,能不用就别用,能够通过业务设计化解掉分布式事务的场景,最好化解,这玩意实在没辙才上的,不像Spring上来就要用。讲这些无非就是为了给应付面试的同学梳理下。

        说到分布式事务,首先要知道什么是事务,简单回顾一下事务的四性

        你给我的微信转账100元来举例,如果有张表记录每个用户的存款,是不是你的微信账户需要-100,我的微信账户需要加100,看成两条SQL的update修改你的和修改我的信息即可

        原子性(Atomicity):修改你的和修改我的必须都成功或都不成功,不能减了你的,然后出错没加我的

        一致性(Consistency):你减了100,我必须加100,加了50就不叫一致!!!

        隔离性(Isolation):万一有俩同学都给我转100,你转你的,他转他的不能相互影响

        持久性(Durability):转给我的钱,就写到数据库持久化了,想要回去。得发照片

先阐述问题:

        提到事务对于后端同学来说,如果稍微了解一下,至少能说出来,失败就回滚嘛~,但是在分布式系统里两个微服务,每个微服务再操作对应的数据库,我问你,怎么回滚?怎么滚 !!!一个微服务操作成功了,一个微服务操作失败了!怎么滚?!!回滚!!!,你不可能让一个commit提交的事务回滚吧!!!

有几个不好记住的理论

分布式事务压缩储备粮


CAP理论,C(一致性),A(高可用),P(容错性)。大家记住一句话,必须保证P,分布式集群必须保证容错,那么不是CP,就只能是AP

注册中心来举例:

Zookeeper注重CP,但是效率不如AP

Eureka注重AP,效率高,AP放弃强一致,延伸出BASE理论                                    BASE理论:Basically Available(基本可用)Soft state(软状态) Eventually consistent(最终一致性),将BASE理论用于解决分布式事务,也有人称其为柔性事务


一、TCC的两段式提交  (TRY-CONMIT 或者 TRY-CANCEL)

分布式事务压缩储备粮

        你看这名字顾名思义,尝试提交,或尝试取消

        其中记住这句话XA是一套规范,JTA是Java语言的XA规范下的接口,而Atomikos实现了JTA这个接口

举个简单的例子如果你用100元买了一瓶水, Try阶段:你需要向你的钱包检查是否够100元并锁住这100元,水也是一样的。

如果有一个失败,则进行cancel(释放这100元和这一瓶水),如果cancel失败不论什么失败都进行重试cancel,所以需要保持幂等。

如果都成功,则进行confirm,确认这100元扣,和这一瓶水扣掉库存,如果confirm失败无论什么失败则重试(会依靠活动日志进行重试)

对于TCC来说适合一些:

  • 强隔离性,严格一致性要求的活动业务。

  • 执行时间较短的业务

二、通过消息中间件保证一致性

分布式事务压缩储备粮

1.事务消息与普通消息的区别就在于消息生产环节,生产者首先预发送一条消息到MQ(这也被称为发送half消息)

2.MQ接受到消息后,先进行持久化,则存储中会新增一条状态为待发送的消息

3.然后返回ACK给消息生产者,此时MQ不会触发消息推送事件

4.生产者预发送消息成功后,执行本地事务

5.执行本地事务,执行完成后,发送执行结果给MQ

6.MQ会根据结果删除或者更新消息状态为可发送

7.如果消息状态更新为可发送,则MQ会push消息给消费者,后面消息的消费和普通消息是一样的

8.消费者确认消费消息,如果出现问题,可以重复消费直至业务成功


    还拿上面例子来说,核心是把大事务转变为小事务。还是举上面用100元去买一瓶水的例子。

1.当你扣钱的时候,你需要在你扣钱的服务器上新增加一个本地消息表,你需要把你扣钱和写入减去水的库存到本地消息表放入同一个事务。

2.这个时候有个定时任务去轮询这个本地事务表,把没有发送的消息,扔给商品库存服务器,叫他减去水的库存,到达商品服务器之后这个时候得先写入这个服务器的事务表,然后进行扣减,扣减成功后,更新事务表中的状态。

3.商品服务器通过定时任务扫描消息表或者直接通知扣钱服务器,扣钱服务器本地消息表进行状态更新。

4.针对一些异常情况,定时扫描未成功处理的消息,进行重新发送,在商品服务器接到消息之后,首先判断是否是重复的,如果已经接收,在判断是否执行,如果执行在马上又进行通知事务,如果未执行,需要重新执行需要由业务保证幂等,也就是不会多扣一瓶水。

       本地消息队列是BASE理论,是最终一致模型,适用于对一致性要求不高的。实现这个模型时需要注意重试的幂等。

==============上面是概念,下面是方案=============

分布式事务,咱们主要介绍三种解决方案

1.TCC两段式提交的Atomikos框架适合多数据源的传统应用

例子还用之前的SSM整合,弄个修改分数,需要给另外的数据库tests2中t_money加钱,也就是说,给学生加多少分,就需要给t_money中该用户加多少钱

1.先将test2的SQL贴出来

//挺简单的一张表就三个字段,id自增,name是t_score的name,money是本次分数对应的钱数
DROP TABLE IF EXISTS `t_money`;CREATE TABLE `t_money` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL, `money` int(255) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

2.直接改造前前篇的SSM整合案例即可,在之前pom中增加

2.1首先pom坐标

 <!--JTA atomikos--> <dependency> <groupId>javax.transaction</groupId> <artifactId>jta</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>com.atomikos</groupId> <artifactId>atomikos-util</artifactId> <version>4.0.6</version> </dependency> <dependency> <groupId>com.atomikos</groupId> <artifactId>transactions</artifactId> <version>4.0.6</version> </dependency> <dependency> <groupId>com.atomikos</groupId> <artifactId>transactions-jta</artifactId> <version>4.0.6</version> </dependency> <dependency> <groupId>com.atomikos</groupId> <artifactId>transactions-jdbc</artifactId> <version>4.0.6</version> </dependency> <dependency> <groupId>com.atomikos</groupId> <artifactId>transactions-api</artifactId> <version>4.0.6</version> </dependency>

        2.2其次是修改后的applicationContext.xml,事务都配置在这个文件中

        jdbc.properties 演示用的两个数据源 tests和tests2两个数据库

jdbc.url=jdbc:mysql://127.0.0.1:3306/tests?useUnicode=true&characterEncoding=utf8&autoReconnect=true
jdbc.log.url=jdbc:mysql://127.0.0.1:3306/tests2?useUnicode=true&characterEncoding=utf8&autoReconnect=true


<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

<!--加载配置文件--> <context:property-placeholder location="classpath:/jdbc.properties" ignore-unresolvable="true"/>
<!--1.注解扫描: 只扫描service包。--> <context:component-scan base-package="cn.wfb.service"/>
<!--2. 创建连接池--> <!--数据源基础配置,这里数据源管理交给AtomikosDataSourceBean--> <bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close" abstract="true"> <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/> <property name="poolSize" value="10"/> <property name="minPoolSize" value="10"/> <property name="maxPoolSize" value="30"/> <property name="borrowConnectionTimeout" value="60"/> <property name="reapTimeout" value="20"/> <property name="maxIdleTime" value="60"/> <property name="maintenanceInterval" value="60"/> <!--这些配置很多自己可以百度查一下,这个是连接测试用--> <property name="testQuery"> <value>SELECT 1</value> </property> </bean>
<!--这里我们配置两个数据源 --> <!-- tests数据库 --> <bean id="testsSource" parent="abstractXADataSource"> <property name="uniqueResourceName"> <value>testsSource</value> </property> <!--数据库驱动--> <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/> <property name="xaProperties"> <props> <prop key="URL">${jdbc.url}</prop> <prop key="user">root</prop> <prop key="password">root</prop> </props> </property> </bean>
<!-- tests2数据库 --> <bean id="tests2Source" parent="abstractXADataSource"> <property name="uniqueResourceName"> <value>tests2Source</value> </property> <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/> <property name="xaProperties"> <props> <prop key="URL">${jdbc.log.url}</prop> <prop key="user">root</prop> <prop key="password">root</prop> </props> </property> </bean>
<!--分数的SessionFactoryBean--> <bean id="sqlSessionFactoryBeanScore" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="typeAliasesPackage" value="cn.wfb.entity" /> <property name="dataSource" ref="testsSource"/> </bean>
<!--钱的SessionFactoryBean--> <bean id="sqlSessionFactoryBeanMoney" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="typeAliasesPackage" value="cn.wfb.entity" /> <property name="dataSource" ref="tests2Source"/> </bean>
<!--4. 扫描dao的接口所在包,自动对该包下所有的dao接口自动生成代理对象--> <!--包扫描,两个库,entity可以放一个包下,接口层的包最好分开,注入不同的SessionFactoryBean--> <bean id="mapperScannerConfigurerScore" class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="cn.wfb.dao.score" /> <property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryBeanScore" /> </bean>
<bean id="mapperScannerConfigurerMoney" class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="cn.wfb.dao.money" /> <property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryBeanMoney" /> </bean>

<!--5. 事务管理器 --> <!--JTA事务管理器--> <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="transactionManager"> <ref bean="atomikosTransactionManager"/> </property> <property name="userTransaction"> <ref bean="atomikosLocalTransaction"/> </property> <property name="allowCustomIsolationLevels" value="true"/> </bean>
<!--配置事务管理器atomikos事务管理器--> <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close"> <property name="forceShutdown" value="false"/> </bean> <!-- 本地事务管理器--> <bean id="atomikosLocalTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> <property name="transactionTimeout" value="300000"/> </bean>
<!--6. 开启事务控制的注解支持 --> <tx:annotation-driven transaction-manager="springTransactionManager" proxy-target-class="true"/></beans>


ScoreServiceImpl中增加修改方法,Controller的就不写了,这个应该会了。不会的来问我吧

 //直接加上注解,说明用的是springTransactionManager JTA事务管理器,可以看到抛了异常,两个不同的数据源回滚了 
@Override @Transactional(value = "springTransactionManager",rollbackFor = Exception.class) public void update(Integer id, Integer score) {
TScore tScore = scoreMapper.selectByPrimaryKey(id);
//花钱买分的话,先修改分数 tScore.setFraction(tScore.getFraction() + score); scoreMapper.updateByPrimaryKeySelective(tScore);
//再修改钱,每修改一次,都新增一次作为记录 TMoney tMoney = new TMoney(); tMoney.setName(tScore.getName()); tMoney.setMoney(score); moneyMapper.insert(tMoney);
int i = 1/0; //抛异常测试多数据源能否回滚 }


总结说一下的话,其实就是看Spring的配置文件,两份SqlSessionFactoryBean中配置两个数据源,然后通过JtaTransactionManager统一管,JtaTransactionManager管理着atomikosTransactionManager和atomikosLocalTransaction

分布式事务压缩储备粮


2.LCN分布式事务框架适合分布式系统多微服务

TX-LCN定位于一款事务协调性框架,框架其本身并不操作事务,而是基于对事务的协调从而达到事务一致性的效果。

还是继续直接整代码

分布式事务压缩储备粮


四个微服务,service_tm(管理事务微服务),service_eureka (注册中心),service_a(事务发起方)和service_b(参与方b)两个服务,演示a远程调用b,都向t_score表新增一条数据

父工程的pom文件

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>cn.wfb</groupId> <artifactId>lcn_demo</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>service_a</module> <module>service_b</module> <module>service_tm</module> <module>service_eureka</module> </modules>
<properties> <!-- project -setting --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version>
<codingapi.txlcn.version>5.0.2.RELEASE</codingapi.txlcn.version> <springcloud.version>Greenwich.RELEASE</springcloud.version> <lombok.version>1.18.0</lombok.version> </properties>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.2.RELEASE</version> <relativePath/> </parent>
<dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
<dependencyManagement> <dependencies> <!--springCloud坐标--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${springcloud.version}</version> <type>pom</type> <scope>import</scope> </dependency>

<!--以下三个坐标关于tx-lcn--> <dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-tc</artifactId> <version>${codingapi.txlcn.version}</version> </dependency>
<dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-tm</artifactId> <version>${codingapi.txlcn.version}</version> </dependency>
<dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-txmsg-netty</artifactId> <version>${codingapi.txlcn.version}</version> </dependency>
</dependencies> </dependencyManagement>
<build> <finalName>app</finalName> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project>

一、搭建service_tm事务管理微服务

pom坐标

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lcn_demo</artifactId> <groupId>cn.wfb</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>service_tm</artifactId>
<dependencies> <!--事务管理微服务坐标--> <dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-tm</artifactId> </dependency> </dependencies></project>

application.properties

################### 这个是启动本服务的配置文件,其它的application-xxx.properties 是开发者的个性化配置,不用关心。# 你可以在 https://txlcn.org/zh-cn/docs/setting/manager.html 看到所有的个性化配置#################
#数据库相关配置,替换自己的信息 7970是后端控制服务的端口号spring.application.name=TransactionManagerserver.port=7970 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driverspring.datasource.url=jdbc:mysql://localhost:3306/tests?characterEncoding=utf-8&serverTimezone=Asia/Shanghaispring.datasource.username=rootspring.datasource.password=rootspring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialectspring.jpa.hibernate.ddl-auto=update
mybatis.configuration.map-underscore-to-camel-case=truemybatis.configuration.use-generated-keys=true
#txManager微服务需要Redis保存事务相关数据spring.redis.host=localhostspring.redis.port=6379spring.redis.password=
#访问http://localhost:7970/admin/index.html#/login 的密码可以指定tx-lcn.manager.admin-key=123456

TransactionManagerApplication入口类

package cn.wfb;
import com.codingapi.txlcn.tm.config.EnableTransactionManagerServer;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication@EnableTransactionManagerServerpublic class TransactionManagerApplication {
public static void main(String[] args) { SpringApplication.run(TransactionManagerApplication.class, args); }}

测试TxManager搭建是否ok可以通过访问其后台管理界面,这里可以看到被管理事务的微服务

TxManager系统后台:http://127.0.0.1:7970/admin/index.html#/login   密码 123456


二、service_eureka (注册中心)

pom坐标

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lcn_demo</artifactId> <groupId>cn.wfb</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>service_eureka</artifactId>
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> </dependencies></project>

application.yml

server: port: 7100spring: application: name: txlcn-demo-registryeureka: client: service-url: defaultZone: http://127.0.0.1:7100/eureka register-with-eureka: false fetch-registry: false

DemoRegistryApp入口类

package cn.wfb;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication@EnableEurekaServerpublic class DemoRegistryApp { public static void main(String[] args) { SpringApplication.run(DemoRegistryApp.class, args); }}

测试注册中心访问7100可以看到后台管理界面即可


分布式事务压缩储备粮


三、service_b(远程被调用方)

这里说明一下,entity,mapper,都用之前的SpringBoot整合文章里的素材就ok

pom文件

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lcn_demo</artifactId> <groupId>cn.wfb</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>service_b</artifactId>
<dependencies> <!--知道什么是web项目吗?--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<!--jdbc没听说过?--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency>
<!--驱动懂吗?--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency>
<!-- 直接就是plus 啥?不知道什么叫plus,iphonePlus不知道么!走走走 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.1.2</version> </dependency>

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
<dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-tc</artifactId> </dependency>
<dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-txmsg-netty</artifactId> </dependency> </dependencies></project>

application.yml

server: port: 8082 #设置端口号
spring: #数据库都设置不对,玩啥玩还? datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/tests?characterEncoding=utf-8 username: root password: root application: name: spring-service-b
## eureka 配置eureka: instance: ip-address: 127.0.0.1 instance-id: ${spring.application.name}:${server.port} prefer-ip-address: true lease-expiration-duration-in-seconds: 10 lease-renewal-interval-in-seconds: 5 client: registry-fetch-interval-seconds: 5 service-url: defaultZone: http://127.0.0.1:7100/eureka
## tx-manager 配置tx-lcn: client: manager-address: 127.0.0.1:8070 ribbon: loadbalancer: dtx: enabled: true

ServiceBApplication入口类

package cn.wfb.demo;
import com.codingapi.txlcn.tc.config.EnableDistributedTransaction;import org.mybatis.spring.annotation.MapperScan;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;
//知道什么叫入口类吗,知道包名吗,用脑子想想,下面注解里包括什么注解@SpringBootApplication@MapperScan("cn.wfb.demo.dao") //这个干嘛用的?!!想想!想想!!行吗@EnableDistributedTransactionpublic class ServiceBApplication {
public static void main(String[] args) { SpringApplication.run(ServiceBApplication.class, args); }}

ScoreController

package cn.wfb.demo.web;
import cn.wfb.demo.service.ScoreService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
//下面这四个注解看不明白?,//拆解一下三个步骤:第一打开浏览器,第二www.baidu.com,第三回车!!!@RestController@RequestMapping("/score")public class ScoreController {
@Autowired private ScoreService scoreService;
@GetMapping("/rpc") public String rpc(@RequestParam("value") String value, HttpServletRequest servletRequest) { return scoreService.rpc(value); }}

ScoreService

package cn.wfb.demo.service;
import cn.wfb.demo.dao.ScoreMapper;import cn.wfb.demo.entity.TScore;import com.codingapi.txlcn.tc.annotation.DTXPropagation;import com.codingapi.txlcn.tc.annotation.TxcTransaction;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;
@Servicepublic class ScoreService {
@Autowired private ScoreMapper scoreMapper;
//TCC分布式事务注解 @TxcTransaction(propagation = DTXPropagation.SUPPORTS) @Transactional public String rpc(String value) { TScore tScore = new TScore(); tScore.setName(value); tScore.setSubject("远程调用后赋值的"); tScore.setFraction(value); scoreMapper.insert(tScore);
//int i = 1/0; 测试远程调用serviceB出现问题,serviceA也会回滚 return "ok-service-b"; }}

四、service_a(事务的发起方微服务)

pom文件

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>lcn_demo</artifactId> <groupId>cn.wfb</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>service_a</artifactId>
<dependencies> <!--知道什么是web项目吗?--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<!--jdbc没听说过?--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency>
<!--驱动懂吗?--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency>
<!-- 直接就是plus 啥?不知道什么叫plus,iphonePlus不知道么!走走走 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.1.2</version> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
<dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-tc</artifactId> </dependency>
<dependency> <groupId>com.codingapi.txlcn</groupId> <artifactId>txlcn-txmsg-netty</artifactId> </dependency> </dependencies></project>

application.yml

server: port: 8081 #设置端口号
spring: #数据库都设置不对,玩啥玩还? datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/tests?characterEncoding=utf-8 username: root password: root application: name: spring-service-a
## eureka 配置(这里不再做详细说明百度一下)eureka: instance: ip-address: 127.0.0.1 instance-id: ${spring.application.name}:${server.port} prefer-ip-address: true lease-expiration-duration-in-seconds: 10 lease-renewal-interval-in-seconds: 5 client: registry-fetch-interval-seconds: 5 service-url: defaultZone: http://127.0.0.1:7100/eureka
## tx-manager 配置tx-lcn: client: manager-address: 127.0.0.1:8070 ## 8070是微服务和事务管理微服务沟通端口号 ribbon: loadbalancer: dtx: enabled: true

ServiceAApplication入口类

package cn.wfb.demo;
import com.codingapi.txlcn.tc.config.EnableDistributedTransaction;import org.mybatis.spring.annotation.MapperScan;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Bean;import org.springframework.web.client.RestTemplate;
//知道什么叫入口类吗,知道包名吗,用脑子想想,下面注解里包括什么注解@SpringBootApplication@MapperScan("cn.wfb.demo.dao") //这个干嘛用的?!!想想!想想!!行吗@EnableDistributedTransactionpublic class ServiceAApplication {
public static void main(String[] args) { SpringApplication.run(ServiceAApplication.class, args); }
@Bean public RestTemplate restTemplate() { return new RestTemplate(); }}

ScoreController

package cn.wfb.demo.web;
import cn.wfb.demo.entity.TScore;import cn.wfb.demo.service.ScoreService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController@RequestMapping("/score")public class ScoreController {
@Autowired private ScoreService scoreService;

@RequestMapping("/txlcn") public String sayHello(@RequestParam("value") String value, @RequestParam(value = "ex", required = false) String exFlag) { return scoreService.execute(value, exFlag); }}

ScoreService

package cn.wfb.demo.service;
import cn.wfb.demo.dao.ScoreMapper;import cn.wfb.demo.entity.TScore;import com.codingapi.txlcn.tc.annotation.LcnTransaction;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.web.client.RestTemplate;
import java.util.List;import java.util.Objects;
@Servicepublic class ScoreService {
@Autowired private ScoreMapper scoreMapper;
@Autowired private RestTemplate restTemplate;
@LcnTransaction public String execute(String value, String exFlag) {
//这里通过restTemplate发起远程调用,调用服务B的rpc接口,返回字符串 //dResp = ok-service-b,服务b返回的字符串 String dResp = restTemplate.getForObject("http://127.0.0.1:8082/score/rpc?value=" + value, String.class);
//一切正常新增到数据库 TScore tScore = new TScore(); tScore.setName(value); tScore.setSubject("测试lcn"); tScore.setFraction(value); scoreMapper.insert(tScore);
// 置异常标志,DTX 回滚,测试的时候可以不传exFlag参数,默认会正常 if (Objects.nonNull(exFlag)) { throw new IllegalStateException("by exFlag"); } //正确测试,应该返回ok-service-b > ok-service-a字符串 return dResp + " > " + "ok-service-a"; }}

测试如果一切正常调用接口后,会向数据库新增两条记录,一条是service-a新增,一条是service-b新增

分布式事务压缩储备粮

测试带参数ex=false带上测试 会让service-a抛by exFlag异常,会发现远程微服务service-b也不会向数据库新增记录

分布式事务压缩储备粮

总结一下

    LCN是通过一个管理了事务组的微服务来控制多个微服务的事务

如果一切正常

分布式事务压缩储备粮

如果参与方B出现业务问题,需要回滚

分布式事务压缩储备粮

LCN分布式,就是通过TxManager微服务管理一个事务组,如果有A 远程调用B和C,那么通过A作为事务的发起方,B和C远程调用后,会返回信息给A如果正常,由发起方的A进行提交,如果在A有异常的话,不仅A有回滚,并且可以让B和C的事务也回滚。可以看出和Atomikos不同,LCN是可以进行分布式事务处理,因为可以多服务,Atomikos只能是多数据源,LCN不仅多数据源,并且多微服务


3.RocketMQ事务消息

继续拿转账的例子来说

分布式事务压缩储备粮

系统解耦合从远程调用改成异步发消息,一个微服务操作Bob账户,一个微服务操作Smith账户

分布式事务压缩储备粮

        如果在操作Bob账户的时候,事务回滚了,但是异步消息发送出去了,后果可以想象吧???

在Spring中,我们要实现事务,一般通过@Transactional注解实现。这在引入RocketMQ之前没有问题,但是在引入了RocketMQ之后,如果消息发送之后的业务逻辑处理发生了异常的话,这时候消息已经发送出去了,就会导致业务的问题。

分布式事务压缩储备粮

  1. 生产者向MQServer发送半消息【半消息:会存储进MQ Server,但是被标记为不能投递状态】

  2. 发送半消息成功,生产者实行本地事务

  3. 根据本地事务结果向MQ Server发送二次确认请求

  4. MQ Server根据接受到的消息投递或者丢弃消息

  5. 若在本地事务执行过程中缺少二次确认消息或生产者处于等待状态,MQ服务器将向同一组中的每个生产者发送检查消息,然后继续3,4的操作

  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。


改造上面的lcn_demo即可,去掉lcn相关的功能,因为是发送消息

发送方service-a

pom文件

 <!--去掉lcn,和eureka相关坐标,整合rokectmq--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>

application.yml

rocketmq: name-server: 127.0.0.1:9876 producer: group: productgroup

ScoreService主要的发送消息逻辑

package cn.wfb.demo.service;
import cn.wfb.demo.entity.TScore;import com.alibaba.fastjson.JSON;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;
import java.util.UUID;
@Servicepublic class ScoreService {
@Autowired private RocketMQTemplate rocketMQTemplate;
public String execute(String value, String exFlag) {
TScore tScore = new TScore(); tScore.setName(value); tScore.setSubject("测试lcn"); tScore.setFraction(value);
// 发送半消息 //第一个参数为txProducerGroup:就是group名称,根据业务自定义 //第二次信息为destination:topic名称 //第三个信息为message:消息体,利用MessageBuilder.withPayload构建 //第四个信息为arg:业务对象,用于处理本地业务
try { //发送Half消息 rocketMQTemplate.sendMessageInTransaction( "test-transactional", "test-topic", MessageBuilder.withPayload( JSON.toJSONString(tScore).getBytes(RemotingHelper.DEFAULT_CHARSET) ).setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()).build(), null ); } catch (Exception e) { e.printStackTrace(); }
return "ok-service-a"; }}

注意下面是发送方创建消息事务监听

package cn.wfb.demo.listener;
import cn.wfb.demo.dao.ScoreMapper;import cn.wfb.demo.entity.TScore;import cn.wfb.demo.service.ScoreService;import com.alibaba.fastjson.JSON;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQHeaders;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;
@RocketMQTransactionListener(txProducerGroup = "test-transactional")public class TransactionalListener implements RocketMQLocalTransactionListener {
@Autowired private ScoreMapper scoreMapper;

/** * 消息发送方执行自身业务操作的方法 * @param message 发送方发送的东西 * @param o 额外的参数 * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { // 消息头 MessageHeaders headers = message.getHeaders(); String transactionalId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID); System.out.println("消息的id="+transactionalId); // 消息内容 String jsonString = new String((byte[]) message.getPayload()); TScore tScore = JSON.parseObject(jsonString, TScore.class); // arg:sendMessageInTransaction方法的第四个参数,用于处于本地业务 try { //int i = 1/0; // 本地业务代码,保存数据库 // 业务逻辑如果出现异常,之前发送的消息会ROLLBACK scoreMapper.insert(tScore); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { System.out.println("=发消息出异常="); return RocketMQLocalTransactionState.ROLLBACK; } }
/** * 若在本地事务执行过程中缺少二次确认消息或生产者处于等待状态 * MQ Server将向同一组中的每个生产者发送检查消息 * 事务超时,回查方法 * @param message:携带要回查的事务ID */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { try { // 检查 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } }}


其实消息接收方也就是service-b正常处理消息就行了,关键还是看上面发送方的service-a的TransactionalListener类处理

比如增加个MessageConsumer监听,收到消息处理逻辑即可

package cn.wfb.demo.listener;
import cn.wfb.demo.entity.TScore;import cn.wfb.demo.service.ScoreService;import com.alibaba.fastjson.JSON;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;
@Component@RocketMQMessageListener(consumerGroup = "test-transactional", topic = "test-topic")public class MessageConsumer implements RocketMQListener<String> {
@Autowired private ScoreService scoreService;
@Override public void onMessage(String message) { System.out.println("接收到消息:="+message); //将Service-a发送来的消息转对象 TScore tScore = JSON.parseObject(message, TScore.class); //保存数据库 scoreService.rpc(tScore.getName()); }}



关于分布式事务,也就总结到这里了,好好看看,主要是概念,领会一下!!!







以上是关于分布式事务压缩储备粮的主要内容,如果未能解决你的问题,请参考以下文章

BottomNavigationView 滞后于片段事务

分布式事务

剑指Offer——知识点储备-数据库基础

关于DataSet事务处理以及SqlDataAdapter四种用法

ActiveMQ分布式事务

分布式事务-seata