RocketMQ可靠消息最终一致性解决方案

Posted 蕃薯耀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ可靠消息最终一致性解决方案相关的知识,希望对你有一定的参考价值。

RocketMQ可靠消息最终一致性解决方案 - 用户消费赚积分业务

 

================================

©Copyright 蕃薯耀 2021-05-14

https://www.cnblogs.com/fanshuyao/

 

什么是可靠消息最终一致性方案?

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后发出一条消息到消息中间件,事务参与方(消息消费者)一定能够接收到消息并处理事务成功,此方案强调的是只要消息发给事务参与方,则最终事务要达到一致。

具体流程图如下所示:

 

 

 

消息事务一致性问题:

 

本地消息成功,消息超时(但发送成功),本地事务回滚,消息成功。
造成本地事务,与消息参与方的事务不一致。

 

 

一、数据库设计(Mysql)

#RocketMQ可靠消息最终一致性解决方案 - 用户消费赚积分业务


#用户表
#drop table `user`;
CREATE TABLE `user`(
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_name VARCHAR(100) NOT NULL,
    money DOUBLE NOT NULL DEFAULT 100,#用户金额
    score_sum DOUBLE NOT NULL DEFAULT 0,#总积分
    create_time DATETIME NOT NULL DEFAULT NOW(),
    remark VARCHAR(100)
);
INSERT INTO `user`(user_name) VALUES (\'小明\');


#消费记录表
#DROP TABLE `money`;
CREATE TABLE `money`(
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    money_code BIGINT NOT NULL UNIQUE,#唯一标识,流水号
    user_id BIGINT NOT NULL,#启用id
    money DOUBLE NOT NULL DEFAULT 0,#消费的金额
    msg_id VARCHAR(50),#回写消息的id
    create_time DATETIME NOT NULL DEFAULT NOW(),
    remark VARCHAR(100)
);




#积分记录表
#DROP TABLE `score`;
CREATE TABLE `score`(
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    money_code BIGINT NOT NULL UNIQUE,
    score_add DOUBLE,
    score_type VARCHAR(30) NOT NULL DEFAULT \'jifen\',
    create_time DATETIME NOT NULL DEFAULT NOW(),
    remark VARCHAR(100)
);

SELECT * FROM `mq`.`user`;

SELECT * FROM `mq`.`score`;

#delete from `mq`.`score`;

SELECT * FROM `mq`.`money`;

#DELETE FROM `mq`.`money`;

 

 

 

 

二、RocketMQ消息生产者端

 

1、pom.xml引入依赖

<properties>    
    <rocketmq-spring-boot.version>2.2.0</rocketmq-spring-boot.version>
</properties>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>${rocketmq-spring-boot.version}</version>
</dependency>

 

2、application.properties配置

#增加RocketMq依赖后,增加配置
rocketmq.name-server=localhost:9876
rocketmq.producer.group=rocketMQ2Producer
rocketmq.producer.sendMessageTimeout=30000

#自定义变量
my.mq.tag.score=tag-score

 

3、Controller端

import javax.servlet.http.HttpServletRequest;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.lqy.mq.bean.Result;
import com.lqy.mq.biz.producer.entity.Money;
import com.lqy.mq.biz.producer.service.UserService;
import com.lqy.utils.JsonUtil;
import com.lqy.utils.UidUtil;

/**
 * <p>
 *  前端控制器
 * </p>
 *
 * @author root
 * @since 2021-05-11
 */
@RestController
@RequestMapping("/user")
public class UserController {
    
    private static Logger log = Logger.getLogger(UserController.class);
    
    @Autowired
    private UserService userService;
    
    @RequestMapping("/consumeMoney")
    public Result consumeMoney(HttpServletRequest request, Money m) {
        
        log.info("m = " + JsonUtil.toJson(m));
        m.setMoneyCode(UidUtil.getUid());
        log.info("m2 = " + JsonUtil.toJson(m));
        
        userService.sendMsg(m);
        
        return Result.ok();
    }
}

 

4、Service端

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.lqy.mq.biz.producer.dao.UserDao;
import com.lqy.mq.biz.producer.entity.Money;
import com.lqy.mq.biz.producer.entity.User;
import com.lqy.mq.biz.producer.service.MoneyService;
import com.lqy.mq.biz.producer.service.UserService;
import com.lqy.utils.JsonUtil;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author root
 * @since 2021-05-11
 */
@Service
public class UserServiceImpl extends ServiceImpl<UserDao, User> implements UserService {

    private static Logger log = Logger.getLogger(UserServiceImpl.class);
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private MoneyService moneyService;
    
    @Value("${my.mq.tag.score}")
    private String TAG_SCORE;
    
    
    @Transactional
    @Override
    public void consume(Money m) {
        User user = this.baseMapper.selectById(m.getUserId());
        
        user.setMoney(user.getMoney() - m.getMoney());
        
        this.baseMapper.updateById(user);
        
        moneyService.save(m);
        log.info("m = " +JsonUtil.toJson(m));
        
        if(m.getMoney() == 99) {
            throw new RuntimeException("消费生产者异常");
        }
        
    }
    
    
    @Transactional
    @Override
    public void sendMsg(Money m) {
        
        //先进行消费
        this.consume(m);
        
        
        try {
            
            //后发送消息
            //如果因为消息发生超时,导致业务的事务回滚,后面可以在事务消息中确认或者回查时,根据有没有业务信息进行消息的提交或者回滚
            
            //消息对象生成
            Message<String> message = MessageBuilder.withPayload(JsonUtil.toJson(m))
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, m.getMoneyCode())
                    .setHeader(RocketMQHeaders.TOPIC, TAG_SCORE)
                    .build();
            
            //事务发消息
            TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(TAG_SCORE, message, null);
            
            if(transactionSendResult == null || transactionSendResult.getSendStatus() != SendStatus.SEND_OK) {
                throw new RuntimeException("MQ消息发送失败");
            }
            
            log.info("transactionSendResult = " +JsonUtil.toJson(transactionSendResult));
            log.info("msgId = " +transactionSendResult.getMsgId());
            
            //回写msgId
            m.setMsgId(transactionSendResult.getMsgId());
            moneyService.updateById(m);
            
        }catch (Exception e) {
            throw new RuntimeException("MQ消息发送失败");
        }
        
        
        //故意造错:这里会导致RocketMq消息发送成功,但业务数据已经回滚。
        //一般情况,消息发送后,不应该再出现业务的代码
        if(m.getMoney() == 7) {
            throw new RuntimeException("消息生产者异常,金额不对");
        }
    }
    
    
    
    
}

 

 

5、RocketMQLocalTransactionListener监听器,RocketMQ发送消息成功确认和RocketMQ事务消息回查

import org.apache.log4j.Logger;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.lqy.mq.biz.producer.entity.Money;
import com.lqy.mq.biz.producer.service.MoneyService;
import com.lqy.utils.JsonUtil;


@Component
@RocketMQTransactionListener
public class MyRocketMqListener implements RocketMQLocalTransactionListener {

    private static Logger log = Logger.getLogger(MyRocketMqListener.class);
    
    @Value("${my.mq.tag.score}")
    private String TAG_SCORE;
    
    @Autowired
    private MoneyService moneyService;
    
    
    private RocketMQLocalTransactionState dealMsg(Message msg, Object arg) {
        String json = new String((byte[])msg.getPayload());
        log.info("json = " + json);
        
        MessageHeaders messageHeaders = msg.getHeaders();
        if(messageHeaders != null) {
            String tag = messageHeaders.get(RocketMQHeaders.TOPIC, String.class);
            log.info("tag = " + tag);
            log.info("TAG_SCORE = " + TAG_SCORE);
            
            if(TAG_SCORE.equals(tag)) {
                Money m = JsonUtil.toBean(json, Money.class);
                Long moneyCode = m.getMoneyCode();
                
                log.info("moneyCode = " + moneyCode);
                
                //需要进行幂等校验(因为RocketMq会不断回查,即多次回查)
                if(moneyService.exist(moneyCode)) {
                    return RocketMQLocalTransactionState.COMMIT;
                    
                }else {
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            }
        }
        
        return RocketMQLocalTransactionState.UNKNOWN;
    }
    
    
    //rocketmq保存信息成功后回调
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=====executeLocalTransaction=====");
        return dealMsg(msg, arg);
    }

    
    //rocketmq事务回查信息状态
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("+++++checkLocalTransaction+++++");
        return dealMsg(msg, null);
    }

}

 

 

三、RocketMQ 消息消费者端

 

1、消费者pom.xml引入依赖

<properties>
    <rocketmq-spring-boot.version>2.2.0</rocketmq-spring-boot.version>
</properties>
    
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-spring-boot.version}</version>
</dependency>

 

2、消费者application.properties配置

#增加RocketMq依赖后,增加配置
rocketmq.name-server=localhost:9876
rocketmq.consumer.group=rocketMQ2Consumer

#自定义变量
my.mq.tag.score=tag-score

 

3、消费者端RocketMQ监听器:RocketMQListener

import org.apache.log4j.Logger;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lqy.mq.biz.consumer.entity.Money;
import com.lqy.mq.biz.consumer.service.ScoreService;
import com.lqy.utils.JsonUtil;

@Component
@RocketMQMessageListener(consumerGroup = "rocketMQ2ConsumerListener", topic = "${my.mq.tag.score}")
//如果生产者有TAG(格式为:topic:mytag),则需要使用下面的
//@RocketMQMessageListener(consumerGroup = "rocketMQ2ConsumerListener", topic = "${my.mq.tag.score}", selectorType = SelectorType.TAG, selectorExpression = "")
public class MyConsumerListenner implements RocketMQListener<String> {

    private static Logger log = Logger.getLogger(MyConsumerListenner.class);
    
    @Autowired
    private ScoreService scoreService;
    

    @Override
    public void onMessage(String message) {
        log.info("message = " + message);
        
        Money money = JsonUtil.toBean(message, Money.class);
        
        scoreService.saveScore(money);
    }

}

 

 

4、消费者Service端

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.lqy.mq.biz.consumer.dao.ScoreDao;
import com.lqy.mq.biz.consumer.entity.Money;
import com.lqy.mq.biz.consumer.entity.Score;
import com.lqy.mq.biz.consumer.service.MoneyService;
import com.lqy.mq.biz.consumer.service.ScoreService;
import com.lqy.mq.biz.consumer.service.UserService;
import com.lqy.utils.JsonUtil;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author root
 * @since 2021-05-11
 */
@Service
public class ScoreServiceImpl extends ServiceImpl<ScoreDao, Score> implements ScoreService {
    
    private static Logger log = Logger.getLogger(ScoreServiceImpl.class);
    
    @Autowired
    private MoneyService moneyService;
    
    @Autowired
    private UserService userService;

    @Transactional
    @Override
    public Score saveScore(Money money) {
        
        //没有钱的记录,跳过(可能是因为业务回滚,但消息已经发生成功(消息超时等原因))
        //注意:但这个判断会存在问题,当消息比较快,而在消息生产端生产的数据库插入数据比较慢时,会导致消费者端未找到数据而走了这步。
        //所以这步是要抛异常的,因为这里没有成功消费,RocketMq会重新摄像头摄像头推送
        if(!moneyService.exist(money.getMoneyCode())) {
            
            //当产生了RocketMq消息,但业务回滚时,此处会一直报错
            //这里可以将没有钱的记录,保存在另一个表,方便业务进行核查。如果是消息生产者系统导致的问题,可以根据实际业务补偿处理。
            //每次根据唯一键查询,记录失败的次数,在N次失败以后,可以通过RocketMQ后台处理该消息,避免一直报错。
            log.error("没有钱的记录-----");
            log.error("money = " + JsonUtil.toJson(money));
            
            throw new RuntimeException("没有钱的记录");
        }
        
        //幂等校验(回调会多次)
        //积分已经存在,跳过,避免重新消费
        if(this.exist(money.getMoneyCode())) {
            log.warn("积分已经存在+++++");
            log.warn("money = " + JsonUtil.toJson(money));
            return null;
        }
        
        Score score = new Score();
        score.setUserId(money.getUserId());
        score.setScoreAdd(money.getMoney());
        score.setMoneyCode(money.getMoneyCode());
        
        this.baseMapper.insert(score);
        
        this.userService.updateScore(money);
        
        if(money.getMoney() == 8) {
            throw new RuntimeException("消费者出现异常,金额不对:money.getMoney()=" + money.getMoney());
        }
        return score;
    }
    
    
    @Transactional(readOnly = true)
    @Override
    public Score getScoreByCode(long moneyCode) {
        QueryWrapper<Score> queryWrapper  = new QueryWrapper<Score>();
        queryWrapper.eq("money_code", moneyCode);
        return this.baseMapper.selectOne(queryWrapper);
    }
    
    
    @Transactional(readOnly = true)
    @Override
    public boolean exist(long moneyCode) {
        Score score = this.getScoreByCode(moneyCode);
        if(score == null) {
            return false;
        }else {
            return true;
        }
    }
    
    
}

 

 

 

(时间宝贵,分享不易,捐赠回馈,^_^)

 

 

================================

©Copyright 蕃薯耀 2021-05-14

https://www.cnblogs.com/fanshuyao/

 

以上是关于RocketMQ可靠消息最终一致性解决方案的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ系列事务消息(数据库|最终一致性)

分布式事务

分布式事务之解决方案(可靠消息最终一致性)

消息队列之利器锋芒

分布式事务之可靠消息设计要点汇总

分布式事务解决方案-柔性事务(可靠消息保证最终一致性)