分布式消息队列ActiveMQ消息模型
Posted 码农的修炼之道
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式消息队列ActiveMQ消息模型相关的知识,希望对你有一定的参考价值。
之前我们有一篇文章《分布式消息队列ActiveMQ使用》讲解了ActiveMQ的基本原理,掌握了启动ActiveMQ,以及会写一个简单的服务提供端和消费端的Demo。今天这篇文章,我们将继续深入研究一下ActiveMQ的消息模型。
在ActiveMQ中,一共支持4种消息类型,分别是TextMessage消息类型、BytesMessage消息类型、ObjectMessage消息类型,还有一种MapMessage消息类型。其中MapMessage消息类型应用不多,这里不介绍了。
(1) TextMessage消息类型
TextMessage消息是一种最常用的文本消息,这种消息的使用最简单。
在Producer端,直接在创建好的Session上创建一条TextMessage消息即可,然后交给Producer进行发送。具体如下:
在接收端,我们直接调用consumer的receiver方法,其中参数设置的是超时时间。因为我们设置的消息确认方式的自动确认,也就是消费端(客户端)收不到消息会自动重发,直到消费端处理了这条消息,发出确认,在Producer端才会删除这条消息。
(2) BytesMessage消息类型
BytesMessage消息是一种字节码消息,它传输的是一个字节数组。使用上只要将我们的消息变成一个字节数组即可,然后直接在producer端调用send方法发送消息即可。
在接收端,我们直接调用consumer的receiver方法,将接收到的内容保存为BytesMessage类型的数据,然后通过类似于IO流的方式读取出来即可(也可以保存为字节数组,然后处理)。
(3) ObjectMessage消息类型
这种消息类型应用也很广泛,使用这个类型的消息,需要支持对象的序列化。在Java中,只要这个类实现了Serializable接口即可。
举例来说明:我们创建一个消息类,该类包含了一些字段,比如在列车的基本信息,定义了各个字段的set/get方法,具体如下:
在Provider端,我们创建一个TrainBasicInfo对象,该对象就是一条消息内容。将其转换为ObjectMessage消息进行发送即可。
那么在Consumer端调用consumer的Receive方法接收到ObjectMessage消息,然后转换为TrainBasicInfo对象进行获取属性值。
上面的消息发送和接收可以了。
思考1:上面的消息发送和接收可以了。但是有个问题,发送端全部将消息序列化为Object类型,如何在接收端知道要转化为哪个消息对象?
解决方案就是在发送端发送前,给该ObjectMessage加一个属性,如下:
Obm.setIntProperty("msg_id",10001);
类似于一个键值对,每个消息发送前都设置一下这个,后面的值填写这个消息的msg_id即可。 代码如下:
在接收端先取这个属性值,根据这个属性值,我们就知道是哪个消息,然后转化为该对象进行提取属性即可。
int msgId =obm.getIntProperty("msg_id");然后根据switch...case进行判断即可。具体代码参考如下:
总结:
在ActiveMQ中,这三种消息模式最常用。其实在企业中,大量的通信案例都可以采用ObjectMessage消息模型进行发送和解析消息。
以上是关于分布式消息队列ActiveMQ消息模型的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot的JMS发送和接收队列消息,基于ActiveMQ