Java中间件-ActiveMQ

Posted daishengda

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中间件-ActiveMQ相关的知识,希望对你有一定的参考价值。

为什么需要使用消息中间件?

  • 系统解耦
  • 异步
  • 横向扩展
  • 安全可靠
  • 顺序保证

什么是中间件?

  非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。

什么是消息中间件?

  关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。

消息中间件图示?

   

什么是JMS?

   java消息服务(java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

什么是AMQP?

  AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端.中间件不同产品,不同开发语言等条件的限制

 JMS和AMQP对比

  

常见消息中间件对比 

  • ActiveMQ

   

  • ActiveMQ特性

    

  • RabbitMQ

    

  • RabbitMQ特性

     

  • kafka

     

  • kafka特性

    

  • 综合评价

    

 JMS规范

  • Java消息服务定义

    -> Java消息服务(Java Message Service)即JMS,是一个Java平台中面向消息中间件的API,用于在两个应用程序之间或分布式系统中发送/接收消息,进行异步通信

  • JMS相关者概念

   -> 提供者:实现JMS规范的消息中间件服务器

   -> 客户端:发送或接收消息的应用程序

   -> 生产者/发布者:创建并发送消息的客户端

   -> 消费者/订阅者:接收并处理消息的客户端

   -> 消息:应用程序之间传递的数据内容

   -> 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

  • JMS消息模式

   -> 队列模型

       (1) 客户端包括生产者和消费者

     (2) 队列中的消息只能被一个消费者消费

     (3) 消费者可以随时消费队列中的消息

   -> 队列模型示意图

     

    -> 主题模型

      (1) 客户端包括发布者和订阅者

      (2) 主题中的消息被所有订阅者消费

      (3) 消费者不能消费订阅之前就发送到主题中的消息

    -> 主题模型示意图

     

  • JMS编码接口

    -> ConnectionFactory 用于创建连接到消息中间件的连接工厂

    -> Connection 代表了应用程序和消息服务器之间的通信链路

    -> Destination 指消息发布和接收的地点,包括队列和主题

    -> Session 表示一个单线程的上下文,用于发送和接收消息

    -> MessageConsumer 由会话创建,用于接收发送到目标的消息

    -> MessageProducer 由会话创建,用于发送消息到目标

    -> Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体

  • JMS编码接口之间的关系

    

  

 Linux下安装ActiveMQ

  • 到官网下载

    官网下载地址:http://activemq.apache.org/activemq-5154-release.html

    wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz  //远程下载ActiveMQ压缩包

  • 解压文件

    tar -zxvf apache-activemq-5.15.4-bin.tar.gz -C /usr/local/  //将压缩包解压到/usr/local/目录下

  • 启动ActiveMQ

    cd /usr/local/apache-activemq-5.15.4/bin  //进入activeMQ的bin目录

    ./activemq start      //启动activemq

    ./activemq stop      //停止activemq

    

  • 为ActiveMQ添加防火墙允许端口

    vim /etc/sysconfig/iptables    //编辑防火墙策略文件,默认是8161、61616端口,

        添加内容:

          -A INPUT -m state --state NEW -m tcp -p tcp --dport 8161 -j ACCEPT  //管理端口

          -A INPUT -m state --state NEW -m tcp -p tcp --dport 61616 -j ACCEPT   //连接端口

    service iptables restart      //重启防火墙

    

    

  • 访问ActiveMQ管理界面

    地址:http://192.168.2.121:8161/

    管理帐号,默认帐号和密码都是admin

    

 

JMS代码实现

  • 添加pom依赖

   <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
   </dependency>

  • 队列模型下,生产者代码
 1 package com.dsd.jms.queue;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageProducer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 /**
14  * 消息提供者
15  * @author daishengda
16  *
17  */
18 public class AppProducer {
19 
20     private static final String URL = "tcp://192.168.2.121:61616";
21     
22     private static final String QUEUE_NAME = "queue-test";
23     
24     public static void main(String[] args) throws JMSException {
25         
26         //1、创建ConnectionFactory
27         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
28         
29         //2、创建Connection
30         Connection connection = connectionFactory.createConnection();
31         
32         //3、启动连接
33         connection.start();
34         
35         //4、创建会话
36         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
37         
38         //5、创建一个目标(创建队列模型)
39         Destination destination = session.createQueue(QUEUE_NAME);
40         
41         //6、创建生产者
42         MessageProducer producer = session.createProducer(destination);
43         
44         for (int i = 0; i < 100; i++) {
45             //7、创建消息
46             TextMessage textMessage = session.createTextMessage("test"+i);
47             
48             //8、发布消息
49             producer.send(textMessage);
50             
51             System.out.println("发送消息"+textMessage.getText());
52         }
53         
54         //9、关闭连接
55         connection.close();
56     }
57 }
View Code

  

 

  • 队列模型下,消费者代码
 1 package com.dsd.jms.queue;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.Message;
 8 import javax.jms.MessageConsumer;
 9 import javax.jms.MessageListener;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12 
13 import org.apache.activemq.ActiveMQConnectionFactory;
14 
15 /**
16  * 消息消费者
17  * @author daishengda
18  *
19  */
20 public class AppConsumer {
21 
22     private static final String URL = "tcp://192.168.2.121:61616";
23     
24     private static final String QUEUE_NAME = "queue-test";
25     
26     public static void main(String[] args) throws JMSException {
27         
28         //1、创建ConnectionFactory
29         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
30         
31         //2、创建Connection
32         Connection connection = connectionFactory.createConnection();
33         
34         //3、启动连接
35         connection.start();
36         
37         //4、创建会话
38         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
39         
40         //5、创建一个目标(创建队列模型)
41         Destination destination = session.createQueue(QUEUE_NAME);
42         
43         //6、创建消费者
44         MessageConsumer consumer = session.createConsumer(destination);
45         
46         //7、创建一个监听器
47         consumer.setMessageListener(new MessageListener() {
48             
49             @Override
50             public void onMessage(Message message) {
51                 TextMessage textMessage = (TextMessage) message;
52                 try {
53                     System.out.println("接收消息    "+textMessage.getText());
54                 } catch (JMSException e) {
55                     e.printStackTrace();
56                 }
57             }
58         });
59         
60         //8、关闭连接
61 //        connection.close();
62     }
63 }
View Code

  

  • 主题模型下,生产者代码
 1 package com.dsd.jms.topic;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageProducer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 /**
14  * 消息提供者
15  * @author daishengda
16  *
17  */
18 public class AppProducer {
19 
20     private static final String URL = "tcp://192.168.2.121:61616";
21     
22     private static final String TOPIC_NAME = "topic-test";
23     
24     public static void main(String[] args) throws JMSException {
25         
26         //1、创建ConnectionFactory
27         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
28         
29         //2、创建Connection
30         Connection connection = connectionFactory.createConnection();
31         
32         //3、启动连接
33         connection.start();
34         
35         //4、创建会话
36         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
37         
38         //5、创建一个目标(创建主题模型)
39         Destination destination = session.createTopic(TOPIC_NAME);
40         
41         //6、创建生产者
42         MessageProducer producer = session.createProducer(destination);
43         
44         for (int i = 0; i < 100; i++) {
45             //7、创建消息
46             TextMessage textMessage = session.createTextMessage("test"+i);
47             
48             //8、发布消息
49             producer.send(textMessage);
50             
51             System.out.println("发送消息"+textMessage.getText());
52         }
53         
54         //9、关闭连接
55         connection.close();
56     }
57 }
View Code

    

  • 主题模型下,消费者代码

    需要提前订阅才能接收到消息,而且消息会发送给所有订阅者

 1 package com.dsd.jms.topic;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.Message;
 8 import javax.jms.MessageConsumer;
 9 import javax.jms.MessageListener;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12 
13 import org.apache.activemq.ActiveMQConnectionFactory;
14 
15 /**
16  * 消息消费者
17  * @author daishengda
18  *
19  */
20 public class AppConsumer {
21 
22     private static final String URL = "tcp://192.168.2.121:61616";
23     
24     private static final String TOPIC_NAME = "topic-test";
25     
26     public static void main(String[] args) throws JMSException {
27         
28         //1、创建ConnectionFactory
29         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
30         
31         //2、创建Connection
32         Connection connection = connectionFactory.createConnection();
33         
34         //3、启动连接
35         connection.start();
36         
37         //4、创建会话
38         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
39         
40         //5、创建一个目标(创建主题模型)
41         Destination destination = session.createTopic(TOPIC_NAME);
42         
43         //6、创建消费者
44         MessageConsumer consumer = session.createConsumer(destination);
45         
46         //7、创建一个监听器
47         consumer.setMessageListener(new MessageListener() {
48             
49             @Override
50             public void onMessage(Message message) {
51                 TextMessage textMessage = (TextMessage) message;
52                 try {
53                     System.out.println("接收消息    "+textMessage.getText());
54                 } catch (JMSException e) {
55                     e.printStackTrace();
56                 }
57             }
58         });
59         
60         //8、关闭连接
61 //        connection.close();
62     }
63 }
View Code
  • 补充说明

    connection.createSession(paramA,paramB);

      paramA是设置事务的,paramB设置acknowledgment mode

      paramA设置为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。

      paramA设置为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED 。

          Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。
          Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。
          DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;

                  而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。

使用Spring集成JMS连接ActiveMQ

  • ConnectionFactory 用于管理连接的连接工厂

    -> 一个Spring为我们提供的连接池

    -> JmsTemplate每次发消息都会重新创建连接,会话和productor

    -> spring 中提供了SingleConnectionFactory和CachingConnectionFactory

  • JmsTemplate 用于发送和接收消息的模板类

    -> 是spring提供的,只需向spring容器内注册这个类就可以使用JmsTemplate方便的操作jms

    -> JmsTemplate类是线程安全的,可以在整个应用范围使用

  • MessageListener 消息监听器

    -> 实现一个onMessage方法,该方法只接收一个Message参数

  • 添加Spring-jms pom依赖
 1   <properties>
 2     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 3     <spring.version>4.2.5.RELEASE</spring.version>
 4   </properties>
 5 
 6   <dependencies>
 7     <dependency>
 8       <groupId>junit</groupId>
 9       <artifactId>junit</artifactId>
10       <version>4.11</version>
11       <scope>test</scope>
12     </dependency>
13     
14     <dependency>
15       <groupId>org.springframework</groupId>
16       <artifactId>spring-context</artifactId>
17       <version>${spring.version}</version>
18     </dependency>
19     
20     <dependency>
21       <groupId>org.springframework</groupId>
22       <artifactId>spring-jms</artifactId>
23       <version>${spring.version}</version>
24     </dependency>
25     
26     <dependency>
27       <groupId>org.springframework</groupId>
28       <artifactId>spring-test</artifactId>
29       <version>${spring.version}</version>
30     </dependency>
31     
32     <dependency>
33       <groupId>org.apache.activemq</groupId>
34       <artifactId>activemq-all</artifactId>
35       <version>5.7.0</version>
36       <exclusions>
37           <exclusion>
38               <artifactId>spring-context</artifactId>
39               <groupId>spring-context</groupId>
40           </exclusion>
41       </exclusions>
42     </dependency>
43   </dependencies>
View Code
  • 通用的配置common.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="
 3         http://www.springframework.org/schema/beans
 4         http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
 5         http://www.springframework.org/schema/context
 6         http://www.springframework.org/schema/context/spring-context-3.2.xsd
 7         http://www.springframework.org/schema/aop
 8         http://www.springframework.org/schema/aop/spring-aop-3.2.xsd" default-autowire="byName" default-lazy-init="false">
 9 
10     <context:annotation-config />
11     
12     <!-- ActiveMQ为我们提供的ConnectionFactory -->    
13     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
14         <property name="brokerURL" value="tcp://192.168.2.121:61616" />
15

以上是关于Java中间件-ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ入门:认识并安装ActiveMQ(Windows下)

应用安全ActiveMQ漏洞利用总结

Java中间件-ActiveMQ

Apache ActiveMQ 远程代码执行漏洞 (CVE-2016-3088)分析

分布式--ActiveMQ 消息中间件

Vulhub漏洞系列:ActiveMQ任意文件写入漏洞分析