Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?相关的知识,希望对你有一定的参考价值。

前言

在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性、高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用。与此同时,这种方案也带来了调用链路处理上的问题,因为大部分应用请求都会要求同步响应实时处理结果,而由于请求的处理过程已经通过消息异步解耦,所以整个调用链路就变成了异步链路,此时请求链路的发起者如何同步拿到响应结果,就需要进行额外的系统设计考虑。

为了更清晰地理解这个问题,小码哥以最近正在做的共享单车的IOT系统为例,给大家来一张图描述下,如图所示:

技术图片

在上述系统流程中,终端设备与服务端之间通过MQTT协议相连,而MQTT协议本质上是一种异步消息的连接方式,因此业务应用(如图中的订单系统)发起开锁请求后,IOT应用系统会以MQTT协议的方式通过物联网平台(此处使用的是AWS IOT服务)向设备发起开锁下行消息,而这一过程在IOT应用系统完成与物联网平台的交互后同步调用链路就结束了,因为物联网平台与锁设备之间通过MQTT消息服务异步解耦了,当然物联网平台会通过一系列可靠消息机制来确保开锁消息能够发送到指定设备的监听MQTT队列。而至于锁设备是否能够及时接收到开锁下行MQTT消息,则取决于锁设备当时的移动网络情况。

锁设备在收到MQTT开锁消息后,会通过嵌入式软件系统触发硬件设备完成开锁动作,之后就需要通过MQTT上行消息将开锁结果反馈到服务端,从而由服务端系统判断是否创建骑行订单并计算费用。这一过程需要物联网平台监听指定锁设备相应的MQTT上行消息队列,由于物联网平台需要与上万个设备进行连接,因而不可能将每一个锁设备所产生的MQTT上行消息都直接转发给Iot应用系统,因此在物联网平台可以将一类的设备MQTT消息转发至特定的业务消息队列中,例如开锁上行消息,所有设备的MQTT开锁响应上行消息都可以转发到表示开锁响应的Iot业务消息队列,如“iot_upstream_lock_response”,这样Iot业务系统则不需要再关注底层设备的MQTT消息,可以用更利于业务理解的方式去处理开锁响应结果。

现在的问题是通过MQTT协议的开锁下行消息、上行消息已经完全处于两条不同的异步网络链路,而链路的发起者此时却需要同步等待开锁结果,但是实际上同步链路早已在Iot应用系统向物联网平台发送开锁消息后就已经完成,所以为了满足调用方的同步请求/响应需要就需要在Iot应用系统的下发开锁消息后进行额外的同步阻塞等待,并监听开锁响应的Iot业务消息队列“iot_upstream_lock_response”关于此次开锁请求的上行消息,之后再结束掉之前的同步阻塞等待逻辑,从而实现向业务调用方返回实时开锁响应结果的同步调用效果。那么在上述流程中如何实施额外的同步阻塞以及如何进行回调消息的监听呢?在接下来的内容中就和大家一起探讨具体的实施方案!

解决方案分析

以上问题在使用消息服务进行异步解耦的应用场景中是比较普遍的需求,由于异步调用链路非常长所以通用的解决思路是在调用链的起始端进行同步阻塞,而在调用链的结束端通过回调的方式来实现,如下图所示:

技术图片

在上述图示中,链路起始队列处在发送第一次异步消息后会开启一个临时队列并同步阻塞监听该临时队列的回调消息,而链路的结束队列在完成逻辑处理后需要回调起始队列监听的临时队列,而由于请求线程一直处于阻塞监听该临时队列的状态,所以一旦收到回调消息就可以结束阻塞执行后续流程,从而完成整个链路的同步响应。
虽然常见的消息中间件都可以实现以上逻辑,例如小码哥之前所在的公司就基于RabbitMQ通过临时队列的方式实现过消息链路的同步调用,但是基于消息中间件的方式多少还是显得有些繁琐,对于常见的消息中间件如RocketMQ、RabbitMQ来说异步消息才是其强项,如果以大量临时队列的创建和销毁为代价来实现消息调用链路的同步,不仅从使用上来说显得有些麻烦,并且也会对消息中间件的稳定性带来一些不好的影响。

因此在前面提到的IOT系统中,我们采用了基于Redis的发布/订阅功能来实现异步消息链路的同步化调用。而由于Redis的高性能以及Redis的应用场景非常丰富,并且非常适合数据频繁变动的场景,在系统中既可以作为NoSQL数据库来使用,同时还支持分布式锁等功能,因而维护的性价比也相对较高。接下来我们就基于Spring Boot的开发框架来演示如何利用Redis的发布/订阅来实现异步消息链路的同步回调!

Redis发布订阅机制

Redis本身可以通过发布订阅机制实现一定的消息队列功能,在Redis中通过subscribe/publish等命令可以实现发布订阅功能,基于此原先的IOT系统处理示意图如下:

技术图片

如上图所示,在IOT应用端发送异步MQTT消息后会以消息ID组成的Key作为频道**,并保持请求线程对该频道的同步监听**,直到收到Iot业务消息队列的开锁结果上行消息后,在消息队列的消费端将该上行消息发布至同样以消息requestId组成的频道中,从而实现基于Redis发布订阅机制的异步消息系统同步调用效果。

Spring Boot代码实现

下面我们基于Spring Boot演示如何通过代码进行实现,创建Spring Boot工程后引入Spring Boot Redis集成依赖包,如下:

 <!--Redis依赖-->
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>

之后在项目的配置文件中添加Redis服务连接信息,如下所示:

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456

此时项目就具备了访问Redis的能力,接下来我们通过具体的代码实现来进行功能演示。订阅监听代码如下:

@RestController
@RequestMapping("/iot")
public class IotController {

    //注入Redis消息容器对象
    @Autowired
    RedisMessageListenerContainer redisMessageListenerContainer;

    @RequestMapping(value = "/unLock", method = RequestMethod.POST)
    public boolean unLock(@RequestParam(value = "thingName") String thingName,
            @RequestParam(value = "requestId") String requestId)
            throws InterruptedException, ExecutionException, TimeoutException {

        //此处实现异步消息调用处理....

        //生成监听频道Key
        String key = "IOT_" + thingName + "_" + requestId;
        //创建监听Topic
        ChannelTopic channelTopic = new ChannelTopic(key);
        //创建消息任务对象
        IotMessageTask iotMessageTask = new IotMessageTask();
        //任务对象及监听Topic添加到消息监听容器
        try {
            redisMessageListenerContainer.addMessageListener(new IotMessageListener(iotMessageTask), channelTopic);
            System.out.println("start redis subscribe listener->" + key);
            //进入同步阻塞等待,超时时间设置为60秒
            Message message = (Message) iotMessageTask.getIotMessageFuture().get(60000, TimeUnit.MILLISECONDS);
            System.out.println("receive redis callback message->" + message.toString());
        } finally {
            //销毁消息监听对象
            if (iotMessageTask != null) {
                redisMessageListenerContainer.removeMessageListener(iotMessageTask.getMessageListener());
            }
        }
        return true;
    }
}

在上述代码中,我们模拟了一个开锁请求,在完成异步消息处理后会开启Redis订阅监听,为了实现异步阻塞还需要我们创建消息任务对象,代码如下:

public class IotMessageTask<T> {

    //声明线程异步阻塞对象(JDK 1.8新提供Api)
    private CompletableFuture<T> iotMessageFuture = new CompletableFuture<>();

    //声明消息监听对象
    private MessageListener messageListener;

    //声明超时时间
    private boolean isTimeout;

    public IotMessageTask() {
    }
    public CompletableFuture<T> getIotMessageFuture() {
        return iotMessageFuture;
    }
    public void setIotMessageFuture(CompletableFuture<T> iotMessageFuture) {
        this.iotMessageFuture = iotMessageFuture;
    }
    public MessageListener getMessageListener() {
        return messageListener;
    }
    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
    public boolean isTimeout() {
        return isTimeout;
    }
    public void setTimeout(boolean timeout) {
        isTimeout = timeout;
    }
}

在消息任务对象中我们通过JDK1.8新提供的CompletableFuture类实现线程阻塞效果,并通过定义消息监听对象及超时时间完善处理机制。此外根据Controller层代码还需要自定义定义消息监听处理对象,代码如下:

public class IotMessageListener implements MessageListener {

    IotMessageTask iotMessageTask;

    public IotMessageListener(IotMessageTask iotMessageTask) {
        this.iotMessageTask = iotMessageTask;
    }

    //实现消息发布监听处理方法
    @Override
    public void onMessage(Message message, byte[] bytes) {
        System.out.println("subscribe redis iot task response:{}" + message.toString());
        //线程阻塞完成
        iotMessageTask.getIotMessageFuture().complete(message);
    }
}

此时就完成了Redis服务订阅这部分逻辑的编写,在后续的逻辑处理中需要完成消息的发布才能正常结束此处的阻塞等待,接下来我们写一段代码来模拟消息发布,代码如下:

@RestController
@RequestMapping("/iot")
public class IotCallBackController {

    //引入Redis客户端操作对象
    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @RequestMapping(value = "/unLockCallBack", method = RequestMethod.POST)
    public boolean unLockCallBack(@RequestParam(value = "thingName") String thingName,
            @RequestParam(value = "requestId") String requestId) {
        //生成监听频道Key
        String key = "IOT_" + thingName + "_" + requestId;
        //模拟实现消息回调
        stringRedisTemplate.convertAndSend(key, "this is a redis callback");
        return true;
    }
}

此时启动Spring Boot应用调用开锁模拟接口,逻辑就会暂时处于订阅等待状态;之后再模拟调用开锁回调Redis消息发布逻辑,之前的阻塞等待就会因为监听回调而完成同步返回。

以上是关于Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 整合 Redis消息订阅与发布

spring bootredisspring boot 集成redis的发布订阅机制

Spring boot+redis实现消息发布与订阅

Spring boot+redis实现消息发布与订阅

redis 消息队列发布订阅模式spring boot实现

Spring Boot 2.x基础教程:使用Redis的发布订阅功能