Spring AMQP RabbitMQ 实现优先级队列
Posted
技术标签:
【中文标题】Spring AMQP RabbitMQ 实现优先级队列【英文标题】:Spring AMQP RabbitMQ implementing priority queue 【发布时间】:2015-01-06 11:05:03 【问题描述】:谷歌几天后,我相信我完全迷路了。我想实现一种具有大约 3 个队列的优先级队列:
-
高优先级队列(每天),需要先处理。
中等优先级队列(每周),如果队列 #1 中没有项目,则将处理该队列。 (这个队列中的消息没问题,它根本不会处理)
低优先级队列(每月),如果队列 #1 和 #2 中没有项目,则将处理该队列。 (这个队列中的消息没问题,它根本不会处理)
最初我有以下流程,让消费者消费来自所有三个队列的消息并检查队列#1、#2 和#3 中是否有任何项目。然后我意识到这是错误的,因为:
-
我完全迷失了一个问题:“我怎么知道它来自哪个队列?”。
我已经在消费来自任何队列的消息,所以如果我从低优先级队列中获取对象,如果我发现高优先级队列中有消息,我是否会将其放回队列?李>
以下是我目前的配置,可见我是个白痴。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<rabbit:connection-factory id="connectionFactory" host="localhost" />
<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
exchange="" routing-key="daily_queue"/>
<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
exchange="" routing-key="weekly_queue"/>
<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
exchange="" routing-key="monthly_queue"/>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>
<bean id="Consumer" class="com.test.Consumer" />
</beans>
知道我应该如何使用优先队列来解决这个问题吗?
ps:我也想知道,Apache Camel 是否有我可以依赖的东西?
更新 1:我刚从 Apache Camel 看到这个:“https://issues.apache.org/jira/browse/CAMEL-2537”JMSPriority 上的排序器似乎是我正在寻找的东西,以前有人试过吗?
更新 2:假设我要根据@Gary Russell 的推荐使用 RabbitMQ 的插件,我有以下 spring-rabbitmq 上下文 XML 配置,这似乎是有道理的(由来宾..):
<rabbit:queue name="ad_google_dfa_reporting_queue">
<rabbit:queue-arguments>
<entry key="x-max-priority" value="10"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>
<bean id="Consumer" class="com.test.Consumer" />
上面的xml配置已经成功创建了一个队列,名称:“ad_google_dfa_reporting_queue”,参数参数:x-max-priority: 10 & persistent: true
但不是当涉及到发送具有优先级的消息的代码时,我完全失去了它。如何定义示例 URL 中提及的优先级:https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java
AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?
更新 3:根据@Gary 的回答,我设法在消息中设置了优先级来发送消息,如下图所示: 但是,当我发送 1000 条随机优先级在 1-10 之间的消息时,消费者正在使用各种优先级的消息。 (我期望只有高优先级的消息首先被消费)。以下是消息生产者的代码:
Random random = new Random();
for (int i=0; i< 1000; i++)
final int priority = random.nextInt(10 - 1 + 1) + 1;
DfaReportingModel model = new DfaReportingModel();
model.setReportType(DfaReportingModel.ReportType.FACT);
model.setUserProfileId(0l + priority);
amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor()
@Override
public Message postProcessMessage(Message message) throws AmqpException
message.getMessageProperties().setPriority(priority);
return message;
);
以下是消息消费者的代码:
public void consume(DfaReportingModel message)
System.out.println(message.getUserProfileId());
Thread.sleep(500);
我得到的结果:
9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,
更新 4:问题已解决!知道来自https://github.com/rabbitmq/rabbitmq-priority-queue 的示例代码在我的环境中工作,我认为问题出在spring 上下文中。因此,在对不同类型的配置进行了无数次尝试和错误之后,我指出了使这个工作有效的确切组合!并如下:
<rabbit:queue name="ad_google_dfa_reporting_queue">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
如果没有明确定义该值为Integer类型,则优先队列不起作用。最后,解决了。耶!
【问题讨论】:
对于 p.s.我建议添加一个 Apache Camel 标签 @mjn 完成。添加了 apache-camel。 @ben75 谢谢!!我想知道如何逐项列出事实:) 哇,你的帖子很有帮助,为我节省了很多时间!谢谢 【参考方案1】:RabbitMQ 现在有一个priority queue plugin,消息按优先级顺序传递。最好使用它而不是重新排队低优先级消息的方案,这在运行时会非常昂贵。
编辑:
当使用rabbitTemplate.convertAndSend(...)
方法时,并且您想在消息上设置优先级属性,您需要在模板中实现自定义MessagePropertiesConverter
(子类化DefaultMessagePropertiesConverter
)或使用convertAnSend
变体采用消息后处理器;例如:
template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor()
@Override
public Message postProcessMessage(Message message) throws AmqpException
message.getMessageProperties().setPriority(5);
return message;
);
【讨论】:
我看到了这个插件,对如何用 Spring-rabbitmq 实现插件感到困惑。是否有任何使用此插件的 xml 上下文示例? Russel 我设法“猜测”出最有可能的 xml 上下文配置,但是当涉及到执行 template.convertandsend(); 的 java 代码时该文件没有任何关于如何发送优先级的参考。线索? 可以优先发送消息!但奇怪的是,消费者正在以各种优先级消费消息。就像从来没有执行过优先级一样。我仔细检查了rabbitMQ插件状态,优先队列显示为启用:[E] rabbitmq_priority_queue 3.3.x-72d20292 @Gray 我正在尝试在我的本地主机上使用 RabbitMQ,但使用起来很困难,您能给我一些关于此的指示 请不要堆积在现有问题/答案上 - 打开一个新问题;用spring-amqp
标记它,我会收到一封电子邮件。请提供比“遇到困难”更多的信息。【参考方案2】:
从 3.5.0 版开始,RabbitMQ 在核心中实现了 priority queue。
您可以使用x-max-priority
参数声明优先级队列。这个参数应该是一个整数,表示队列应该支持的最大优先级。例如,使用 Java 客户端:
Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
ch.queueDeclare("my-priority-queue", true, false, false, args);
然后您可以使用basic.properties
的优先级字段发布优先消息。数字越大表示优先级越高。
【讨论】:
以上是关于Spring AMQP RabbitMQ 实现优先级队列的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spring AMQP 和 RabbitMQ HA 进行故障转移
RabbitMQ 核心概念及与 Spring Boot 2 的整合