分布式事务-阿里云MQ事务消息踩坑记录

Posted 叁滴水

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务-阿里云MQ事务消息踩坑记录相关的知识,希望对你有一定的参考价值。

如果没有了解过阿里云MQ的同学,可以先看下文档。

https://help.aliyun.com/document_detail/43348.html

业务场景:用户发布一个营销活动,然后需要从主要内存库中拿出部分库存加入次要内存库中。

业务场景跟平时的转账场景几乎一致,当然,如果是在一个库中,这个业务是很好实现的一个简单的@transactional 注解就可以解决问题,可是在两个库中,实现起来就有点难度了。解决分布式事物有很多方式,我这边根据我的业务场景,选择了阿里云的MQ。

阿里云MQ基本概念:

  • 事务消息:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。
  • 半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

 

  1. 发送方向 MQ 服务端发送消息。
  2. MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

重点在5 ,6,7 步骤是重点。大家好好思考下。

介绍下我的实现思路。

1.Spring 配置   配置文件的信息需要从MQ控制台配置 

 <bean id="localTransactionChecker" class="com.impl.MessageChecker"></bean>
    <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
        <property name="properties" > <!--事务消息生产者配置信息-->
            <props>
                <prop key="ProducerId">$ProducerId</prop> <!--请替换 XXX-->
                <prop key="AccessKey">$AccessKey</prop>
                <prop key="SecretKey">$SecretKey</prop>
            </props>
        </property>
        <property name="localTransactionChecker" ref="localTransactionChecker"></property>
    </bean>

配置说明 https://help.aliyun.com/document_detail/29536.html?spm=a2c4g.11186623.6.559.7e887addJ9v4zp

如果是本地环境进行测试的话,请使用公网,否则会访问不了。

消息发送代码

public class MessageSend 
    @Autowired
    private TransactionProducer transactionProducer;
    // 减库存Service
    @Autowired
    private ShopService shopService;

    /**
     *  减自己库存,并且发送事务消息
     * @param goodsId   商品id
     * @param num     减少数量
     */
    public void startAwardBefore(Long goodsId,int num) 
        String messageId= UUID.randomUUID().toString();
        Map m = new HashMap();
        m.put("goodsId",goodsId);
        m.put("num",num);
        Message msg = new Message("SHOP", "subStock",m.toString().getBytes());
        msg.setKey(messageId);
        SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() 
            @Override
            public TransactionStatus execute(Message msg, Object arg) 
                // 判断是否提交
                // 这个事务一定要已经提交,或者回滚的
                // 根据事务状态判断是否发送消息
              boolean b =   shopService.subStock( goodsId, num,messageId);
              if(b)
                  return TransactionStatus.CommitTransaction;
              else
                  return TransactionStatus.RollbackTransaction;
              
            
        ,  null);
    

 public boolean subStock(Long goodsId,int num,String messageId)
        Boolean b =  transactionTemplate.execute(new TransactionCallback<Boolean>() 
            public Boolean doInTransaction(org.springframework.transaction.TransactionStatus  s) 
                try 
                    // 减库存操作
                    int i = dao.subMyStock(goodsId,num);
                    if(i>0)
                        // 新建一个日志表 表字段 id 为 messageId,自己也可以添加其他的标识字段
                        // 为消息回查做铺垫。.
                        Log l = new Log();
                        l.setId(messageId);
                        dao.insert(Log);
                        return true;
                    else
                        return false;
                    
                catch (Exception e)
                    s.setRollbackOnly();
                    return false;
                
            
        );
        return b;
    

如果消息没有正常Commit.

消息会查代码,查询日志表是否有messageId的数据,如果有此数据,证明本地事务执行成功。

public class MessageChecker implements LocalTransactionChecker 
    @Autowired
    private LogMapper dao;
    @Override
    public TransactionStatus check(Message message) 
        String messageId= message.getKey();
        int i = dao.findIsHave(messageId);
        if(i>0)
            System.out.println("消息会查,提交消息");
            return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的 TransactionStatus
        else
            System.out.println("消息会查,回滚消息");
            return TransactionStatus.RollbackTransaction;
        
    



 

接收消息注意事项:

在互联网应用中,尤其在网络不稳定的情况下,MQ 的消息有可能会出现重复,这个重复简单可以概括为以下两种情况:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时 MQ Producer 意识到消息发送失败并尝试再次发送消息,MQ 消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复

    MQ Consumer 消费消息场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,MQ 服务端将在网络恢复后再次尝试投递之前已被处理过的消息,MQ 消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

由于消息可能会出现重复,如果我这块的业务数据重复发送,则用户的库存信息就不能保证了,所以自己的业务要进行幂等处理。

我是新建了一个消息接收日志表,用messageId作为主键,根据主键不能重复的特性,处理这块逻辑;

 

有问题,大家可以留言交流。

以上是关于分布式事务-阿里云MQ事务消息踩坑记录的主要内容,如果未能解决你的问题,请参考以下文章

阿里云-ONS-Help-产品介绍-消息类型:事务消息

通过源码告诉你,阿里的RocketMQ事务消息到底牛逼在哪?

分布式事务之可靠消息

分布式技术专题「架构实践于案例分析」总结和盘点目前常用分布式事务特别及问题分析(中)

分布式事务测试考虑点

MQ-ONS:ONS 开发指导文档