java消息中间件基础入门
Posted java与javascript
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java消息中间件基础入门相关的知识,希望对你有一定的参考价值。
. 什么是中间件?
关于中间件,官方的定义是这样的:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。
. 什么是消息中间件?
关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统。
. JMS规范与AMQP
JMS(java message service)就是java消息服务。 用于两个程序之间或多台系统中发送消息,进行异步通信。
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并且不受客户端/中间件不同产品,不同开发语言等条件的限制。
二者之间的对比
. ActiveMQ、RabbitMQ、Kafka
. JMS规范的消息模式
消息模式图解
队列模式
队列模式中存在生产者和消费者;生产者生产的消息会被消费者平均消费,即每一个消息只能被消费一次;消费者无需提前与生产者建立特定的关系。
主题模型
主题模式中存在发布者和订阅者;发布者发布的消息会被每一个订阅者接收,即每一个消息会被n个订阅者接收;订阅者必须在发布者发布消息之前开始监听,即订阅者无法接收监听开始之前发布者发布的消息。
. 在windows上安装activemq服务
前往官网下载压缩包,当前最新版本为5.15.3
解压压缩包,进入bin目录,选择win32或者win64进入,然后右键管理员权限运行activemq.bat
启动完成后,在浏览器输入http://localhost:8161/进入activemq的管理界面
点击正文第一个超链接登录,默认用户名与密码皆为admin
由于点击脚本运行于黑窗口不便,所以可以点击目录中的InstallService.bat将服务安装至windows服务列表中。
. 写一个demo
导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
队列模式
生产者
public class AppProducer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
// 1、创建连接ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2、创建Connection
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个目标(队列)
Destination destination = session.createQueue(queueName);
// 6、创建生产者
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7、创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
// 8、发送消息
messageProducer.send(textMessage);
System.out.println("send Message:" + textMessage.getText());
}
// 9、关闭连接
connection.close();
}
}
消费者
public class AppConsumer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String queueName = "queue-test";
public static void main(String[] args) throws JMSException {
// 1、创建连接ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2、创建Connection
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个目标(队列)
Destination destination = session.createQueue(queueName);
// 6、创建消费者
MessageConsumer messageConsumer = session.createConsumer(destination);
// 7、创建一个监听器
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("receive message:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 8、关闭连接
// connection.close();
}
}
注意:代码书写完成后,1、先运行生产者,运行结束后,在浏览器的管理界面中查看Queues中是否存在名称为queue-test的队列。2、运行消费者,查看消费者消费过程。3、同时运行两个消费者,然后运行生产者再生产100个消息,然后查看两个消费者消费的过程。4、注意在消费者中不需要关闭连接,因为消费者中的监听器是一个阻塞进程。
主题模式
发布者
public class AppProducer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
// 1、创建连接ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2、创建Connection
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个目标
Destination destination = session.createTopic(topicName);
// 6、创建发布者
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
// 7、创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
// 8、发送消息
messageProducer.send(textMessage);
System.out.println("send Message:" + textMessage.getText());
}
// 9、关闭连接
connection.close();
}
}
订阅者
public class AppConsumer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String topicName = "topic-test";
public static void main(String[] args) throws JMSException {
// 1、创建连接ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// 2、创建Connection
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个目标
Destination destination = session.createTopic(topicName);
// 6、创建订阅者
MessageConsumer messageConsumer = session.createConsumer(destination);
// 7、创建一个监听器
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("receive message:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 8、关闭连接
// connection.close();
}
}
注意:测试流程与上面类似。但是,需要根据两种模式的性质对比来观察两种模式的代码运行结果。
. 欢迎关注
阅读本文后,希望可以随手转发点赞,谢谢您的支持。
长按二维码关注 ‘java与javascript’
以上是关于java消息中间件基础入门的主要内容,如果未能解决你的问题,请参考以下文章