Java中间件-ActiveMQ
Posted daishengda
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中间件-ActiveMQ相关的知识,希望对你有一定的参考价值。
为什么需要使用消息中间件?
- 系统解耦
- 异步
- 横向扩展
- 安全可靠
- 顺序保证
什么是中间件?
非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。
什么是消息中间件?
关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。
消息中间件图示?
什么是JMS?
java消息服务(java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
什么是AMQP?
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端.中间件不同产品,不同开发语言等条件的限制
JMS和AMQP对比
常见消息中间件对比
- ActiveMQ
- ActiveMQ特性
- RabbitMQ
- RabbitMQ特性
- kafka
- kafka特性
- 综合评价
JMS规范
- Java消息服务定义
-> Java消息服务(Java Message Service)即JMS,是一个Java平台中面向消息中间件的API,用于在两个应用程序之间或分布式系统中发送/接收消息,进行异步通信
- JMS相关者概念
-> 提供者:实现JMS规范的消息中间件服务器
-> 客户端:发送或接收消息的应用程序
-> 生产者/发布者:创建并发送消息的客户端
-> 消费者/订阅者:接收并处理消息的客户端
-> 消息:应用程序之间传递的数据内容
-> 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式
- JMS消息模式
-> 队列模型
(1) 客户端包括生产者和消费者
(2) 队列中的消息只能被一个消费者消费
(3) 消费者可以随时消费队列中的消息
-> 队列模型示意图
-> 主题模型
(1) 客户端包括发布者和订阅者
(2) 主题中的消息被所有订阅者消费
(3) 消费者不能消费订阅之前就发送到主题中的消息
-> 主题模型示意图
- JMS编码接口
-> ConnectionFactory 用于创建连接到消息中间件的连接工厂
-> Connection 代表了应用程序和消息服务器之间的通信链路
-> Destination 指消息发布和接收的地点,包括队列和主题
-> Session 表示一个单线程的上下文,用于发送和接收消息
-> MessageConsumer 由会话创建,用于接收发送到目标的消息
-> MessageProducer 由会话创建,用于发送消息到目标
-> Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体
- JMS编码接口之间的关系
Linux下安装ActiveMQ
- 到官网下载
官网下载地址:http://activemq.apache.org/activemq-5154-release.html
wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz //远程下载ActiveMQ压缩包
- 解压文件
tar -zxvf apache-activemq-5.15.4-bin.tar.gz -C /usr/local/ //将压缩包解压到/usr/local/目录下
- 启动ActiveMQ
cd /usr/local/apache-activemq-5.15.4/bin //进入activeMQ的bin目录
./activemq start //启动activemq
./activemq stop //停止activemq
- 为ActiveMQ添加防火墙允许端口
vim /etc/sysconfig/iptables //编辑防火墙策略文件,默认是8161、61616端口,
添加内容:
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8161 -j ACCEPT //管理端口
-A INPUT -m state --state NEW -m tcp -p tcp --dport 61616 -j ACCEPT //连接端口
service iptables restart //重启防火墙
- 访问ActiveMQ管理界面
地址:http://192.168.2.121:8161/
管理帐号,默认帐号和密码都是admin
JMS代码实现
- 添加pom依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
- 队列模型下,生产者代码
1 package com.dsd.jms.queue; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.JMSException; 7 import javax.jms.MessageProducer; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnectionFactory; 12 13 /** 14 * 消息提供者 15 * @author daishengda 16 * 17 */ 18 public class AppProducer { 19 20 private static final String URL = "tcp://192.168.2.121:61616"; 21 22 private static final String QUEUE_NAME = "queue-test"; 23 24 public static void main(String[] args) throws JMSException { 25 26 //1、创建ConnectionFactory 27 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); 28 29 //2、创建Connection 30 Connection connection = connectionFactory.createConnection(); 31 32 //3、启动连接 33 connection.start(); 34 35 //4、创建会话 36 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 37 38 //5、创建一个目标(创建队列模型) 39 Destination destination = session.createQueue(QUEUE_NAME); 40 41 //6、创建生产者 42 MessageProducer producer = session.createProducer(destination); 43 44 for (int i = 0; i < 100; i++) { 45 //7、创建消息 46 TextMessage textMessage = session.createTextMessage("test"+i); 47 48 //8、发布消息 49 producer.send(textMessage); 50 51 System.out.println("发送消息"+textMessage.getText()); 52 } 53 54 //9、关闭连接 55 connection.close(); 56 } 57 }
- 队列模型下,消费者代码
1 package com.dsd.jms.queue; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.JMSException; 7 import javax.jms.Message; 8 import javax.jms.MessageConsumer; 9 import javax.jms.MessageListener; 10 import javax.jms.Session; 11 import javax.jms.TextMessage; 12 13 import org.apache.activemq.ActiveMQConnectionFactory; 14 15 /** 16 * 消息消费者 17 * @author daishengda 18 * 19 */ 20 public class AppConsumer { 21 22 private static final String URL = "tcp://192.168.2.121:61616"; 23 24 private static final String QUEUE_NAME = "queue-test"; 25 26 public static void main(String[] args) throws JMSException { 27 28 //1、创建ConnectionFactory 29 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); 30 31 //2、创建Connection 32 Connection connection = connectionFactory.createConnection(); 33 34 //3、启动连接 35 connection.start(); 36 37 //4、创建会话 38 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 39 40 //5、创建一个目标(创建队列模型) 41 Destination destination = session.createQueue(QUEUE_NAME); 42 43 //6、创建消费者 44 MessageConsumer consumer = session.createConsumer(destination); 45 46 //7、创建一个监听器 47 consumer.setMessageListener(new MessageListener() { 48 49 @Override 50 public void onMessage(Message message) { 51 TextMessage textMessage = (TextMessage) message; 52 try { 53 System.out.println("接收消息 "+textMessage.getText()); 54 } catch (JMSException e) { 55 e.printStackTrace(); 56 } 57 } 58 }); 59 60 //8、关闭连接 61 // connection.close(); 62 } 63 }
- 主题模型下,生产者代码
1 package com.dsd.jms.topic; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.JMSException; 7 import javax.jms.MessageProducer; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnectionFactory; 12 13 /** 14 * 消息提供者 15 * @author daishengda 16 * 17 */ 18 public class AppProducer { 19 20 private static final String URL = "tcp://192.168.2.121:61616"; 21 22 private static final String TOPIC_NAME = "topic-test"; 23 24 public static void main(String[] args) throws JMSException { 25 26 //1、创建ConnectionFactory 27 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); 28 29 //2、创建Connection 30 Connection connection = connectionFactory.createConnection(); 31 32 //3、启动连接 33 connection.start(); 34 35 //4、创建会话 36 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 37 38 //5、创建一个目标(创建主题模型) 39 Destination destination = session.createTopic(TOPIC_NAME); 40 41 //6、创建生产者 42 MessageProducer producer = session.createProducer(destination); 43 44 for (int i = 0; i < 100; i++) { 45 //7、创建消息 46 TextMessage textMessage = session.createTextMessage("test"+i); 47 48 //8、发布消息 49 producer.send(textMessage); 50 51 System.out.println("发送消息"+textMessage.getText()); 52 } 53 54 //9、关闭连接 55 connection.close(); 56 } 57 }
- 主题模型下,消费者代码
需要提前订阅才能接收到消息,而且消息会发送给所有订阅者
1 package com.dsd.jms.topic; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.JMSException; 7 import javax.jms.Message; 8 import javax.jms.MessageConsumer; 9 import javax.jms.MessageListener; 10 import javax.jms.Session; 11 import javax.jms.TextMessage; 12 13 import org.apache.activemq.ActiveMQConnectionFactory; 14 15 /** 16 * 消息消费者 17 * @author daishengda 18 * 19 */ 20 public class AppConsumer { 21 22 private static final String URL = "tcp://192.168.2.121:61616"; 23 24 private static final String TOPIC_NAME = "topic-test"; 25 26 public static void main(String[] args) throws JMSException { 27 28 //1、创建ConnectionFactory 29 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); 30 31 //2、创建Connection 32 Connection connection = connectionFactory.createConnection(); 33 34 //3、启动连接 35 connection.start(); 36 37 //4、创建会话 38 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 39 40 //5、创建一个目标(创建主题模型) 41 Destination destination = session.createTopic(TOPIC_NAME); 42 43 //6、创建消费者 44 MessageConsumer consumer = session.createConsumer(destination); 45 46 //7、创建一个监听器 47 consumer.setMessageListener(new MessageListener() { 48 49 @Override 50 public void onMessage(Message message) { 51 TextMessage textMessage = (TextMessage) message; 52 try { 53 System.out.println("接收消息 "+textMessage.getText()); 54 } catch (JMSException e) { 55 e.printStackTrace(); 56 } 57 } 58 }); 59 60 //8、关闭连接 61 // connection.close(); 62 } 63 }
- 补充说明
connection.createSession(paramA,paramB);
paramA是设置事务的,paramB设置acknowledgment mode
paramA设置为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
paramA设置为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED 。
Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。
Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。
DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;
而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
使用Spring集成JMS连接ActiveMQ
- ConnectionFactory 用于管理连接的连接工厂
-> 一个Spring为我们提供的连接池
-> JmsTemplate每次发消息都会重新创建连接,会话和productor
-> spring 中提供了SingleConnectionFactory和CachingConnectionFactory
- JmsTemplate 用于发送和接收消息的模板类
-> 是spring提供的,只需向spring容器内注册这个类就可以使用JmsTemplate方便的操作jms
-> JmsTemplate类是线程安全的,可以在整个应用范围使用
- MessageListener 消息监听器
-> 实现一个onMessage方法,该方法只接收一个Message参数
- 添加Spring-jms pom依赖
1 <properties> 2 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 3 <spring.version>4.2.5.RELEASE</spring.version> 4 </properties> 5 6 <dependencies> 7 <dependency> 8 <groupId>junit</groupId> 9 <artifactId>junit</artifactId> 10 <version>4.11</version> 11 <scope>test</scope> 12 </dependency> 13 14 <dependency> 15 <groupId>org.springframework</groupId> 16 <artifactId>spring-context</artifactId> 17 <version>${spring.version}</version> 18 </dependency> 19 20 <dependency> 21 <groupId>org.springframework</groupId> 22 <artifactId>spring-jms</artifactId> 23 <version>${spring.version}</version> 24 </dependency> 25 26 <dependency> 27 <groupId>org.springframework</groupId> 28 <artifactId>spring-test</artifactId> 29 <version>${spring.version}</version> 30 </dependency> 31 32 <dependency> 33 <groupId>org.apache.activemq</groupId> 34 <artifactId>activemq-all</artifactId> 35 <version>5.7.0</version> 36 <exclusions> 37 <exclusion> 38 <artifactId>spring-context</artifactId> 39 <groupId>spring-context</groupId> 40 </exclusion> 41 </exclusions> 42 </dependency> 43 </dependencies>
- 通用的配置common.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation=" 3 http://www.springframework.org/schema/beans 4 http://www.springframework.org/schema/beans/spring-beans-3.2.xsd 5 http://www.springframework.org/schema/context 6 http://www.springframework.org/schema/context/spring-context-3.2.xsd 7 http://www.springframework.org/schema/aop 8 http://www.springframework.org/schema/aop/spring-aop-3.2.xsd" default-autowire="byName" default-lazy-init="false"> 9 10 <context:annotation-config /> 11 12 <!-- ActiveMQ为我们提供的ConnectionFactory --> 13 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 14 <property name="brokerURL" value="tcp://192.168.2.121:61616" /> 15以上是关于Java中间件-ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章
ActiveMQ入门:认识并安装ActiveMQ(Windows下)