java消息中间件基础入门

Posted java与javascript

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java消息中间件基础入门相关的知识,希望对你有一定的参考价值。

. 什么是中间件?

关于中间件,官方的定义是这样的:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。

. 什么是消息中间件?

关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统。

. JMS规范与AMQP

JMS(java message service)就是java消息服务。 用于两个程序之间或多台系统中发送消息,进行异步通信。
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并且不受客户端/中间件不同产品,不同开发语言等条件的限制。
二者之间的对比


. ActiveMQ、RabbitMQ、Kafka

java消息中间件基础入门

. JMS规范的消息模式

消息模式图解


队列模式
队列模式中存在生产者和消费者;生产者生产的消息会被消费者平均消费,即每一个消息只能被消费一次;消费者无需提前与生产者建立特定的关系。

主题模型
主题模式中存在发布者和订阅者;发布者发布的消息会被每一个订阅者接收,即每一个消息会被n个订阅者接收;订阅者必须在发布者发布消息之前开始监听,即订阅者无法接收监听开始之前发布者发布的消息。

. 在windows上安装activemq服务

  • 前往官网下载压缩包,当前最新版本为5.15.3

  • 解压压缩包,进入bin目录,选择win32或者win64进入,然后右键管理员权限运行activemq.bat

  • 启动完成后,在浏览器输入http://localhost:8161/进入activemq的管理界面

  • 点击正文第一个超链接登录,默认用户名与密码皆为admin

  • 由于点击脚本运行于黑窗口不便,所以可以点击目录中的InstallService.bat将服务安装至windows服务列表中。

. 写一个demo

  • 导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>

队列模式

  • 生产者

public class AppProducer {
   private static final String url = "tcp://127.0.0.1:61616";
   private static final String queueName = "queue-test";

   public static void main(String[] args) throws JMSException {
       // 1、创建连接ConnectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       // 2、创建Connection
       Connection connection = connectionFactory.createConnection();
       // 3、启动连接
       connection.start();
       // 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       // 5、创建一个目标(队列)
       Destination destination = session.createQueue(queueName);
       // 6、创建生产者
       MessageProducer messageProducer = session.createProducer(destination);
       for (int i = 0; i < 100; i++) {
           // 7、创建消息
           TextMessage textMessage = session.createTextMessage("test" + i);
           // 8、发送消息
           messageProducer.send(textMessage);
           System.out.println("send Message:" + textMessage.getText());
       }
       // 9、关闭连接
       connection.close();
   }
}
  • 消费者

public class AppConsumer {
   private static final String url = "tcp://127.0.0.1:61616";
   private static final String queueName = "queue-test";

   public static void main(String[] args) throws JMSException {
       // 1、创建连接ConnectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       // 2、创建Connection
       Connection connection = connectionFactory.createConnection();
       // 3、启动连接
       connection.start();
       // 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       // 5、创建一个目标(队列)
       Destination destination = session.createQueue(queueName);
       // 6、创建消费者
       MessageConsumer messageConsumer = session.createConsumer(destination);
       // 7、创建一个监听器
       messageConsumer.setMessageListener(new MessageListener() {
           public void onMessage(Message message) {
               TextMessage textMessage = (TextMessage) message;
               try {
                   System.out.println("receive message:" + textMessage.getText());
               } catch (JMSException e) {
                   e.printStackTrace();
               }
           }
       });
       // 8、关闭连接
       // connection.close();
   }
}
  • 注意:代码书写完成后,1、先运行生产者,运行结束后,在浏览器的管理界面中查看Queues中是否存在名称为queue-test的队列。2、运行消费者,查看消费者消费过程。3、同时运行两个消费者,然后运行生产者再生产100个消息,然后查看两个消费者消费的过程。4、注意在消费者中不需要关闭连接,因为消费者中的监听器是一个阻塞进程。

主题模式

  • 发布者

public class AppProducer {
   private static final String url = "tcp://127.0.0.1:61616";
   private static final String topicName = "topic-test";

   public static void main(String[] args) throws JMSException {
       // 1、创建连接ConnectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       // 2、创建Connection
       Connection connection = connectionFactory.createConnection();
       // 3、启动连接
       connection.start();
       // 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       // 5、创建一个目标
       Destination destination = session.createTopic(topicName);
       // 6、创建发布者
       MessageProducer messageProducer = session.createProducer(destination);
       for (int i = 0; i < 100; i++) {
           // 7、创建消息
           TextMessage textMessage = session.createTextMessage("test" + i);
           // 8、发送消息
           messageProducer.send(textMessage);
           System.out.println("send Message:" + textMessage.getText());
       }
       // 9、关闭连接
       connection.close();
   }
}
  • 订阅者

public class AppConsumer {
   private static final String url = "tcp://127.0.0.1:61616";
   private static final String topicName = "topic-test";

   public static void main(String[] args) throws JMSException {
       // 1、创建连接ConnectionFactory
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       // 2、创建Connection
       Connection connection = connectionFactory.createConnection();
       // 3、启动连接
       connection.start();
       // 4、创建会话(第一个参数为是否开启事务,第二个参数是会话类型,当前为自动应答)
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       // 5、创建一个目标
       Destination destination = session.createTopic(topicName);
       // 6、创建订阅者
       MessageConsumer messageConsumer = session.createConsumer(destination);
       // 7、创建一个监听器
       messageConsumer.setMessageListener(new MessageListener() {
           public void onMessage(Message message) {
               TextMessage textMessage = (TextMessage) message;
               try {
                   System.out.println("receive message:" + textMessage.getText());
               } catch (JMSException e) {
                   e.printStackTrace();
               }
           }
       });
       // 8、关闭连接
       // connection.close();
   }
}
  • 注意:测试流程与上面类似。但是,需要根据两种模式的性质对比来观察两种模式的代码运行结果。

. 欢迎关注

阅读本文后,希望可以随手转发点赞,谢谢您的支持。
长按二维码关注 ‘java与javascript


以上是关于java消息中间件基础入门的主要内容,如果未能解决你的问题,请参考以下文章

零基础快速入门SpringBoot2.0教程

Java中间消息件——ActiveMQ入门级运用

消息中间件入门解析

RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)

Kafaka基础快速入门

WebSocket.之.基础入门-后端响应消息