实战篇 | ActiveMQ in Action
Posted 5ithink
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战篇 | ActiveMQ in Action相关的知识,希望对你有一定的参考价值。
Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4. Apache ActiveMQ is released under the Apache 2.0 License.
简介
JMS(Java Message Service 消息服务)是一组标准的API,与具体平台无关的API,松耦合高内聚,异步通信。
ConnectionFactory
Connection
Session
Destination
Produce
Consume
Message
2.Message
消息模式:Queue(点对点)和Topic(发布订阅/主题)
消息类型:TextMessage、MapMessage、BytesMessage、 StreamMessage和ObjectMessage。
消息确认:接收消息—>处理消息—>确认消息
消息持久模式:PERSISTENT |NON_PERSISTENT
消息优先级:默认:4 优先级分 10个级别[0-9] ,
消息过期:设置过期时间,默认:永不过期
3.事务性
a.事务性会话:事务被提交的时候,确认自动
b.非事务性会:话消息确认取决于应答模式:应答模式(acknowledgement mode)
Session.AUTO_ACKNOWLEDGE[ 消息自动签收(默认)]
Session.CLIENT_ACKNOWLEDGE[客户端调用acknowledge方法手动签收]
Session.DUPS_ACKNOWLEDGE[非必须签收,可能会重复发送(JmsDelivered=true)]
4.Clustering
Shared File System Master Slave
JDBC Master Slave
5.Transport
VM Transport :VM内部通信,直接本地方法调用减少网络传输的开销 ;
TCP Transport:客户端通过TCP socket连接到broker ;
Discovery transport ;
Failover Transport :重新连接的机制;
参数: initialReconnectDelay
maxReconnectDelay
useExponentialBackOff
backOffMultiplier
maxReconnectAttempts
randomize
backup
failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconn ectDelay=100
Features
1.Advisory Message
consumers, producers and connections starting and stopping
temporary destinations being created and destroyed
messages expiring on topics and queues
brokers sending messages to destinations with no consumers.
connections starting and stopping
Client based advisories
Destination and Message based advisories
2.Persistence
AMQ Message Store
Kaha Persistence
JDBC Persistence
Disable Persistence
3.Security
Simple Authentication Plugin
JAAS Authentication Plugin
Custom Authentication Implementation
Authorization Plugin
4.Clustering
a.Shared File System Master Slave :运行多个broker共享数据目录(通过文件排他锁抢占)
b.JDBC Master Slave :采用了数据库作为持久化存储
5.Exclusive Consumer
Queue中的消息是按照顺序被分发到consumers的,如果多个 consumers同时从相同的queue中消费消息时,则消费消息顺序无法保证。
6.Message Groups
Message Groups 相当于并发的Exclusive Consumer,是Exclusive Consumer 功能的增强 。跟所有的消息都由唯一的consumer处理不同,JMS 消息属性 JMSXGroupID 被用来区分message group,Message Groups特性保证所有具有相同JMSXGroupID 的消息会被分发到相同的consumer,是一种简单的负载均衡机制。
7.JMS Selectors
JMS Selectors用于在订阅中,基于消息属性对进行消息的过滤 ,Message Groups可以保证具有相同groupID的消息被唯一的consumer顺序处理,可以搭配JMS Selector使用;
缺点:
producer必须知道当前正在运行的consumers,producer和 consumer被耦合到一起。
如果某个consumer失效,则消息将会一直被积压在broker上。
8.Pending Message Limit Strategy
ActiveMQ通过prefetch机制来提高性能,客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由 prefetch limit来控制。
9.Composite Destinations
通过用一个虚拟的destination 代表多个destinations:1)多 个destination之间采用","分割 ;2)不同类型的destination,那么需要加上前缀如queue:// 或 topic:// ;
10.Mirrored Queues
ActiveMQ通过Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。
11.Wildcards
Wildcards用来支持联合的名字分层体系(federated name hierarchies)。ActiveMQ支持以下三种 wildcards:
"." 用于作为路径上名字间的分隔符;
"*" 用于匹配路径上的任何名字;
">" 用于递归地匹配任何以这个名字开始的destination。
12.Async Sends
ActiveMQ支持以同步(sync)方式或者异步(async)方式向broker发送消息 ,异步发送方式(默认方式);
13.Dispatch Policies
a.Round Robin Dispatch Policy
轮询平均分发消息 ActiveMQ的prefetch机制。ActiveMQ的缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的,缺省的prefetch参数比较大。缺点:少量消息的处理时间比较长, 在缺省prefetch和dispatch policies下, 负载的不均衡分配而导致处理时间的增加 。
b.Strict Order Dispatch Policy
保证不同的topic consumer以相同的顺序接收消息。通常 ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息。然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。
ActiveMQ应用
1.安装:
a.wget http://archive.apache.org/dist/activemq/apache-activemq/5.9.0/apache-activemq-5.9.0-bin.tar.gz
b.tar xvf apache-activemq-5.9.0-bin.tar.gz
cd apache-activemq-5.9.0
c.启动activemq
./activemq start ;
d.验证
ps -ef |grep activemq
2.Monitoring Broker
1)JMX监控broker
a.修改activemq.xml文件
broke新增useJmx="true"属性
managementContext新增connectorPort="11099"属性
b.修改/bin/activemq启动脚本
ACTIVEMQ_SUNJMX_START=
"$ACTIVEMQ_SUNJMX_START-Djava.rmi.server.hostname=192.168.1.200"
c.启动jconsole
远程连接:service:jmx:rmi:///jndi/rmi://192.168.1.200:11099/jmxrmi
用户名:admin 密码:activemq(参考jmx.password+jmx.access file)
2) Web Console
a.访问URL:http://192.168.1.200:8161 [admin/admin]
3.测试Queue消息
a.代码片段
package com.think.mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class QueueTest {
@Test
public void send() {
try {
// 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
// 连接
Connection conn = factory.createConnection();
conn.start();
// 会话
// 第一个参数: 是否开启事务 // 第二个参数: 消息签收方式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 目标
Destination destination = session.createQueue("queue_1");
// 生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式:是否持久化消息。非必须
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送消息
for(int i = 11; i <= 20; i++) {
Message message = session.createTextMessage("hello world ! -- " + i);
producer.send(message);
System.out.println("发送消息: " + ((TextMessage) message).getText());
}
// 释放连接
producer.close();
session.close();
conn.stop();
conn.close();
} catch (Throwable e) {
e.printStackTrace();
}
}
@Test
public void receive() {
try {
// 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
// 连接
Connection conn = factory.createConnection();
conn.start();
// 会话
// 第一个参数: 是否开启事务 // 第二个参数: 消息签收方式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 目标
Destination destination = session.createQueue("queue_1");
// 消费者
MessageConsumer consumer = session.createConsumer(destination);
// 2. 使用consumer.receive()接收消息
while (true) {
Message message = consumer.receive();
TextMessage textMsg = (TextMessage) message;
System.out.println("收到消息: " + textMsg.getText());
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
b.发送Queue消息
c.消费者接收消息:
d.web console :http://192.168.1.200:8161/admin/queues.jsp
4.测试Topic消息
a.代码
package com.think.mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class TopicTest {
@Test
public void send() {
try {
// 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
// 连接
Connection conn = factory.createConnection();
conn.start();
// 会话 // 第一个参数: 是否开启事务 // 第二个参数: 消息签收方式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 目标
Destination destination = session.createTopic("topic_1");
// 生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式:是否持久化消息。非必须
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送消息
for(int i = 1; i <= 10; i++) {
Message message = session.createTextMessage("hello world ! -- " + i);
producer.send(message);
System.out.println("发送Topic消息: " + ((TextMessage) message).getText());
}
// 释放连接
producer.close();
session.close();
conn.stop();
conn.close();
} catch (Throwable e) {
e.printStackTrace();
}
}
@Test
public void receive() {
try {
// 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("failover://(tcp://192.168.1.200:61616)");
// 连接
Connection conn = factory.createConnection();
conn.start();
// 会话 // 第一个参数: 是否开启事务 第二个参数: 消息签收方式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 目标
Destination destination = session.createTopic("topic_1");
// 消费者
MessageConsumer consumer = session.createConsumer(destination);
// 使用consumer.receive()接收消息
while (true) {
Message message = consumer.receive();
TextMessage textMsg = (TextMessage) message;
System.out.println("收到Topic消息: " + textMsg.getText());
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
b.发送topic消息
c.接收Topic消息
d.web console :http://192.168.1.200:8161/admin/topics.jsp
参考链接:
http://activemq.apache.org/
以上是关于实战篇 | ActiveMQ in Action的主要内容,如果未能解决你的问题,请参考以下文章
ActiveMQ实战篇之ActiveMQ实现request/reply模型
pdf免费书籍下载:activemq in action.pdf(英文原版)
SPRING IN ACTION 第4版笔记-第六章RENDERING WEB VIEWS-005- 使用ApacheTiles(TilesConfigurerTilesViewResolver&(代
SPRING IN ACTION 第4版笔记-第八章Advanced Spring MVC-001- 配置SpringFlow(flow-executorflow-registryFlowHand(代