Rabbitmq的分布式事务解决方案

Posted 沉泽·

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq的分布式事务解决方案相关的知识,希望对你有一定的参考价值。

一、前言

1.1 简述

分布式事务指事务的操作位于不同的节点上,需要保证事务的 AICD 特性。
例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。

1.2 分布式事务的几种解决方案

1.2.1 2PC(两阶段提交)

2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。
在计算机中部分关系数据库如Oracle、mysql支持两阶段提交协议,如下图:

  1. 准备阶段(Prepare phase):事务管理器给每个参与者发送Prepare消息,每个数据库参与者在本地执行事 务,并写本地的Undo/Redo日志,此时事务没有提交。 (Undo日志是记录修改前的数据,用于数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数 据文件)
  2. 提交阶段(commit phase):如果事务管理器收到了参与者的执行失败或者超时消息时,直接给每个参与者 发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交或者回滚操 作,并释放事务处理过程中使用的锁资源。注意:必须在最后阶段释放锁资源。

这里不重点介绍

1.2.2 XA方案

2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议,为了统一标准减少行业内不必要的对 接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织Open Group定义了分布式事务处理模型 DTP(Distributed Transaction Processing Reference Model)。

1)在准备阶段RM执行实际的业务操作,但不提交事务,资源锁定;
2)在提交阶段TM会接受RM在准备阶段的执行回复,只要有任一个RM执行失败,TM会通知所有RM执行回滚操 作,否则,TM将会通知所有RM提交该事务。提交阶段结束资源锁释放。

XA方案的问题:
1、需要本地数据库支持XA协议。
2、资源锁需要等到两个阶段结束才释放,性能较差。

这里不重点介绍

1.2.3 Seata方案

Seata是由阿里中间件团队发起的开源项目 Fescar,后更名为Seata,它是一个是开源的分布式事务框架。 传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作 在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服 务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。

这里不重点介绍

可以参考seata实现2PC事务:https://blog.csdn.net/weixin_41979002/article/details/120198259

1.2.4 TCC方案

TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try、确认 Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的 操作即回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所 有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel 操作若执行失败,TM会进行重试。

这里不重点介绍

1.2.5 本地消息表方案

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能 够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

此方式通过mq中间件实现,也是本文重点介绍方式

1.2.6 RocketMQ事务消息方案

这里不做过多介绍,这里讲的是rabbitmq;但两者实现方式稍有略同。

1.2.7 最大努力通知

你们百度吧。

1.3 小结

上述介绍了多个解决方案,没有最好的只有最合适的,在不同的业务场景,可以选择不同的实现方案。视情况而定。

二、rabbitmq本地消息表方案

2.1 方案流程介绍

本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后 通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。

交互流程如下:
1、用户注册
用户服务在本地事务新增用户和增加 ”积分消息日志“。(用户表和消息表通过本地事务保证一致) 下边是伪代码

begin transaction; 
	//1.新增用户 
	//2.存储积分消息日志 
commit transation;

这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原 子性。
2、定时任务扫描日志
如何保证将消息发送给消息队列呢?

经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息 中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。

3、消费消息

如何保证消费者一定能消费到消息呢?

这里可以使用MQ的ack(即消息确认)机制,消费者监听MQ,如果消费者接收到消息并且业务处理完成后向MQ 发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息,否则消费者会不断重 试向消费者来发送消息。

积分服务接收到”增加积分“消息,开始增加积分,积分增加成功后向消息中间件回应ack,否则消息中间件将重复 投递此消息。

由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。

2.2 生产者端的发送可靠保证

2.2.0 创建生产者数据库表


一个是主要业务数据表,一个是事务日志记录表。

2.2.1 创建项目工程

2.2.2 添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dtx-demo</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dtx-txmsg-demo-bank1</artifactId>
    <dependencies>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--springboot+springcloud基本依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>

        <!--日志打印selfj-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--mybatis+mysql+数据源连接依赖-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
</project>

2.2.3 application.yml

server:
  port: 8001
spring:
  datasource:
    #当前数据源操作类型
    type: com.alibaba.druid.pool.DruidDataSource
    #mysql驱动包
    driver-class-name: org.gjt.mm.mysql.Driver
    url: jdbc:mysql://localhost:3306/bank1?useUnicode=true&characterEncoding=utf8&useSSL=false
    username: root
    password: 123456

  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    addresses: 192.168.229.128:5672
    publisher-confirm-type: correlated
    publisher-returns: true

2.2.4 创建交换机和队列

package cn.itcast.dtx.txmsgdemo.bank1.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class DirectRabbitConfig {

    //创建队列
    @Bean
    public Queue directEmailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("account.queue", true);
    }

    //创建交换机
    @Bean
    public DirectExchange directOrderExchange() {
        return new DirectExchange("account_exchange", true, false);
    }

    //绑定关系
    @Bean
    public Binding directEmailBinding() {
        return BindingBuilder.bind(directEmailQueue()).to(directOrderExchange()).with("account");
    }
}

2.2.5 dao

package cn.itcast.dtx.txmsgdemo.bank1.dao;

import cn.itcast.dtx.txmsgdemo.bank1.entity.AccountInfo;
import cn.itcast.dtx.txmsgdemo.bank1.entity.AccountInfoLog;
import cn.itcast.dtx.txmsgdemo.bank1.model.AccountChangeEvent;
import cn.itcast.dtx.txmsgdemo.bank1.schedule.ScanAccountLog;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Component;

import java.util.List;

@Mapper
@Component
public interface AccountInfoDao {
    @Update("update account_info set account_balance=account_balance-#{amount}  where account_no=#{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    @Select("select count(1) from de_duplication where tx_no = #{txNo}")
    int isExistTx(String txNo);


    @Insert("insert into de_duplication values(#{txNo},now());")
    int addTx(String txNo);

    @Insert("insert into account_info_log values(null,#{accountNo},#{amount},#{txNo},'Y','2');")
    int addAccountLog(@Param("accountNo") String accountNo, @Param("amount") Double amount,@Param("txNo") String txNo);

    @Update("update account_info_log set status = #{status} where txNo = #{txNo};")
    int updAccountLog(@Param("txNo") String txNo,@Param("status") String status);

    @Select("select * from account_info_log where status = 'N'")
    List<AccountInfoLog> selectFailAcount();
}

2.2.6 消息发送确认方法

package cn.itcast.dtx.txmsgdemo.bank1.config;

import cn.itcast.dtx.txmsgdemo.bank1.dao.AccountInfoDao;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    AccountInfoDao accountInfoDao;
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息标识:" + correlationData.toString());
        log.info("发送成功确认:"+ack);
        log.info("错误原因:"+cause);

        //该方法在消息到达MQ服务器时都表示成功,并不能保证消息一定会被投递到目标 queue 里,所以这个方法不适合做确认
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息:"+message.toString());
        log.info("返回码:"+replyCode);
        log.info("返回描述:"+replyText);
        log.info("交换机:"+exchange);
        log.info("路由key:"+routingKey);

        //方法执行,说明消息未投递成功,将日志表的状态修改为N
        String msg = new String(message.getBody());
        accountInfoDao.updAccountLog((String) JSONObject.parseObject(msg).get("txNo"),"N");
    }
}

2.2.7 核心业务方法

package cn.itcast.dtx.txmsgdemo.bank1.service.impl;

import cn.itcast.dtx.txmsgdemo.bank1.config.ConfirmService;
import cn.itcast.dtx.txmsgdemo.bank1.dao.AccountInfoDao;
import cn.itcast.dtx.txmsgdemo.bank1.model.AccountChangeEvent;
import cn.itcast.dtx.txmsgdemo.bank1.service.AccountInfoService;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

/**
 * @author Administrator
 * @version 1.0
 **/
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmService confirmService;

    @Override
    @Transactional
    public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) throws Exception {

        //本地扣减金额
        int i = accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount());
        if (i != 1){
            throw new Exception("更新失败");
        }

        //记录本地事务日志表
        int j = accountInfoDao.addAccountLog(accountChangeEvent.getAccountNo(), accountChangeEvent.getAmount(),accountChangeEvent.getTxNo());
        if (j != 1){
            throw new Exception<

以上是关于Rabbitmq的分布式事务解决方案的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ解决分布式事务

一个基于 RabbitMQ 的可复用的分布式事务消息架构方案!

一个基于RabbitMQ的可复用的事务消息方案

RabbitMQ学习记录1

RabbitMQ,RocketMQ,Kafka 事务性,消息丢失和重复发送处理策略

RabbitMq解决分布式事物