一ActiveMQ消息队列

Posted laughing Coding

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一ActiveMQ消息队列相关的知识,希望对你有一定的参考价值。

一、Message Queue消息队列

异步处理

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。

消息队列主要有两点好处:1.通过异步处理提高系统性能(削峰、减少响应所需时间);2.降低系统耦合性。

消息队列的两种模式

点对点模式

一、ActiveMQ消息队列

一个消息只会被一个消费者消费

发布订阅模式

一、ActiveMQ消息队列

二、消息中间件

一、ActiveMQ消息队列

三、ActiveMQ安装

官网:http://activemq.apache.org/

tar -zxvf apache--activemq-5.16.0

bin目录下启动

/usr/activemq/apache-activemq-5.16.0/bin
./activemq start
ps -ef|grep activemq

activemq的默认端口为61616

++++++++++++++++++++++++++++++++++++++

启动报错 cat activemq.log

一、ActiveMQ消息队列

服务器名称不能有下滑线

vim /etc/sysconfig/network

一、ActiveMQ消息队列

重新远程连接即可

++++++++++++++++++++++++++++++++++++++++++++++

日志目录 /apache-activemq-5.16.0/data/activemq.log

一、ActiveMQ消息队列

四、ActiveMQ控制台

端口8161

注意开端口策略

修改jetty.xml

一、ActiveMQ消息队列

用户名密码

一、ActiveMQ消息队列

一、ActiveMQ消息队列

五、JMS标准

JMS即Java消息服务 (Java Message Service)应用程序接口,是一个Java平台 中关于面向消息中间件 (MOM)的API ,用于在两个应用程序之间,或分布式系统 中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。

一、ActiveMQ消息队列

六、生产者

需要的包

<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.16.0</version></dependency><dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.6</version></dependency>
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://49.232.61.212:61616"; public static final String QUEUE_NAME = "laughing-queue";
public static void main(String[] args) throws JMSException { // 1、创建ActiveMQConnectionFactory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2、创建connection 启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3、创建session createSession(boolean transacted, int acknowledgeMode) // transacted 事务 , acknowledgeMode 签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建目的地 队列 // Destination destination = session.createQueue(ACTIVEMQ_URL); Queue queue = session.createQueue(ACTIVEMQ_URL); // 5、创建消息生产者 放入队列 MessageProducer messageProducer = session.createProducer(queue); // 6、生成消息发送到queue for (int i = 0; i < 10; i++) { // 7、生成消息 TextMessage textMessage = session.createTextMessage("laughing-msg " + i); // 8、通过生产者messageProducer,发送给MQ messageProducer.send(textMessage);
} // 9、关闭 messageProducer.close(); session.close(); connection.close(); System.out.println("MQ发送成功!");

}}

发送10条消息到MQ的队列

一、ActiveMQ消息队列

七、消费者

receive同步阻塞

public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "laughing-queue";
public static void main(String[] args) throws JMSException { // 1、创建ActiveMQConnectionFactory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2、创建connection 启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3、创建session createSession(boolean transacted, int acknowledgeMode) // transacted 事务 , acknowledgeMode 签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建目的地 队列 // Destination destination = session.createQueue(ACTIVEMQ_URL); Queue queue = session.createQueue(ACTIVEMQ_URL); // 5、消费者 MessageConsumer messageConsumer = session.createConsumer(queue); while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(); System.out.println("接收MQ:"); if (null != textMessage) { System.out.println(textMessage.getText()); } else { break; } } messageConsumer.close(); session.close(); connection.close();
}}

八、消费者监听

异步非阻塞方法

public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://49.232.61.212:61616"; public static final String QUEUE_NAME = "laughing-queue";
public static void main(String[] args) throws JMSException, IOException { // 1、创建ActiveMQConnectionFactory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2、创建connection 启动 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3、创建session createSession(boolean transacted, int acknowledgeMode) // transacted 事务 , acknowledgeMode 签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建目的地 队列 // Destination destination = session.createQueue(ACTIVEMQ_URL); Queue queue = session.createQueue(ACTIVEMQ_URL); // 5、消费者 // receive 方法 MessageConsumer messageConsumer = session.createConsumer(queue);/* while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(4000L); System.out.println("接收MQ:"); if (null != textMessage) { System.out.println(textMessage.getText()); } else { break; } }*/ // 消费者监听
messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message != null && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费者接收消息--"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); }
}
} });
System.in.read();
messageConsumer.close(); session.close(); connection.close();
}}

九、两个消费者的情况

1、先启动生产者,在依次启动两个消费者

第二个消费者拿不到消息

一、ActiveMQ消息队列

2、启动两个消费者,再启动生产者

轮询平均分配

十、总结

以上是关于一ActiveMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章

一ActiveMQ消息队列

ActiveMQ消息队列的使用及应用

ActiveMQ的消息模式——队列模式(Queue)

java操作ActiveMQ消息队列

消息队列ActiveMQ的使用详解

分布式消息队列ActiveMQ消息模型