分布式事务压缩储备粮
Posted 魏凡缤Java技术文章
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务压缩储备粮相关的知识,希望对你有一定的参考价值。
开篇先讲了这么个笑话,想阐述的就是,能直接整,最好直接整,说实在的,奉劝各位,分布式事务,能不用就别用,能够通过业务设计化解掉分布式事务的场景,最好化解,这玩意实在没辙才上的,不像Spring上来就要用。讲这些无非就是为了给应付面试的同学梳理下。
说到分布式事务,首先要知道什么是事务,简单回顾一下事务的四性
你给我的微信转账100元来举例,如果有张表记录每个用户的存款,是不是你的微信账户需要-100,我的微信账户需要加100,看成两条SQL的update修改你的和修改我的信息即可
原子性(Atomicity):修改你的和修改我的必须都成功或都不成功,不能减了你的,然后出错没加我的
一致性(Consistency):你减了100,我必须加100,加了50就不叫一致!!!
隔离性(Isolation):万一有俩同学都给我转100,你转你的,他转他的不能相互影响
持久性(Durability):转给我的钱,就写到数据库持久化了,想要回去。得发照片
先阐述问题:
提到事务对于后端同学来说,如果稍微了解一下,至少能说出来,失败就回滚嘛~,但是在分布式系统里两个微服务,每个微服务再操作对应的数据库,我问你,怎么回滚?怎么滚 !!!一个微服务操作成功了,一个微服务操作失败了!怎么滚?!!回滚!!!,你不可能让一个commit提交的事务回滚吧!!!
有几个不好记住的理论
CAP理论,C(一致性),A(高可用),P(容错性)。大家记住一句话,必须保证P,分布式集群必须保证容错,那么不是CP,就只能是AP
注册中心来举例:
Zookeeper注重CP,但是效率不如AP
Eureka注重AP,效率高,AP放弃强一致,延伸出BASE理论 BASE理论:Basically Available(基本可用)Soft state(软状态) Eventually consistent(最终一致性),将BASE理论用于解决分布式事务,也有人称其为柔性事务
一、TCC的两段式提交 (TRY-CONMIT 或者 TRY-CANCEL)
你看这名字顾名思义,尝试提交,或尝试取消
其中记住这句话XA是一套规范,JTA是Java语言的XA规范下的接口,而Atomikos实现了JTA这个接口
举个简单的例子如果你用100元买了一瓶水, Try阶段:你需要向你的钱包检查是否够100元并锁住这100元,水也是一样的。
如果有一个失败,则进行cancel(释放这100元和这一瓶水),如果cancel失败不论什么失败都进行重试cancel,所以需要保持幂等。
如果都成功,则进行confirm,确认这100元扣,和这一瓶水扣掉库存,如果confirm失败无论什么失败则重试(会依靠活动日志进行重试)
对于TCC来说适合一些:
强隔离性,严格一致性要求的活动业务。
执行时间较短的业务
二、通过消息中间件保证一致性
1.事务消息与普通消息的区别就在于消息生产环节,生产者首先预发送一条消息到MQ(这也被称为发送half消息)
2.MQ接受到消息后,先进行持久化,则存储中会新增一条状态为待发送
的消息
3.然后返回ACK给消息生产者,此时MQ不会触发消息推送事件
4.生产者预发送消息成功后,执行本地事务
5.执行本地事务,执行完成后,发送执行结果给MQ
6.MQ会根据结果删除或者更新消息状态为可发送
7.如果消息状态更新为可发送
,则MQ会push消息给消费者,后面消息的消费和普通消息是一样的
8.消费者确认消费消息,如果出现问题,可以重复消费直至业务成功
还拿上面例子来说,核心是把大事务转变为小事务。还是举上面用100元去买一瓶水的例子。
1.当你扣钱的时候,你需要在你扣钱的服务器上新增加一个本地消息表,你需要把你扣钱和写入减去水的库存到本地消息表放入同一个事务。
2.这个时候有个定时任务去轮询这个本地事务表,把没有发送的消息,扔给商品库存服务器,叫他减去水的库存,到达商品服务器之后这个时候得先写入这个服务器的事务表,然后进行扣减,扣减成功后,更新事务表中的状态。
3.商品服务器通过定时任务扫描消息表或者直接通知扣钱服务器,扣钱服务器本地消息表进行状态更新。
4.针对一些异常情况,定时扫描未成功处理的消息,进行重新发送,在商品服务器接到消息之后,首先判断是否是重复的,如果已经接收,在判断是否执行,如果执行在马上又进行通知事务,如果未执行,需要重新执行需要由业务保证幂等,也就是不会多扣一瓶水。
本地消息队列是BASE理论,是最终一致模型,适用于对一致性要求不高的。实现这个模型时需要注意重试的幂等。
==============上面是概念,下面是方案=============
分布式事务,咱们主要介绍三种解决方案
1.TCC两段式提交的Atomikos框架适合多数据源的传统应用
例子还用之前的SSM整合,弄个修改分数,需要给另外的数据库tests2中t_money加钱,也就是说,给学生加多少分,就需要给t_money中该用户加多少钱
1.先将test2的SQL贴出来
//挺简单的一张表就三个字段,id自增,name是t_score的name,money是本次分数对应的钱数
DROP TABLE IF EXISTS `t_money`;
CREATE TABLE `t_money` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,
`money` int(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
2.直接改造前前篇的SSM整合案例即可,在之前pom中增加
2.1首先pom坐标
<!--JTA atomikos-->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>atomikos-util</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jta</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-api</artifactId>
<version>4.0.6</version>
</dependency>
2.2其次是修改后的applicationContext.xml,事务都配置在这个文件中
jdbc.properties 演示用的两个数据源 tests和tests2两个数据库
jdbc.url=jdbc:mysql://127.0.0.1:3306/tests?useUnicode=true&characterEncoding=utf8&autoReconnect=true
jdbc.log.url=jdbc:mysql://127.0.0.1:3306/tests2?useUnicode=true&characterEncoding=utf8&autoReconnect=true
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:/jdbc.properties" ignore-unresolvable="true"/>
<!--1.注解扫描: 只扫描service包。-->
<context:component-scan base-package="cn.wfb.service"/>
<!--2. 创建连接池-->
<!--数据源基础配置,这里数据源管理交给AtomikosDataSourceBean-->
<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close" abstract="true">
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
<property name="poolSize" value="10"/>
<property name="minPoolSize" value="10"/>
<property name="maxPoolSize" value="30"/>
<property name="borrowConnectionTimeout" value="60"/>
<property name="reapTimeout" value="20"/>
<property name="maxIdleTime" value="60"/>
<property name="maintenanceInterval" value="60"/>
<!--这些配置很多自己可以百度查一下,这个是连接测试用-->
<property name="testQuery">
<value>SELECT 1</value>
</property>
</bean>
<!--这里我们配置两个数据源 -->
<!-- tests数据库 -->
<bean id="testsSource" parent="abstractXADataSource">
<property name="uniqueResourceName">
<value>testsSource</value>
</property>
<!--数据库驱动-->
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
<property name="xaProperties">
<props>
<prop key="URL">${jdbc.url}</prop>
<prop key="user">root</prop>
<prop key="password">root</prop>
</props>
</property>
</bean>
<!-- tests2数据库 -->
<bean id="tests2Source" parent="abstractXADataSource">
<property name="uniqueResourceName">
<value>tests2Source</value>
</property>
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
<property name="xaProperties">
<props>
<prop key="URL">${jdbc.log.url}</prop>
<prop key="user">root</prop>
<prop key="password">root</prop>
</props>
</property>
</bean>
<!--分数的SessionFactoryBean-->
<bean id="sqlSessionFactoryBeanScore" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="typeAliasesPackage" value="cn.wfb.entity" />
<property name="dataSource" ref="testsSource"/>
</bean>
<!--钱的SessionFactoryBean-->
<bean id="sqlSessionFactoryBeanMoney" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="typeAliasesPackage" value="cn.wfb.entity" />
<property name="dataSource" ref="tests2Source"/>
</bean>
<!--4. 扫描dao的接口所在包,自动对该包下所有的dao接口自动生成代理对象-->
<!--包扫描,两个库,entity可以放一个包下,接口层的包最好分开,注入不同的SessionFactoryBean-->
<bean id="mapperScannerConfigurerScore" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="cn.wfb.dao.score" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryBeanScore" />
</bean>
<bean id="mapperScannerConfigurerMoney" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="cn.wfb.dao.money" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryBeanMoney" />
</bean>
<!--5. 事务管理器 -->
<!--JTA事务管理器-->
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="atomikosTransactionManager"/>
</property>
<property name="userTransaction">
<ref bean="atomikosLocalTransaction"/>
</property>
<property name="allowCustomIsolationLevels" value="true"/>
</bean>
<!--配置事务管理器atomikos事务管理器-->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
<property name="forceShutdown" value="false"/>
</bean>
<!-- 本地事务管理器-->
<bean id="atomikosLocalTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout" value="300000"/>
</bean>
<!--6. 开启事务控制的注解支持 -->
<tx:annotation-driven transaction-manager="springTransactionManager" proxy-target-class="true"/>
</beans>
ScoreServiceImpl中增加修改方法,Controller的就不写了,这个应该会了。不会的来问我吧
//直接加上注解,说明用的是springTransactionManager JTA事务管理器,可以看到抛了异常,两个不同的数据源回滚了
"springTransactionManager",rollbackFor = Exception.class) (value =
public void update(Integer id, Integer score) {
TScore tScore = scoreMapper.selectByPrimaryKey(id);
//花钱买分的话,先修改分数
tScore.setFraction(tScore.getFraction() + score);
scoreMapper.updateByPrimaryKeySelective(tScore);
//再修改钱,每修改一次,都新增一次作为记录
TMoney tMoney = new TMoney();
tMoney.setName(tScore.getName());
tMoney.setMoney(score);
moneyMapper.insert(tMoney);
int i = 1/0; //抛异常测试多数据源能否回滚
}
总结说一下的话,其实就是看Spring的配置文件,两份SqlSessionFactoryBean中配置两个数据源,然后通过JtaTransactionManager统一管,JtaTransactionManager管理着atomikosTransactionManager和atomikosLocalTransaction
2.LCN分布式事务框架适合分布式系统多微服务
TX-LCN定位于一款事务协调性框架,框架其本身并不操作事务,而是基于对事务的协调从而达到事务一致性的效果。
还是继续直接整代码
四个微服务,service_tm(管理事务微服务),service_eureka (注册中心),service_a(事务发起方)和service_b(参与方b)两个服务,演示a远程调用b,都向t_score表新增一条数据
父工程的pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.wfb</groupId>
<artifactId>lcn_demo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>service_a</module>
<module>service_b</module>
<module>service_tm</module>
<module>service_eureka</module>
</modules>
<properties>
<!-- project -setting -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<codingapi.txlcn.version>5.0.2.RELEASE</codingapi.txlcn.version>
<springcloud.version>Greenwich.RELEASE</springcloud.version>
<lombok.version>1.18.0</lombok.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!--springCloud坐标-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${springcloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--以下三个坐标关于tx-lcn-->
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tc</artifactId>
<version>${codingapi.txlcn.version}</version>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tm</artifactId>
<version>${codingapi.txlcn.version}</version>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-txmsg-netty</artifactId>
<version>${codingapi.txlcn.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>app</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
一、搭建service_tm事务管理微服务
pom坐标
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lcn_demo</artifactId>
<groupId>cn.wfb</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>service_tm</artifactId>
<dependencies>
<!--事务管理微服务坐标-->
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tm</artifactId>
</dependency>
</dependencies>
</project>
application.properties
##################
# 这个是启动本服务的配置文件,其它的application-xxx.properties 是开发者的个性化配置,不用关心。
# 你可以在 https://txlcn.org/zh-cn/docs/setting/manager.html 看到所有的个性化配置
#################
#数据库相关配置,替换自己的信息 7970是后端控制服务的端口号
spring.application.name=TransactionManager
server.port=7970
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/tests?characterEncoding=utf-8&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
spring.jpa.hibernate.ddl-auto=update
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.configuration.use-generated-keys=true
#txManager微服务需要Redis保存事务相关数据
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
#访问http://localhost:7970/admin/index.html#/login 的密码可以指定
tx-lcn.manager.admin-key=123456
TransactionManagerApplication入口类
package cn.wfb;
import com.codingapi.txlcn.tm.config.EnableTransactionManagerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
public class TransactionManagerApplication {
public static void main(String[] args) {
SpringApplication.run(TransactionManagerApplication.class, args);
}
}
测试TxManager搭建是否ok可以通过访问其后台管理界面,这里可以看到被管理事务的微服务
TxManager系统后台:http://127.0.0.1:7970/admin/index.html#/login 密码 123456
二、service_eureka (注册中心)
pom坐标
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lcn_demo</artifactId>
<groupId>cn.wfb</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>service_eureka</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>
</project>
application.yml
server:
port: 7100
spring:
application:
name: txlcn-demo-registry
eureka:
client:
:
defaultZone: http://127.0.0.1:7100/eureka
false :
false :
DemoRegistryApp入口类
package cn.wfb;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
public class DemoRegistryApp {
public static void main(String[] args) {
SpringApplication.run(DemoRegistryApp.class, args);
}
}
测试注册中心访问7100可以看到后台管理界面即可
三、service_b(远程被调用方)
这里说明一下,entity,mapper,都用之前的SpringBoot整合文章里的素材就ok
pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lcn_demo</artifactId>
<groupId>cn.wfb</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>service_b</artifactId>
<dependencies>
<!--知道什么是web项目吗?-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--jdbc没听说过?-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--驱动懂吗?-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- 直接就是plus 啥?不知道什么叫plus,iphonePlus不知道么!走走走 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tc</artifactId>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-txmsg-netty</artifactId>
</dependency>
</dependencies>
</project>
application.yml
server:
port: 8082 #设置端口号
spring: #数据库都设置不对,玩啥玩还?
datasource:
com.mysql.jdbc.Driver :
url: jdbc:mysql://localhost:3306/tests?characterEncoding=utf-8
username: root
password: root
application:
name: spring-service-b
## eureka 配置
eureka:
instance:
127.0.0.1 :
${spring.application.name}:${server.port} :
true :
10 :
5 :
client:
5 :
:
defaultZone: http://127.0.0.1:7100/eureka
## tx-manager 配置
:
client:
127.0.0.1:8070 :
ribbon:
loadbalancer:
dtx:
enabled: true
ServiceBApplication入口类
package cn.wfb.demo;
import com.codingapi.txlcn.tc.config.EnableDistributedTransaction;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
//知道什么叫入口类吗,知道包名吗,用脑子想想,下面注解里包括什么注解
"cn.wfb.demo.dao") //这个干嘛用的?!!想想!想想!!行吗 (
public class ServiceBApplication {
public static void main(String[] args) {
SpringApplication.run(ServiceBApplication.class, args);
}
}
ScoreController
package cn.wfb.demo.web;
import cn.wfb.demo.service.ScoreService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
//下面这四个注解看不明白?,
//拆解一下三个步骤:第一打开浏览器,第二www.baidu.com,第三回车!!!
public class ScoreController {
private ScoreService scoreService;
public String rpc( String value, HttpServletRequest servletRequest) {
return scoreService.rpc(value);
}
}
ScoreService
package cn.wfb.demo.service;
import cn.wfb.demo.dao.ScoreMapper;
import cn.wfb.demo.entity.TScore;
import com.codingapi.txlcn.tc.annotation.DTXPropagation;
import com.codingapi.txlcn.tc.annotation.TxcTransaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
public class ScoreService {
private ScoreMapper scoreMapper;
//TCC分布式事务注解
public String rpc(String value) {
TScore tScore = new TScore();
tScore.setName(value);
tScore.setSubject("远程调用后赋值的");
tScore.setFraction(value);
scoreMapper.insert(tScore);
//int i = 1/0; 测试远程调用serviceB出现问题,serviceA也会回滚
return "ok-service-b";
}
}
四、service_a(事务的发起方微服务)
pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lcn_demo</artifactId>
<groupId>cn.wfb</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>service_a</artifactId>
<dependencies>
<!--知道什么是web项目吗?-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--jdbc没听说过?-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--驱动懂吗?-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- 直接就是plus 啥?不知道什么叫plus,iphonePlus不知道么!走走走 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tc</artifactId>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-txmsg-netty</artifactId>
</dependency>
</dependencies>
</project>
application.yml
server:
port: 8081 #设置端口号
spring: #数据库都设置不对,玩啥玩还?
datasource:
com.mysql.jdbc.Driver :
url: jdbc:mysql://localhost:3306/tests?characterEncoding=utf-8
username: root
password: root
application:
name: spring-service-a
## eureka 配置(这里不再做详细说明百度一下)
eureka:
instance:
127.0.0.1 :
${spring.application.name}:${server.port} :
true :
10 :
5 :
client:
5 :
:
defaultZone: http://127.0.0.1:7100/eureka
## tx-manager 配置
:
client:
127.0.0.1:8070 ## 8070是微服务和事务管理微服务沟通端口号 :
ribbon:
loadbalancer:
dtx:
enabled: true
ServiceAApplication入口类
package cn.wfb.demo;
import com.codingapi.txlcn.tc.config.EnableDistributedTransaction;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
//知道什么叫入口类吗,知道包名吗,用脑子想想,下面注解里包括什么注解
"cn.wfb.demo.dao") //这个干嘛用的?!!想想!想想!!行吗 (
public class ServiceAApplication {
public static void main(String[] args) {
SpringApplication.run(ServiceAApplication.class, args);
}
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
ScoreController
package cn.wfb.demo.web;
import cn.wfb.demo.entity.TScore;
import cn.wfb.demo.service.ScoreService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/score")
public class ScoreController {
@Autowired
private ScoreService scoreService;
@RequestMapping("/txlcn")
public String sayHello(@RequestParam("value") String value,
@RequestParam(value = "ex", required = false) String exFlag) {
return scoreService.execute(value, exFlag);
}
}
ScoreService
package cn.wfb.demo.service;
import cn.wfb.demo.dao.ScoreMapper;
import cn.wfb.demo.entity.TScore;
import com.codingapi.txlcn.tc.annotation.LcnTransaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.List;
import java.util.Objects;
public class ScoreService {
private ScoreMapper scoreMapper;
private RestTemplate restTemplate;
public String execute(String value, String exFlag) {
//这里通过restTemplate发起远程调用,调用服务B的rpc接口,返回字符串
//dResp = ok-service-b,服务b返回的字符串
String dResp = restTemplate.getForObject("http://127.0.0.1:8082/score/rpc?value=" + value, String.class);
//一切正常新增到数据库
TScore tScore = new TScore();
tScore.setName(value);
tScore.setSubject("测试lcn");
tScore.setFraction(value);
scoreMapper.insert(tScore);
// 置异常标志,DTX 回滚,测试的时候可以不传exFlag参数,默认会正常
if (Objects.nonNull(exFlag)) {
throw new IllegalStateException("by exFlag");
}
//正确测试,应该返回ok-service-b > ok-service-a字符串
return dResp + " > " + "ok-service-a";
}
}
测试如果一切正常调用接口后,会向数据库新增两条记录,一条是service-a新增,一条是service-b新增
测试带参数ex=false带上测试 会让service-a抛by exFlag异常,会发现远程微服务service-b也不会向数据库新增记录
总结一下
LCN是通过一个管理了事务组的微服务来控制多个微服务的事务
如果一切正常
如果参与方B出现业务问题,需要回滚
LCN分布式,就是通过TxManager微服务管理一个事务组,如果有A 远程调用B和C,那么通过A作为事务的发起方,B和C远程调用后,会返回信息给A如果正常,由发起方的A进行提交,如果在A有异常的话,不仅A有回滚,并且可以让B和C的事务也回滚。可以看出和Atomikos不同,LCN是可以进行分布式事务处理,因为可以多服务,Atomikos只能是多数据源,LCN不仅多数据源,并且多微服务
3.RocketMQ事务消息
继续拿转账的例子来说
系统解耦合从远程调用改成异步发消息,一个微服务操作Bob账户,一个微服务操作Smith账户
如果在操作Bob账户的时候,事务回滚了,但是异步消息发送出去了,后果可以想象吧???
在Spring中,我们要实现事务,一般通过@Transactional注解实现。这在引入RocketMQ之前没有问题,但是在引入了RocketMQ之后,如果消息发送之后的业务逻辑处理发生了异常的话,这时候消息已经发送出去了,就会导致业务的问题。
生产者向MQServer发送半消息【半消息:会存储进MQ Server,但是被标记为不能投递状态】
发送半消息成功,生产者实行本地事务
根据本地事务结果向MQ Server发送二次确认请求
MQ Server根据接受到的消息投递或者丢弃消息
若在本地事务执行过程中缺少二次确认消息或生产者处于等待状态,MQ服务器将向同一组中的每个生产者发送检查消息,然后继续3,4的操作
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
改造上面的lcn_demo即可,去掉lcn相关的功能,因为是发送消息
发送方service-a
pom文件
<!--去掉lcn,和eureka相关坐标,整合rokectmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
application.yml
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: productgroup
ScoreService主要的发送消息逻辑
package cn.wfb.demo.service;
import cn.wfb.demo.entity.TScore;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class ScoreService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public String execute(String value, String exFlag) {
TScore tScore = new TScore();
tScore.setName(value);
tScore.setSubject("测试lcn");
tScore.setFraction(value);
// 发送半消息
//第一个参数为txProducerGroup:就是group名称,根据业务自定义
//第二次信息为destination:topic名称
//第三个信息为message:消息体,利用MessageBuilder.withPayload构建
//第四个信息为arg:业务对象,用于处理本地业务
try {
//发送Half消息
rocketMQTemplate.sendMessageInTransaction(
"test-transactional",
"test-topic",
MessageBuilder.withPayload(
JSON.toJSONString(tScore).getBytes(RemotingHelper.DEFAULT_CHARSET)
).setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()).build(),
null
);
} catch (Exception e) {
e.printStackTrace();
}
return "ok-service-a";
}
}
注意下面是发送方创建消息事务监听
package cn.wfb.demo.listener;
import cn.wfb.demo.dao.ScoreMapper;
import cn.wfb.demo.entity.TScore;
import cn.wfb.demo.service.ScoreService;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
public class TransactionalListener implements RocketMQLocalTransactionListener {
private ScoreMapper scoreMapper;
/**
* 消息发送方执行自身业务操作的方法
* @param message 发送方发送的东西
* @param o 额外的参数
* @return
*/
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
// 消息头
MessageHeaders headers = message.getHeaders();
String transactionalId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
System.out.println("消息的id="+transactionalId);
// 消息内容
String jsonString = new String((byte[]) message.getPayload());
TScore tScore = JSON.parseObject(jsonString, TScore.class);
// arg:sendMessageInTransaction方法的第四个参数,用于处于本地业务
try {
//int i = 1/0;
// 本地业务代码,保存数据库
// 业务逻辑如果出现异常,之前发送的消息会ROLLBACK
scoreMapper.insert(tScore);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
System.out.println("=发消息出异常=");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 若在本地事务执行过程中缺少二次确认消息或生产者处于等待状态
* MQ Server将向同一组中的每个生产者发送检查消息
* 事务超时,回查方法
* @param message:携带要回查的事务ID
*/
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
try {
// 检查
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
其实消息接收方也就是service-b正常处理消息就行了,关键还是看上面发送方的service-a的TransactionalListener类处理
比如增加个MessageConsumer监听,收到消息处理逻辑即可
package cn.wfb.demo.listener;
import cn.wfb.demo.entity.TScore;
import cn.wfb.demo.service.ScoreService;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
public class MessageConsumer implements RocketMQListener<String> {
private ScoreService scoreService;
public void onMessage(String message) {
System.out.println("接收到消息:="+message);
//将Service-a发送来的消息转对象
TScore tScore = JSON.parseObject(message, TScore.class);
//保存数据库
scoreService.rpc(tScore.getName());
}
}
关于分布式事务,也就总结到这里了,好好看看,主要是概念,领会一下!!!
以上是关于分布式事务压缩储备粮的主要内容,如果未能解决你的问题,请参考以下文章