13 基于MQ实现订单系统的核心流程与第三方系统对接异步化改造

Posted 鮀城小帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13 基于MQ实现订单系统的核心流程与第三方系统对接异步化改造相关的知识,希望对你有一定的参考价值。

1.MQ生产集群

前面经过了解RocketMQ的核心架构原理,还有小规模集群的部署和压测,以及最终生产环境的集群部署,全部都搞定了。

如图,目前已经有一套3台NameServer机器+6台Broker机器的生产集群,而且经过对集群的生产参数都进行了适当优化,足以抗下每秒十多万消息的处理。

进入下一步,基于MQ对订单系统架构做改造。

2.订单系统改造方向

订单系统面临的技术问题包括以下几个环节:

  1. 下单核心流程环节太多,性能较差
  2. 订单退款的流程可能面临退款失败的风险
  3. 关闭过期订单的时候,存在扫描大量订单数据的问题
  4. 跟第三方物流系统耦合在一起,性能存在抖动的风险
  5. 大数据团队要获取订单数据,存在不规范直接查询订单数据库的问题
  6. 做秒杀活动时订单数据库压力过大

这里优先解决第一个问题,因为下单流程性能差比较明显,且直接影响用户体验。而订单退款失败是小概率问题,可以先通过人工处理。

关闭过期订单存在大量订单数据扫描的问题在订单数据量不大的情况下并不凸显严重。

跟第三方物流系统的耦合导致系统性能抖动,也是小概率出现的,并不是经常出现的。

大数据团队直接查询订单数据库跑报表出来虽然会造成压力,但影响还不大。

至于秒杀是订单数据库压力过大由于活动不是经常有,即使压力过大,只要将mysql部署在高配置物理机上,基本上也能扛住了。

3.引入MQ实现订单核心流程的异步化改造

3.1 支付订单的核心流程:

如上图,每次支付完一个订单后,都需要执行一系列的动作,包括:

  • 更新订单状态
  • 扣减库存
  • 增加积分
  • 发优惠券
  • 发短信
  • 通知发货

上述的一系列动作会导致整个订单支付的核心链路执行时间过长,可能长达好几秒。

3.2 MQ优化改造:

核心业务不变:在用户支付完毕后,只要执行最核心的更新订单状态和扣减库存就可以了,保证速度足够快。

支付后的动作异步化:诸如增加积分、发送优惠券、发送短信、通知发货的操作,都可以通过MQ实现异步化执行。

如图,订单系统仅同步执行更新订单状态和扣减库存两个最关键的操作,一旦支付成功,只要保证订单状态变为“已支付”,库存扣减掉,就可以保证核心数据不错乱。

然后订单系统会发送一个订单支付的消息到RocketMQ中去,然后积分系统会从RocketMQ里获取到消息,然后根据消息去累加积分。

营销系统会从RocketMQ里获取到消息然后发送优惠券,推送系统会从MQ里获取到信息然后推送短信,仓储系统会从MQ里获取消息然后生产物流单和发货单,去通知仓库管理员打包商品,准备交接给物流公司去发货。

以上,是改造后的业务执行流程。

3.3 优化后的效果

案例:原有架构中,更新订单状态耗费30ms,调用库存服务的接口扣减库存耗费80ms,增加积分需要耗费50ms,派发优惠券耗费60ms,发送短信耗费100ms(最高可能1s),通知发货耗费500ms(与第三方交互最高耗时可达1秒+)。总计每次订单核心链路的执行需要接近1秒钟,甚至更长时间。

MQ优化后,耗时为更新订单状态(30ms) +扣减库存(80ms)+发送订单消息到RocketMQ(10ms) ,一共120ms就可以了。

说明:这里不再出现一个圆圈不停的选择提醒用户等待后台检查订单是否支付成功的界面了,而是一旦支付成功就退回到App界面,在用户反映过来之前,就显示给用户订单支付成功的界面。

这里可以考虑一点:如果支付返回结果太慢怎么办?其实这种情况发生概率很低,sucess基本都是秒回,如果因为网络延迟原因导致结果回调太慢,也可以直接跳到订单模块主页,由前端处理。

4.需要落地的实现

一个是订单系统自身的改造,他需要去除掉调用积分系统、营销系统、推送系统以及仓储系统的逻辑,而改成发送一个订单支付消息到RocketMQ里去。

另外一个是积分系统、营销系统、推送系统以及仓储系统的改造,需要从RocketMQ里获取消息,然后根据消息执行自己的业务逻辑。

5.在订单系统中如何发送消息到RocketMQ?

想要发送消息到RocketMQ,首先要在项目里引入相关依赖:

                <dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.3.0</version>
		</dependency>

编写RockeMQ生产者的类

/**
 * @ClassName RocketMQProducer
 * @Description TODO
 * @Author wushaopei
 * @Date 2021/6/21 11:20
 * @Version 1.0
 */
public class RocketMQProducer {

    // 生产者类
    private static DefaultMQProducer producer;

    public RocketMQProducer(){

    }

    /**
     * @Description TODO 实例化生产者
     */
    @PostConstruct
    public void defaultMQProducer(){

        if(this.producer == null){
            // 实例化消息生产者Producer
            this.producer = new DefaultMQProducer("order_producer_group");
            // 这个是为Producer设置NameServer的地址,让他可以拉取路由信息
            // 这样才知道每个Topic的数据分散在哪些Broker机器上
            // 然后才可以把消息发送到Broker上去
            this.producer.setNamesrvAddr("localhost:9876");
        }
        try {
            // 这里启动一个Producer
            this.producer.start();
            System.out.println("--------producer start--------");
        }catch (MQClientException e){
            e.printStackTrace();
        }
    }

    /**
     * @Description TODO 主方法:生产者发送消息用的
     * @param topic
     * @param message
     * @return
     */
    public static void send(String topic,  String message) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        // 这里是构建一条消息对象
        Message msg = new Message(
                topic,      // 这是指定发送消息到哪个topic上去
                "",
                message.getBytes(RemotingHelper.DEFAULT_CHARSET));    // 这是消息

        // 利用Producer发送消息并接收返回结果,可以不接收
        SendResult result = producer.send(msg);
        System.out.println(result.getMsgId());
        System.out.println(result.getSendStatus());
    }
}

通过上述代码就可以让订单系统把订单消息发送到RocketMQ的一个Topic里去了。

6.订单消息会进入哪个Broker里去呢?

这里要明确一点,MQ集群中,Master Broker有两台,此时生产者的消息会进入到哪个Broker里去呢?

Topic的数据是分布式存储在多个Master Broker中的。此时有“TopicOrderPaySuccess” 这个Topic,那么它的数据会分散在两个Broker中。

当生产者发送一个订单消息过去的时候,会根据一定的负载均衡算法和容错算法把消息发送到一个Broker中去。

7.消费者从RocketMQ中获取订单消息

消费者代码如下:

package com.rocketmq.payOrder;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @ClassName RocketMQConsumer
 * @Description TODO
 * @Author wushaopei
 * @Date 2021/6/21 12:41
 * @Version 1.0
 */
public class RocketMQConsumer {

    public static void start(){
        new Thread(){

            public void run(){
                try {
                    // 这是RocketMQ消费者实例对象呢
                    // "credit_group"之类的就是消费者分组
                    // 一般来说比如积分系统就用“credis_consumer_group”
                    // 比如营销系统就用“marketing_consumer_group”
                    // 以此类推,不同的系统给自己取不同的消费组名字
                    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("credit_group");

                    // 这是给消费者设置NameServer的地址
                    // 这样就可以拉取到路由信息,知道Topic的数据在哪些broker上
                    // 然后可以从对应的broker上拉取数据
                    consumer.setNamesrvAddr("localhost:9876");

                    // 选择订阅“TopicOrderPaySuccess”的消息
                    // 这样会从这个Topic的broker机器上拉取订单消息过来
                    // 消费者订阅的主题,topic代表主题名字、* 代表所有消息
                    consumer.subscribe("TopicOrderPaySuccess","*");

                    // 注册监听器来处理拉取到的订单消息
                    // 如果consumer拉取到了订单消息,就会回调这个方法交给你处理
                    consumer.registerMessageListener(new MessageListenerConcurrently() {
                        @Override
                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                            // 在这里对获取到的msgs订单进行处理
                            // 比如增加积分、发送优惠券、通知发货,等等
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    });
                    // 启动消费者实例
                    consumer.start();
                    System.out.println("Consumer Started.%n");
                    while (true){  // 别让线程退出,就让创建好的consumer不停消费数据
                        Thread.sleep(1000);
                    }
                } catch (MQClientException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}

通过上述代码,积分系统、营销系统、推送系统、仓储系统,就可以从RocketMQ里消费“TopicOrderPaySuccess” 中的订单消息,然后根据订单消息执行增加积分、发送优惠券、发送短信、通知发货之类的业务逻辑了。

8.第三方系统对接解耦

(1)第三方系统

在订单核心流程中,订单系统间接耦合了两个第三方系统,分别是:第三方短信系统、第三方物流系统。

  • 第三方短信系统,是用来推送短信给用户的;
  • 第三方物流系统,用来生成物流单通知物流公司来收货和配送的。

同步调用流程: 订单系统会同步调用推送系统,然后推送系统调用第三方短信系统去发送短信给用户,接着订单系统会同步调用仓储系统,然后仓储系统调用第三方物流系统去生成物流单以及通知发货。

(2)第三方系统性能抖动的影响

订单系统是间接性的跟第三方短信系统和第三方物流系统耦合在一起的,这样的话,一旦第三方系统出现了性能抖动就会影响到订单系统的性能。

场景:比如正常第三方短信系统发送一个短信,只需要100ms,结果某一天突然性能下降变成发送短信需要1s了,此时会连带导致订单系统的性能也急剧下降。

(3)第三方系统性能抖动解耦

由于订单系统已经跟仓储系统和推送系统用MQ实现一步了。也就只是仓储系统自己跟第三方物流系统耦合,推送系统自己跟第三方短信系统耦合而已。

此时,即使第三方系统出现了严重的性能抖动,甚至是接口故障无法访问,也给跟订单系统没有关系。最多就是导致仓储系统调用第三方物流系统的接口时会出现短暂性的速度较慢的问题罢了。

以上是关于13 基于MQ实现订单系统的核心流程与第三方系统对接异步化改造的主要内容,如果未能解决你的问题,请参考以下文章

16 基于MQ实现秒杀订单系统的异步化架构以及精准扣减库存的技术方案

「订单」业务的设计与实现

MQ 解耦?骗你的

RocketMQ - 基于延迟消息机制优化大量订单的定时退款扫描问题

01 场景:一个真实电商订单系统的整体架构业务流程及负载情况

Django第三课 基于Django超市订单管理系统开发