三ActiveMQ API

Posted java从零起始

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了三ActiveMQ API相关的知识,希望对你有一定的参考价值。

一、Producer API

1、消息发送

消息发送有四个重载方法:

  • void send(Message message); 发送消息到创建消息生产者时指定的目的地

  • void send(Message message, int deliveryMode, int priority, long timeToLive); 发送消息到创建消息生产者时指定的目的地

  • void send(Destination destination, Message message);发送消息到指定目的地(创建消息生产者时不指定目的地)

  • void send(Destination destination, Message message int deliveryMode, int priority, long priority);发送消息到指定目的地(创建消息生产者时不指定目的地),其余参数跟第二个方法一样。

参数:

  • 1.deliveryMode:消息是否持久化,有两个取值(DeliveryMode.PERSISTENT 持久化,DeliveryMode.NON_PERSISTENT 不持久化)

  • 2.timeToLive:优先级

  • 3.timeToLive:消息有效期(单位:毫秒数)

2、消息有效期

在消息过期后,ActiveMQ默认会将过期消息保存的“死信队列 ActiveMQ.DLQ‘中,前提是需要设置消息是否持久化,不持久化的消息,在有效期过后,不会保存到死信队列中。私信队列中的消息不能恢复。

ACTIVEMQ_MSGS表中:

三、ActiveMQ API

死信队列可在 conf/activemq.xml 中配置:

消息生产者:

 
   
   
 
  1. // 创建连接工厂对象

  2. connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.48.129:61616");

  3. // 通过连接工厂对象创建连接

  4. connection = connectionFactory.createConnection();

  5. // 开启连接

  6. connection.start();

  7. // 通过连接创建会话

  8. session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

  9. destination = session.createQueue("test-destination");

  10. // 通过会话创建消息生产者,不指定目的地

  11. producer = session.createProducer(null);

  12. // 创建消息

  13. message = session.createTextMessage("测试目的地3");

  14. // 发送消息

  15. // 消息发送有四个重载方法

  16. /**

  17. * void send(Message var1) throws JMSException;

  18. * void send(Message var1, int var2, int var3, long var4) throws JMSException;

  19. * void send(Destination var1, Message var2) throws JMSException;

  20. * void send(Destination var1, Message var2, int var3, int var4, long var5) throws JMSException;

  21. */

  22. // 指定目的地,消息,持久化,0,消息有效期是5秒

  23. producer.send(destination, message, DeliveryMode.PERSISTENT, 0, 5 * 1000);

消息消费者:

 
   
   
 
  1. // 创建连接工厂对象

  2. connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.48.129:61616");

  3. // 通过连接工厂对象创建连接

  4. connection = connectionFactory.createConnection();

  5. // 开启连接

  6. connection.start();

  7. // 通过连接创建会话

  8. session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

  9. destination = session.createQueue("test-destination");

  10. // 通过会话创建消息消费者

  11. consumer = session.createConsumer(destination);

  12. // 接收消息,强转(前提是知道消息发送的类型)

  13. message = (TextMessage)consumer.receive();

  14. // 消息确认

  15. message.acknowledge();

3、消息优先级

1、在消息发送的时候可以指定消息的权重,broker可以建立权重较高的消息将会有限发送给Consumer。在某些场景下,通常希望权重较高的消息优先推送,不过由于种种原因,priority并不能决定消息推送的严格顺序(order)。

2、JMS标准中约定priority的数值为0-9,值越大表示权重越高,默认值为4。不过ActiveMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持0-9,因为JDBC存储器可以基于priority对消息进行排序和索引化。但是对于kahadb/levelDB等这种基于日志文件的存储器而言,对priority的支持相对较弱,只能识别三种优先级(LOW:< 4;NORMAL:= 4;HIGH:> 4)。

3、在broker端默认是不开启存储priority的,需要去手动开启,修改activemq.xml配置文件,在broker标签的子标签policyEntries中增加  配置。

4、对于“非持久化”类型的数据(没有被写入临时文件),它们将被保存在内存中,不存在从文件读取到内存的过程。Broker在收到Producer(消息生产者)的消息之后,会把消息缓存到内存当中,如果消息需要持久化,则同时也会把消息写入文件。如果Consumer(消息消费者)消费速度足够快,则积压在内存中的消息数据较少,尚未达到内存限制时,priority可以保证全局的顺序。如果消息数据积压超过内存限制,新接收的消息数据则会被持久化到磁盘中,priority就不会那么有效了。

5、可以设置“强顺序”和“严格顺序”:

  • 1、强顺序:在activemq.xml文件中配置 。strictOrderDispatch表示”严格顺序转发“,比如说目前有两个消费者Consumer1和Consumer2,它们的pending buffer 尺寸都为3, 现在假如有四条数据,当Consumer1填充满后(即满3条)才开始填充Consumer2。这种策略可以保证buffer中所有消息是”权重临近“的、有序的,但是并非解决priority消息顺序。

  • 2、严格顺序:在activemq.xml文件中配置 ,useCache="false"用来关闭缓存,强制将收到的所有消息都写入文件。queuePrefetch="1"用来控制每个Consumer,保证任何时刻只有一个消息被消费,每次消费消息都会从文件中获取,大量的增加了消息文件的操作数,不过每次消费的消息肯定时priority最高的。


以上是关于三ActiveMQ API的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ配置用户认证信息

ActiveMQ配置用户认证信息

Apache ActiveMQ 远程代码执行漏洞 (CVE-2016-3088)

onActivityResult 未在 Android API 23 的片段上调用

ActiveMQ API 详解

ActiveMQ 初识ActiveMQ