activemq
Posted 丶落幕
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activemq相关的知识,希望对你有一定的参考价值。
activemq
- 前言
- 使用步骤
- 总结
前言
mq产品种类
- kafka
- rabbitmq
- rocketmq
- activemq
优势
- 能够做到系统解耦,当新的模块接进来时,可以做到代码改动最小;
能够解耦
- 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;
能够削峰
- 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;
能够异步
使用步骤
1.安装
#解压
tar -zxf apache-activemq-5.15.15-bin.tar.gz
#移动
mv apache-activemq-5.15.15 /opt/
#进入
cd /opt/apache-activemq-5.15.15/bin/
#启动
./activemq start
安装好jdk环境,又回来了(建议搞一台虚拟机作为原型,需要环境学习时就克隆一台 )
#停止服务
./activemq stop
#启动服务并将日志写入指定文件(前提文件夹存在)
./activemq start > /opt/activemq-logs/myrunmq.log
#浏览器访问(web访问端口是8161)
http://192.168.59.140:8161/
#默认账户
admin admin
2.java操作activemq
2.1 队列模式
发送消息(示例):
private static final String ACTIVEMQ_URL="tcp://192.168.59.140:61616";
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) throws JMSException
//1.创建连接工厂,按照指定的url,采用默认的用户名密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接connection
Connection connection = factory.createConnection();
connection.start();
//3.创建会话
//两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题,不论队列还是主题都是继承自destination)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用messageProducer生产3条消息发送到mq的队列里面
for (int i = 0; i < 3; i++)
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg---" + (i + 1));
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*******消息发布到MQ完成");
接收消息(示例):
private static final String ACTIVEMQ_URL="tcp://192.168.59.140:61616";
private static final String QUEUE_NAME="queue01";
public static void main(String[] args) throws JMSException
//1.创建连接工厂,按照指定的url,采用默认的用户名密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得连接connection
Connection connection = factory.createConnection();
connection.start();
//3.创建会话
//两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题,不论队列还是主题都是继承自destination)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true)
//6.接收消息
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null)
System.out.println("********消费者接受到消息:"+textMessage.getText());
else
break;
consumer.close();
session.close();
connection.close();
//9.关闭资源
System.out.println("*******接收消息完成");
通过监听的方式接收消息(示例):
consumer.setMessageListener(new MessageListener()
@Override
public void onMessage(Message message)
if (message != null && message instanceof TextMessage)
TextMessage textMessage=(TextMessage)message;
try
System.out.println("********消费者监听到的消息是:"+textMessage.getText());
catch (JMSException e)
e.printStackTrace();
);
//保证main方法不退出
System.in.read();
consumer.close();
session.close();
connection.close();
队列模式特点(示例):
- 没有时间上的相关性
- 保存消息,等待消费者消费
- 如果同时有多名消费者(相同的queue),那么消息会被平均分配
2.1 主题模式
发送消息(示例):
//目的地变了,由queue变成了topic
Topic topic = session.createTopic(TOPIC_NAME);
接收消息(示例):
//接收消息也是一样的
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
//当然可以用Lambda表达式简化代码
consumer.setMessageListener((message)->
if(message!=null && message instanceof TextMessage)
TextMessage textMessage= (TextMessage) message;
try
System.out.println("********消费者监听到的消息是:"+textMessage.getText());
catch (JMSException e)
e.printStackTrace();
);
主题模式特点(示例):
- 有时间上的相关性,只能消费自它订阅之后发布的消息
- 不保存消息,假如无人订阅就去生产,那就是一条废消息,一般先启动消费者再启动生产者
- 每条消息都会被所有订阅者消费(人人有份)
2.3 topic和queue的对比总结
3.jms
java message service(java消息服务是javaEE中的一个技术)
jms的组成结构和特点(示例):
-
jms provider --> 实现就jms接口和规范的消息中间件,也就是我们的mq服务器
-
jms producer --> 消息生产者,创建和发送jms消息的客户端应用
-
jms consumer --> 消息消费者,接收和处理jms消息的客户端应用
-
jms message --> 重要
-
消息头
- jmsdestination --> 目的地
- jmsdeliverymode --> 是否持久化(默认持久化)
- jmsexpiration --> 过期时间(默认永不过期)
- jmspriority --> 消息优先级(0-9十个级别,0-4是普通消息,5-9是加急消息,默认是4级)
- jmsmessageid -->唯一识别每个消息的标识由mq产生
-
消息体
- 封装集体的消息数据
- 5种消息体格式
- textmessage --> 普通字符串消息,包含一个string
- mapmessage --> 一个map类型的消息,key作为string类型,而值为java的基本类型
- bytesmessage --> 二进制数据消息,包含一个byte[]
- streammessage --> java数据流消息,用标准流操作来顺序的填充和
- objectmessage --> 对象消息,包含一个可序列化的java对象
- 发送和接受的消息体类型必须一致
-
消息属性
-
如果需要除消息头字段以外的值,那么可以使用消息属性
-
识别,去重,重点标注等操作
-
textMessage.setStringProperty("chen", "vip");
-
-
4.发布订阅
发布订阅模式本质其实是持久化的topic
发送消息(示例):
private static final String ACTIVEMQ_URL="tcp://192.168.59.140:61616";
private static final String TOPIC_NAME="pub-sub";
public static void main(String[] args) throws JMSException
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
//持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//start调用的位置变了
connection.start();
for (int i = 0; i < 3; i++)
TextMessage textMessage = session.createTextMessage("topic name---" + (i + 1));
messageProducer.send(textMessage);
messageProducer.close();
session.close();
connection.close();
System.out.println("*******持久化topic name 消息发布到MQ完成");
接收消息(示例):
private static final String ACTIVEMQ_URL="tcp://192.168.59.140:61616";
private static final String TOPIC_NAME="pub-sub";
public static void main(String[] args) throws JMSException, IOException
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
//谁订阅
connection.setClientID("zs");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
//持久化topic
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark ...");
//start调用的位置变了
connection.start();
Message message = topicSubscriber.receive();
while (message != null)
TextMessage textMessage= (TextMessage) message;
System.out.println("收到的持久化topic: "+textMessage.getText());
message = topicSubscriber.receive(5000L);
session.close();
connection.close();
- 一定先运行一次消费者,等于先mq注册,类似我订阅了这个主题
- 然后在运行生产者发送消息
- 此时,无论消费者是否在线,都会接收到,不在线的话,下次连接时会把没收到的消息都接下来
5.事务
事务偏生产者
生产者角度(示例):
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- 当事务为false时,只要执行send,就会进入到队列中
- 当事务未true时,先执行send在执行commit,消息才被真正的提交的队列中
消费者立场(示例):
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- 当事务为false时,只要执行receive,消息就会被消费
- 当事务未true时,先执行receive在执行commit,消息才会被真正消费
6.签收
签收偏消费者
非事务手动签收(示例):
//手动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//一旦设置成手动签收,需要对每一条消息,进行签收
textMessage.acknowledge();
有事务手动签收(示例):
//有事务的手动签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
//情况1:只commit不acknowledge,消息被正常消费,不存在重复消费
session.commit();
//情况2:只acknowledge不commit,消费未被正常消费,存在重复消费
textMessage.acknowledge();
结论:事务大于ack
7.broker
代码如下(示例):
相当于一个activemq服务器实例
说白了,broker其实就是实现了用代码的形式启动activemq将mq嵌入到java代码中
,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可靠性
8.springboot整合activemq
8.1 activemq-produce(生产者)
引入依赖(示例):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
yaml配置(示例):
server:
port: 8001
spring:
activemq:
broker-url: tcp://192.168.59.140:61616
user: admin
password: admin
jms:
pub-sub-domain: false #false=queue true=topic
#定义队列名称
myqueue: boot-activemq-queue
配置bean(声明队列)(示例):
@Configuration
public class ActivemqConfig
@Value("$myqueue")
private String myqueue;
@Bean
public Queue queue()
return new ActiveMQQueue(myqueue);
编写消息生产者(示例):
@Component
public class QueueProduce
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Queue queue;
public void produceMsg()
jmsMessagingTemplate.convertAndSend(queue,"*****"+ UUID.randomUUID().toString());
主程序(示例):
@SpringBootApplication
@EnableJms
public class ActivemqApplication
public static void main(String[] args)
SpringApplication.run(ActivemqApplication.class, args);
测试消息发送(示例):
@SpringBootTest
class ActivemqApplicationTests
@Autowired
QueueProduce queueProduce;
@Test
void contextLoads()
queueProduce.produceMsg();
8.2 activemq-consumer(消费者)
和produce差不多,修改下端口
删除队列声明和消息发送
新建消费消息类
consumer主程序不需要@EnableJms
@Component
public class QueueConsumer
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@JmsListener(destination = "$myqueue")
public void receive(TextMessage textMessage) throws JMSException
System.out.println("*******消费者收到的消息是:"+textMessage.getText());
8.3 topic模式
topic模式就是把配置文件的pub-sub-domain改成true
声明queue改成声明topic
9.传输协议
9.1 nio协议(重点)
#进入
cd /opt/apache-activemq-5.15.15/conf/
#备份
cp activemq.xml activemq.xml.bak
#修改
vi activemq.xml
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
修改url连接协议
private static final String ACTIVEMQ_URL="nio://192.168.59.140:61618";
9.2 auto+nio
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
//可以使用nio协议连接
private static final String ACTIVEMQ_URL="nio://192.168.59.140:61608";
//也可以用tcp协议连接
private static final String ACTIVEMQ_URL="tcp://192.168.59.140:61608";
10.activemq存储机制
10.1 kahaDB消息存储
基于日志文件,从activemq5.4开始默认的持久化插件
-
db-number.log
- 每32M一个文件,文件名按照数字进行编号,当不再有引用到数据文件中的任何消息时,文件会被删除或归档 db.data
- 该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件 db.free
- 当前db.data文件里哪些页面是空闲的,文件具体内容是所有空闲业的id db.redo
- 用来进行消息回复,如果kahaDB消息存储在强制退出后启动,用来回复BTree索引 lock
- 文件锁,表示当前获得kahaDB读写权限的broker
10.2 jdbc消息存储
添加mysql驱动(示例):
修改配置(示例):
#修改配置
vi /opt/apache-activemq-5.15.15/conf/activemq.xml
#修改消息存储类型为jdbc
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
#定义数据源
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.59.140/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
创建activemq数据库
启动activemq,成功就可以看到表已经建好了
测试发送消息(当消息被消费时则会从数据库删除)
10.2 activemq journal
以上是关于activemq的主要内容,如果未能解决你的问题,请参考以下文章