Java消息服务-JMS
Posted hank-hush
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java消息服务-JMS相关的知识,希望对你有一定的参考价值。
什么是JMS?
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
消息服务:
消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持应用程序开发。在Java中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果。
体系架构:
JMS对象模型
ConnectionFactory: 创建Connection对象的工厂,针对两种不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
Connection: Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
Session: Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
MessageProducer: 消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
MessageConsumer: 消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
Destination: Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
JMS的两种消息类型
在JMS标准中,有两种消息模型PTP(Point to Point),Publish/Subscribe(Pub/Sub)。
P2P模式-点对点消息传送模型:
在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。
P2P的特点
1,每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
2,发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。
3,接收者在成功接收消息之后需向队列发送确认收到通知(acknowledgement)。
Pub/Sub-发布/订阅消息传递模型:
在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic。
在发布/订阅消息模型中,目的地被称为主题(topic),topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
Pub/Sub特点
1,每个消息可以有多个消费者。
2,发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个或多个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
3,为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
消息接收
在JMS中,消息的接收可以使用以下两种方式:
同步 使用同步方式接收消息的话,消息订阅者调用receive()方法。在receive()中,消息未到达或在到达指定时间之前,方法会阻塞,直到消息可用。
异步 使用异步方式接收消息的话,消息订阅者需注册一个消息监听者,类似于事件监听器,只要消息到达,JMS服务提供者会通过调用监听器的onMessage()递送消息。
入门Demo
pom.xml
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency> </dependencies>
点对点模式:
接收方
public class QueueConsuer { public static void main(String[] args) throws Exception{ //1创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2获得连接 Connection connection = activeMQConnectionFactory.createConnection(); //3启动连接 connection.start(); //4创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5创建列表 Queue queue = session.createQueue("test-queue"); //6创建消息接收方 MessageConsumer consumer = session.createConsumer(queue); //7监听消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到的消息为:"+textMessage.getText()); }catch (JMSException e){ e.printStackTrace(); } } }); //8等待键盘输入 System.in.read(); //9关闭资源 consumer.close(); session.close(); connection.close(); } }
发送方
public static void main(String[] args) throws Exception{ //1创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2获得连接 Connection connection = activeMQConnectionFactory.createConnection(); //3启动连接 connection.start(); //4创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5创建列表 指定发送队列的名称 Queue queue = session.createQueue("test-queue"); //6创建消息生产者 MessageProducer producer = session.createProducer(queue); //7生产消息 TextMessage testMessage = session.createTextMessage("这是我生产的消息 java0722"); //8发送消息 producer.send(testMessage); //9关闭资源 producer.close(); session.close(); connection.close(); }
订阅发布模式
接收方(多个接收方 代码相同)
public static void main(String[] args) throws Exception{ //1创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2获得连接 Connection connection = activeMQConnectionFactory.createConnection(); //3启动连接 connection.start(); //4创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5创建列表 Topic topic = session.createTopic("test-topic"); //6创建消息接收方 MessageConsumer consumer = session.createConsumer(topic); //7监听消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到的消息为:"+textMessage.getText()); }catch (JMSException e){ e.printStackTrace(); } } }); //8等待键盘输入 System.in.read(); //9关闭资源 consumer.close(); session.close(); connection.close(); }
发送方
public static void main(String[] args) throws Exception{ //1创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2获得连接 Connection connection = activeMQConnectionFactory.createConnection(); //3启动连接 connection.start(); //4创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5创建列表 Topic topic = session.createTopic("test-topic"); //6创建消息接收方 MessageProducer producer = session.createProducer(topic); //7生产消息 TextMessage testMessage = session.createTextMessage("这是我生产的消息 java0722 TTTTT"); //8发送消息 producer.send(testMessage); //9关闭资源 producer.close(); session.close(); connection.close(); }
消息体
为了适应不同场景下的消息,提高消息存储的灵活性,JMS定义了几种具体类型的消息,不同的子类型的消息体也不一样,需要注意的是,Message接口并没有提供一个统一的getBody之类的方法。消息子接口定义如下:
TextMessage :最简单的消息接口,用于发送文本类的消息,设置/获取其body的方法定义如下setText()/getText()。
StreamMessage :流式消息接口,里面定义了一系列的对基本类型的set/get方法,消息发送者可以通过这些方法写入基本类型的数据,消息接收者需要按发送者的写入顺序来读取相应的数据。
MapMessage :把消息内容存储在Map里,本接口定义了一系列对基本类型的的set/get方法,与StreamMessage不同的是,每个值都对应了一个相应的key,所以消息接收者不必按顺序去读取数据。
ObjectMessage :将对象作为消息的接口,提供了一个set/get 对象的方法,需要注意的是只能设置一个对象,这个对象可以是一个Collection,但必须是序列化的。
BytesMessage :以字节的形式来传递消息的接口,除了提供了对基本类型的set/get,还提供了按字节方式进行set/get。
以上是关于Java消息服务-JMS的主要内容,如果未能解决你的问题,请参考以下文章