实战篇 | ActiveMQ in Action

Posted 5ithink

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战篇 | ActiveMQ in Action相关的知识,希望对你有一定的参考价值。

        Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4. Apache ActiveMQ is released under the Apache 2.0 License.

简介



        JMS(Java Message Service 消息服务)是一组标准的API,与具体平台无关的API,松耦合高内聚,异步通信。

  • ConnectionFactory

  • Connection 

  • Session 

  • Destination

  • Produce

  • Consume

  • Message

2.Message

  • 消息模式:Queue(点对点)和Topic(发布订阅/主题)

  • 消息类型:TextMessage、MapMessage、BytesMessage、 StreamMessage和ObjectMessage。  

  • 消息确认:接收消息—>处理消息—>确认消息

  • 消息持久模式:PERSISTENT |NON_PERSISTENT 

  • 消息优先级:默认:4 优先级分 10个级别[0-9] ,

  • 消息过期:设置过期时间,默认:永不过期

3.事务性

a.事务性会话:事务被提交的时候,确认自动

b.非事务性会:话消息确认取决于应答模式:应答模式(acknowledgement mode)

  • Session.AUTO_ACKNOWLEDGE[ 消息自动签收(默认)]

  • Session.CLIENT_ACKNOWLEDGE[客户端调用acknowledge方法手动签收]

  • Session.DUPS_ACKNOWLEDGE[非必须签收,可能会重复发送(JmsDelivered=true)]

4.Clustering

  • Shared File System Master Slave

  • JDBC Master Slave 

5.Transport  

  • VM Transport :VM内部通信,直接本地方法调用减少网络传输的开销 ;

  • TCP Transport:客户端通过TCP socket连接到broker ;

  • Discovery transport  ;

  • Failover Transport :重新连接的机制;

参数:  initialReconnectDelay 

        maxReconnectDelay 

        useExponentialBackOff 

        backOffMultiplier  

        maxReconnectAttempts 

        randomize 

        backup

failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconn ectDelay=100  

Features


1.Advisory Message

  • consumers, producers and connections starting and stopping  

  • temporary destinations being created and destroyed  

  • messages expiring on topics and queues  

  • brokers sending messages to destinations with no consumers.  

  • connections starting and stopping  

  • Client based advisories

  • Destination and Message based advisories

2.Persistence

  • AMQ Message Store 

  • Kaha Persistence 

  • JDBC Persistence 

  • Disable Persistence

3.Security

  • Simple Authentication Plugin 

  • JAAS Authentication Plugin 

  • Custom Authentication Implementation 

  • Authorization Plugin

4.Clustering 

a.Shared File System Master Slave :运行多个broker共享数据目录(通过文件排他锁抢占)

b.JDBC Master Slave :采用了数据库作为持久化存储

5.Exclusive Consumer 

         Queue中的消息是按照顺序被分发到consumers的,如果多个 consumers同时从相同的queue中消费消息时,则消费消息顺序无法保证。

6.Message Groups

        Message Groups 相当于并发的Exclusive Consumer,是Exclusive Consumer 功能的增强 。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性 JMSXGroupID 被用来区分message group,Message Groups特性保证所有具有相同JMSXGroupID 的消息会被分发到相同的consumer,是一种简单的负载均衡机制。

7.JMS Selectors 

             JMS Selectors用于在订阅中,基于消息属性对进行消息的过滤 ,Message Groups可以保证具有相同groupID的消息被唯一的consumer顺序处理,可以搭配JMS Selector使用;

缺点:

  • producer必须知道当前正在运行的consumers,producer和 consumer被耦合到一起。  

  • 如果某个consumer失效,则消息将会一直被积压在broker上。

8.Pending Message Limit Strategy

        ActiveMQ通过prefetch机制来提高性能,客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由 prefetch limit来控制。

9.Composite Destinations

        通过用一个虚拟的destination 代表多个destinations:1)多 个destination之间采用","分割 ;2)不同类型的destination,那么需要加上前缀如queue:// 或 topic://  ;

10.Mirrored Queues

        ActiveMQ通过Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。

11.Wildcards

        Wildcards用来支持联合的名字分层体系(federated name hierarchies)。ActiveMQ支持以下三种 wildcards:

  •  "." 用于作为路径上名字间的分隔符;

  • "*" 用于匹配路径上的任何名字;

  • ">" 用于递归地匹配任何以这个名字开始的destination。

12.Async Sends

        ActiveMQ支持以同步(sync)方式或者异步(async)方式向broker发送消息 ,异步发送方式(默认方式);

13.Dispatch Policies 

a.Round Robin Dispatch Policy

         轮询平均分发消息 ActiveMQ的prefetch机制。ActiveMQ的缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的,缺省的prefetch参数比较大。缺点:少量消息的处理时间比较长, 在缺省prefetch和dispatch policies下, 负载的不均衡分配而导致处理时间的增加 。

b.Strict Order Dispatch Policy

        保证不同的topic consumer以相同的顺序接收消息。通常 ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息。然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。

ActiveMQ应用


1.安装:

a.wget http://archive.apache.org/dist/activemq/apache-activemq/5.9.0/apache-activemq-5.9.0-bin.tar.gz

实战篇 | ActiveMQ in Action

b.tar xvf apache-activemq-5.9.0-bin.tar.gz

cd apache-activemq-5.9.0

c.启动activemq

./activemq start ;

d.验证

ps -ef |grep activemq

实战篇 | ActiveMQ in Action

实战篇 | ActiveMQ in Action2.Monitoring Broker

1)JMX监控broker

a.修改activemq.xml文件

  • broke新增useJmx="true"属性

  • managementContext新增connectorPort="11099"属性

实战篇 | ActiveMQ in Action

b.修改/bin/activemq启动脚本

  • ACTIVEMQ_SUNJMX_START=

        "$ACTIVEMQ_SUNJMX_START-Djava.rmi.server.hostname=192.168.1.200"

实战篇 | ActiveMQ in Action

c.启动jconsole

  • 远程连接:service:jmx:rmi:///jndi/rmi://192.168.1.200:11099/jmxrmi

  • 用户名:admin 密码:activemq(参考jmx.password+jmx.access file)

实战篇 | ActiveMQ in Action

实战篇 | ActiveMQ in Action

2) Web Console 

a.访问URL:http://192.168.1.200:8161 [admin/admin]

实战篇 | ActiveMQ in Action

实战篇 | ActiveMQ in Action

实战篇 | ActiveMQ in Action

3.测试Queue消息

a.代码片段

package com.think.mq;
import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.DeliveryMode;
import
javax.jms.Destination;
import
javax.jms.Message;
import
javax.jms.MessageConsumer;
import
javax.jms.MessageProducer;
import
javax.jms.Session;
import
javax.jms.TextMessage;

import
org.apache.activemq.ActiveMQConnectionFactory;
import
org.junit.Test;

public class QueueTest {
@Test
   
public void send() {
try {
// 连接工厂
           
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
           
// 连接
           
Connection conn = factory.createConnection();
           
conn.start();
           
// 会话
           // 第一个参数: 是否开启事务 // 第二个参数: 消息签收方式
           
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
// 目标
           
Destination destination = session.createQueue("queue_1");
           
// 生产者
           
MessageProducer producer = session.createProducer(destination);
           
// 设置递送模式:是否持久化消息。非必须
           
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
           
// 发送消息
           
for(int i = 11; i <= 20; i++) {
Message message = session.createTextMessage("hello world ! -- " + i);
               
producer.send(message);
               
System.out.println("发送消息: " + ((TextMessage) message).getText());
           
}
// 释放连接
           
producer.close();
           
session.close();
           
conn.stop();
           
conn.close();
       
} catch (Throwable e) {
e.printStackTrace();
       
}
}

@Test
   
public void receive() {
try {
// 连接工厂
           
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
           
// 连接
           
Connection conn = factory.createConnection();
           
conn.start();
           
// 会话
           // 第一个参数: 是否开启事务 // 第二个参数: 消息签收方式
           
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
// 目标
           
Destination destination = session.createQueue("queue_1");
           
// 消费者
           
MessageConsumer consumer = session.createConsumer(destination);
           
// 2. 使用consumer.receive()接收消息
           
while (true) {
Message message = consumer.receive();
               
TextMessage textMsg = (TextMessage) message;
               
System.out.println("收到消息: " + textMsg.getText());
           
}
} catch (Throwable e) {
e.printStackTrace();
       
}
}

}

b.发送Queue消息

实战篇 | ActiveMQ in Action

c.消费者接收消息:

实战篇 | ActiveMQ in Action

d.web console :http://192.168.1.200:8161/admin/queues.jsp

实战篇 | ActiveMQ in Action

4.测试Topic消息

a.代码

package com.think.mq;

import
javax.jms.Connection;
import
javax.jms.ConnectionFactory;
import
javax.jms.DeliveryMode;
import
javax.jms.Destination;
import
javax.jms.Message;
import
javax.jms.MessageConsumer;
import
javax.jms.MessageProducer;
import
javax.jms.Session;
import
javax.jms.TextMessage;

import
org.apache.activemq.ActiveMQConnectionFactory;
import
org.junit.Test;
public class TopicTest {
@Test
   
public void send() {
try {
// 连接工厂
           
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
           
// 连接
           
Connection conn = factory.createConnection();
           
conn.start();
           
// 会话 // 第一个参数: 是否开启事务 // 第二个参数: 消息签收方式
           
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
// 目标
           
Destination destination = session.createTopic("topic_1");
           
// 生产者
           
MessageProducer producer = session.createProducer(destination);
           
// 设置递送模式:是否持久化消息。非必须
           
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
           
// 发送消息
           
for(int i = 1; i <= 10; i++) {
Message message = session.createTextMessage("hello world ! -- " + i);
               
producer.send(message);
               
System.out.println("发送Topic消息: " + ((TextMessage) message).getText());
           
}
// 释放连接
           
producer.close();
           
session.close();
           
conn.stop();
           
conn.close();
       
} catch (Throwable e) {
e.printStackTrace();
       
}
}

@Test
   
public void receive() {
try {
// 连接工厂
           
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
           
// 连接
           
Connection conn = factory.createConnection();
           
conn.start();
           
// 会话 // 第一个参数: 是否开启事务 第二个参数: 消息签收方式
           
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
           
// 目标
           
Destination destination = session.createTopic("topic_1");
           
// 消费者
           
MessageConsumer consumer = session.createConsumer(destination);
           
//  使用consumer.receive()接收消息
           
while (true) {
Message message = consumer.receive();
               
TextMessage textMsg = (TextMessage) message;
               
System.out.println("收到Topic消息: " + textMsg.getText());
           
}
} catch (Throwable e) {
e.printStackTrace();
       
}
}

}

b.发送topic消息

实战篇 | ActiveMQ in Action

c.接收Topic消息

d.web console :http://192.168.1.200:8161/admin/topics.jsp

参考链接:

  • http://activemq.apache.org/


以上是关于实战篇 | ActiveMQ in Action的主要内容,如果未能解决你的问题,请参考以下文章

《ActiveMQ in Action》.pdf下载

《ActiveMQ in Action》PDF下载

ActiveMQ实战篇之ActiveMQ实现request/reply模型

pdf免费书籍下载:activemq in action.pdf(英文原版)

SPRING IN ACTION 第4版笔记-第六章RENDERING WEB VIEWS-005- 使用ApacheTiles(TilesConfigurerTilesViewResolver&(代

SPRING IN ACTION 第4版笔记-第八章Advanced Spring MVC-001- 配置SpringFlow(flow-executorflow-registryFlowHand(代