调用dubbo服务时事务配置在哪

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了调用dubbo服务时事务配置在哪相关的知识,希望对你有一定的参考价值。

参考技术A dubbo实现了分布式远程调用框架,多运行节点既能提高可靠性,又能提升负载能力。dubbo配置主要有注册中心(推荐zookeeper或redis)、提供者provider、消费者consumer,注册中心是第三方实现,所以主要配置好服务提供者和消费者就可以了。实际上服务接口和实现都是需要我们自己设计和实现的,dubbo做的事情就是将服务实现发布到注册中心,然后消费者从注册中心订阅服务接口,之后对接口的调用就由dubbo调度提供者去执行并返回结果。以下配置都有源码,见右侧“免费资源”。

提供者provider的配置:提供者是独立运行的节点,可以多实例运行,将服务注册到注册中心
必须要有application name,注册中心配置zookeeper,协议dubbo,超时6秒失败不重试,提供者加载repository和service层bean,然后发布接口service。
<dubbo:application name="ite-provider" />
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<dubbo:protocol name="dubbo" port="20880" />
<dubbo:provider timeout="6000" retries="0"/>

<import resource="classpath:cache.xml"/>
<import resource="classpath:ite-repository.xml"/>
<import resource="classpath:ite-service.xml"/>
<import resource="classpath:ite-provider.xml"/>
ite-provider.xml,ref引用的bean是ite-service.xml已经定义好的接口实现,dubbo:service就是把接口实现发布到注册中心
<dubbo:service ref="codeListService" interface="com.itecheast.ite.domain.service.CodeListService" />
<dubbo:service ref="idService" interface="com.itecheast.ite.domain.service.IdService" />
<dubbo:service ref="passwordService" interface="com.itecheast.ite.domain.service.PasswordService" />
<dubbo:service ref="rolePermissionService" interface="com.itecheast.ite.domain.service.RolePermissionService" />
provider是可以独立运行的,dubbo.jar里面有assembly目录,运行mvn assembly:directory就可以生成能直接运行的provider目录

assembly.xml内容,可以切换dir或tar.gz两种格式
<assembly>
<id>assembly</id>
<formats>
<!-- <format>tar.gz</format> -->
<format>dir</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/assembly/bin</directory>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>src/main/assembly/conf</directory>
<outputDirectory>conf</outputDirectory>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>src/test/resources</directory>
<outputDirectory>conf</outputDirectory>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>
dubbo.properties,运行start.bat或start.sh时,将从属性文件读取dubbo配置信息,provider节点可以多处复制并运行。
dubbo.container=log4j,spring
dubbo.application.name=ite-provider
dubbo.registry.address=zookeeper://127.0.0.1:2181
dubbo.monitor.protocol=registry
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880
dubbo.spring.config=provider.xml
dubbo.log4j.file=logs/ite-provider.log
dubbo.log4j.level=WARN
消费者consumer的配置,使用dubbo:reference订阅注册中心里的服务即可,然后就可以@Autowired注入服务接口了。
<dubbo:application name="ite-consumer" />
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>

<dubbo:reference id="codeListService" interface="com.itecheast.ite.domain.service.CodeListService" />
<dubbo:reference id="idService" interface="com.itecheast.ite.domain.service.IdService" />
<dubbo:reference id="passwordService" interface="com.itecheast.ite.domain.service.PasswordService" />
<dubbo:reference id="rolePermissionService" interface="com.itecheast.ite.domain.service.RolePermissionService" />
如果前端项目是一个消费者,就可以在web.xml里直接加载consumer.xml订阅服务了。
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:consumer.xml,classpath:cache.xml,classpath:shiro.xml,classpath:front.xml</param-value>
</context-param>

实际上本地调试开发时,可以不必启用分布式配置,只需要更改web.xml即可,所有的服务都已经是配置好了的。
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:ite-repository.xml,classpath:ite-service.xml,classpath:cache.xml,classpath:shiro.xml,classpath:front.xml</param-value>
</context-param>
zookeeper的配置很简单,
wget http://tool.xlongwei.com/softwares/zookeeper-3.4.6.tar.gz
tar -zxvf zookeeper-3.4.6.tar.gz
cd zookeeper-3.4.6/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg #配置zookeeper参数
单机配置(集群配置待研究)

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/dubbo/zookeeper-3.3.3/data
clientPort=2181
运行或停止zookeeper
sh zkServer.sh start | stop

分布式事务 - AT模式Dubbo集成Seata

本篇基于Dubbo集成Seata实现一个分布式事务的解决方案,在整个业务流程中,会涉及如下三个服务:

  • 订单服务:用于创建订单。

  • 账户服务:从账户中扣减余额。

  • 库存服务:扣减指定商品的库存数量。


下图是这三个微服务的整体架构图,用户执行下单请求时,会调用下单业务的REST接口,该接口会分别调用库存服务以及订单服务。另外,订单服务还会调用账户服务先进行资金冻结,整个流程涉及这三个服务的分布式事务问题。


项目准备

基于Spring Boot + Nacos + Dubbo构建项目,包含下面这些服务:

  • sample-order-service,订单服务

  • sample-repo-service,库存服务

  • sample-account-service,账户服务

  • sample-seata-common,公共服务组件

  • sample-rest-web,提供统一业务的REST接口 服务



其中sample-order-service、sample-repo-service、sample-account-service是基于Spring Boot + Dubbo构建的微服务,sample-rest-web提供统一的业务服务入口,sample-seata-common提供公共组件。

数据库准备

创建三个数据库:seata_order、seata_repo、seata_account,并分别在这三个数据库中创建对应的业务表。

--对应seata_order数据库CREATE TABLE 'tbl_order' ( 'id' int(11) NOT NULL AUTO_INCREMENT, 'order_no' varchar(255) DEFAULT NULL, 'user_id' varchar(255) DEFAULT NULL, 'product_code' varchar(255) DEFAULT NULL, 'count' int(11) DEFAULT 0, 'amount' int(11) DEFAULT 0, PRIMARY KEY ('id')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;
--对应seata_repo数据库CREATE TABLE 'tbl_repo' ( 'id' int(11) NOT NULL AUTO_INCREMENT, 'product_code' varchar(255) DEFAULT NULL, 'name' varchar(255) DEFAULT NULL, 'count' int(11) DEFAULT 0, PRIMARY KEY ('id'), UNIQUE KEY ('product_code')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;
-- 初始数据INSERT INTO 'tbl_repo' VALUES (1, 'TEST20200606001', '键盘', '1000');INSERT INTO 'tbl_repo' VALUES (2, 'TEST20200606002', '鼠标', '100');
-- 对应seata_account数据库CREATE TABLE 'tbl_account' ( 'id' int(11) NOT NULL AUTO_INCREMENT, 'user_id' varchar(255) DEFAULT NULL, 'balance' int(11) DEFAULT 0, PRIMARY KEY ('id')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;--初始数据INSERT INTO 'tbl_account' VALUES (1, '1001', '10000')


核心方法说明

下面介绍部分主要代码:
sample-account-service:账户服务提供余额扣减的功能,具体代码如下:

@Slf4j@Servicepublic class AccountServiceImpl implements IAccountService{
@Autowired AccountMapper accountMapper;
@Override public ObjectResponse decreaseAccount(AccountDto accountDto) { ObjectResponse response=new ObjectResponse(); try{ int rs=accountMapper.decreaseAccount(accountDto.getUserId(),accountDto.getBalance().doubleValue()); if(rs>0){ response.setMsg(ResCode.SUCCESS.getMessage()); response.setCode(ResCode.SUCCESS.getCode()); return response; } response.setMsg(ResCode.FAILED.getMessage()); response.setCode(ResCode.FAILED.getCode()); }catch (Exception e){ log.error("decreaseAccount Occur Exception:"+e); response.setCode(ResCode.SYSTEM_EXCEPTION.getCode()); response.setMsg(ResCode.SYSTEM_EXCEPTION.getMessage()+"-"+e.getMessage()); } return response; }}


sample-order-service:订单服务负责创建订单,并且在创建订单之前先基于Dubbo协议调用账户服务的资金扣减接口。

@Slf4j@Servicepublic class OrderServiceImpl implements IOrderService{
@Autowired OrderMapper orderMapper; @Autowired OrderConvert orderConvert; @Reference IAccountService accountService; @Override public ObjectResponse<OrderDto> createOrder(OrderDto orderDto) { log.info("全局事务ID:"+ RootContext.getXID()); ObjectResponse response=new ObjectResponse(); try { //账户扣款 AccountDto accountDto = new AccountDto(); accountDto.setUserId(orderDto.getUserId()); accountDto.setBalance(orderDto.getOrderAmount()); ObjectResponse accountRes = accountService.decreaseAccount(accountDto); //创建订单 Order order=orderConvert.dto2Order(orderDto); order.setOrderNo(UUID.randomUUID().toString()); orderMapper.createOrder(order); //判断扣款状态(判断可以前置) if(accountRes.getCode()!=ResCode.SUCCESS.getCode()){ response.setMsg(ResCode.FAILED.getMessage()); response.setCode(ResCode.FAILED.getCode()); return response; } response.setMsg(ResCode.SUCCESS.getMessage()); response.setCode(ResCode.SUCCESS.getCode()); }catch (Exception e){ log.error("createOrder Occur Exception:"+e); response.setCode(ResCode.SYSTEM_EXCEPTION.getCode()); response.setMsg(ResCode.SYSTEM_EXCEPTION.getMessage()+"-"+e.getMessage()); } return response; }}


sample-repo-service:库存服务提供库存扣减功能:

@Slf4j@Servicepublic class RepoServiceImpl implements IRepoService{ @Autowired RepoMapper repoMapper;
@Override public ObjectResponse decreaseRepo(ProductDto productDto) { ObjectResponse response=new ObjectResponse(); try { int repo = repoMapper.decreaseRepo(productDto.getProductCode(), productDto.getCount()); if(repo>0){ response.setMsg(ResCode.SUCCESS.getMessage()); response.setCode(ResCode.SUCCESS.getCode()); return response; } response.setMsg(ResCode.FAILED.getMessage()); response.setCode(ResCode.FAILED.getCode()); }catch (Exception e){ log.error("decreaseRepo Occur Exception:"+e); response.setCode(ResCode.SYSTEM_EXCEPTION.getCode()); response.setMsg(ResCode.SYSTEM_EXCEPTION.getMessage()+"-"+e.getMessage()); } return response; }}


sample-rest-web: 基于Spring Boot的web项目,主要用于对外提供以业务为维度的REST接口,会分别调用库存服务和订单服务,实现库存扣减及创建订单的功能。

@Slf4j@RestControllerpublic class OrderController {
@Autowired IRestOrderService restOrderService;
@PostMapping("/order") ObjectResponse order(@RequestBody OrderRequest orderRequest) throws Exception { return restOrderService.handleBusiness(orderRequest); }}


RestOrderServiceImpl的具体实现如下:

@Slf4j@Servicepublic class RestOrderServiceImpl implements IRestOrderService { @Reference IRepoService repoService; @Reference IOrderService orderService;
@Override @GlobalTransactional(timeoutMills = 300000, name = "sample-rest-web") public ObjectResponse handleBusiness(OrderRequest orderRequest) throws Exception { log.info("开始全局事务:xid="+ RootContext.getXID()); log.info("begin order: "+orderRequest); //1. 扣减库存 ProductDto productDto=new ProductDto(); productDto.setProductCode(orderRequest.getProductCode()); productDto.setCount(orderRequest.getCount()); ObjectResponse repoRes=repoService.decreaseRepo(productDto); //2. 创建订单 OrderDto orderDto=new OrderDto(); orderDto.setUserId(orderRequest.getUserId()); orderDto.setOrderAmount(orderRequest.getAmount()); orderDto.setOrderCount(orderRequest.getCount()); orderDto.setProductCode(orderRequest.getProductCode()); ObjectResponse orderRes=orderService.createOrder(orderDto); if(orderRequest.getProductCode().equals("GP20200202002")){ throw new Exception("系统异常"); } ObjectResponse response=new ObjectResponse(); response.setMsg(ResCode.SUCCESS.getMessage()); response.setCode(ResCode.SUCCESS.getCode()); response.setData(orderRes.getData()); return response; }}


项目启动顺序及访问

这几个项目彼此之间存在依赖关系,项目的启动顺序为:

  • sample-seata-common为公共组件,需要先通过mvn install到本地仓库给其他服务依赖。

  • 启动sample-account-service,它会被订单服务调用。

  • 启动订单服务sample-order-service。

  • 启动库存服务sample-repo-service。

  • 启动sample-rest-web,它作为REST的业务入口。



整合Seata实现分布式事务

在上述流程中,加入库存扣减成功了,但是在创建订单的时候,入股由于账户资金不足导致失败,就会出现数据不一致的场景。按照正常的流程来说,被扣减的库存需要加回去,这就是一个分布式事务的场景。接下来我们在项目中整合Seata来解决该问题。

添加Seata Jar包依赖
分别在4个项目中添加Seata的starter组件依赖:

<dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.0.0</version></dependency>

添加Seata配置项目
同样分别在4个项目中的application.yml文件中添加Seata的配置项:

seata: enabled: true tx-service-group: sample-rest-web transport: type: TCP server: NIO heartbeat: true #client和server通信心跳检测开关(默认为true) enable-client-batch-send-request: true thread-factory: boss-thread-prefix: NettyBoss worker-thread-prefix: NettyServerNIOWorker server-executor-thread-prefix: NettyServerBizHandler share-boss-worker: false client-selector-thread-prefix: NettyClientSelector client-selector-thread-size: 1 client-worker-thread-prefix: NettyClientWorkerThread boss-thread-size: 1 worker-thread-size: 8 shutdown: wait: 3 serialization: seata #client和server通信编解码方式 compressor: none service: vgroup-mapping: default #TC集群,需要和Seata-Server保持一致 enable-degrade: false #降级开关,默认为false,业务根据连续错误数自动降级,不走Seata事务 disable-global-transaction: false #全局事务开关,默认为false,false为开启,true为关闭 #grouplist: 192.168.216.128:8091 #TC服务列表,也就是Seata服务端地址,只有当注册中心为file时使用  client: rm: lock: lock-retry-interval: 10 lock-retry-policy-branch-rollback-on-conflict: true lock-retry-times: 30 rm-async-commit-buffer-limit: 10000 rm-report-retry-count: 5 rm-table-meta-check-enable: false rm-report-success-enable: true tm-commit-retry-count: 5 tm-rollback-retry-count: 5 undo: undo-log-table: undo_log undo-data-validation: true undo-log-serialization: jackson log: exception-rate: 100 support: spring: datasource-autoproxy: false registry: type: nacos nacos: cluster: default server-addr: 192.168.216.128:8848


上述配置中有几个配置项需要注意:

  • seata.support.spring.datasource-autoproxy: true 属性表示数据源自动代理开关,在sample-order-service、sample-account-service、sample-repo-service中设置为true,在sample-rest-web中设置为false,因为该项目并没有访问数据源,不需要代理。

seata: registry: type: nacos nacos: cluster: default server-addr: 192.168.216.127:8848
  • tx-service-group表示指定服务所属的事务分组,如果没有指定,默认使用spring.application.name加上字符串-seata-service-group。需要注意这两项配置必须要配置一项,否则会报错。


添加回滚日志表
分别在3个数据库seata-account、seata-repo、seata-order中添加一张回滚日志表,用于记录每个数据库表操作的回滚日志,当某个服务的事务出现异常时会根据该日志进行回滚。

CREATE TABLE 'undo_log' ( 'id' bigint(20) NOT NULL AUTO_INCREMENT, 'branch_id' bigint(20) NOT NULL, 'xid' varchar(100) NOT NULL, 'context' varchar(128) NOT NULL, 'rollback_info' longblob NOT NULL, 'log_status' int(11) NOT NULL, 'log_created' datetime NOT NULL, 'log_modified' datetime NOT NULL, PRIMARY KEY ('id'), UNIQUE KEY 'ux_undo_log' ('xid', 'branch_id')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;


sample-rest-web增加全局事务控制
修改sample-rest-web工程的RestOrderServiceImpl,做两件事情:

  • 增加@GlobalTransactional全局事务注解

  • 模拟一个异常处理,当商品编号等于某个指定的值时抛出异常,触发整个事务的回滚。


@Slf4j@Servicepublic class RestOrderServiceImpl implements IRestOrderService { @Reference IRepoService repoService; @Reference IOrderService orderService;
@Override @GlobalTransactional(timeoutMills = 300000, name = "sample-rest-web") public ObjectResponse handleBusiness(OrderRequest orderRequest) throws Exception { log.info("开始全局事务:xid="+ RootContext.getXID()); log.info("begin order: "+orderRequest); //1. 扣减库存 ProductDto productDto=new ProductDto(); productDto.setProductCode(orderRequest.getProductCode()); productDto.setCount(orderRequest.getCount()); ObjectResponse repoRes=repoService.decreaseRepo(productDto); //2. 创建订单 OrderDto orderDto=new OrderDto(); orderDto.setUserId(orderRequest.getUserId()); orderDto.setOrderAmount(orderRequest.getAmount()); orderDto.setOrderCount(orderRequest.getCount()); orderDto.setProductCode(orderRequest.getProductCode()); ObjectResponse orderRes=orderService.createOrder(orderDto); if(orderRequest.getProductCode().equals("GP20200202002")){ throw new Exception("系统异常"); } ObjectResponse response=new ObjectResponse(); response.setMsg(ResCode.SUCCESS.getMessage()); response.setCode(ResCode.SUCCESS.getCode()); response.setData(orderRes.getData()); return response; }}



以上是关于调用dubbo服务时事务配置在哪的主要内容,如果未能解决你的问题,请参考以下文章

dubbo(x)分布式事务解决方案

dubbo mock配置

怎么用注解的方式发布dubbo服务

dubbo

Dubbo配置参数详解-generic

dubbo服务降级