Mule Quartz 调度器:处理多条消息
Posted
技术标签:
【中文标题】Mule Quartz 调度器:处理多条消息【英文标题】:Mule Quartz scheduler: process multiple messages 【发布时间】:2013-09-09 18:21:44 【问题描述】:这个问题是 4 个月前提出的。
https://***.com/posts/16241300/edit
有人吗?
“我在 mule flow 中编写了一个石英代码,每 5 分钟消耗一次队列中的所有消息。
<quartz:inbound-endpoint jobName="abc" cronExpression="0 0/1 * * * ?" doc:name="Quartz">
<quartz:endpoint-polling-job>
<quartz:job-endpoint ref="jmsEndPoint" />
</quartz:endpoint-polling-job>
</quartz:inbound-endpoint>
但即使队列中有 5 条消息,上述代码一次也只消耗一条消息。
我的要求是每 5 分钟运行一次作业并消耗队列中的所有消息。
另一个要求是使用消息负载中的唯一标识符过滤掉重复消息。
任何帮助将不胜感激。 "
编辑:JMS 端点
<jms:endpoint name="jmsEndPoint" queue="MyQueue" connector-ref="connector"/>
【问题讨论】:
请分享您的 jmsEndPoint 代码 【参考方案1】:队列是基于事件的,旨在仅返回一条消息(先进先出)。为了在 Mule 流中使用队列中的所有消息,一种方法是创建一个自定义组件,该组件将以编程方式使用队列中的 jms 消息,直到没有更多消息为止。
为了过滤重复消息,考虑使用Mule的幂等路由器:
http://www.mulesoft.org/documentation/display/current/Routing+Message+Processors#RoutingMessageProcessors-IdempotentMessageFilter
HTH
【讨论】:
能否提供在mule中配置组件的伪代码? 我试过这个public Object onCall(MuleEventContext muleEventContext) throws Exception MuleMessage[] messages = null; MuleMessage result = muleEventContext.requestEvent("queue", -1); do if (result == null) break; if (result instanceof MuleMessageCollection) MuleMessageCollection resultsCollection = (MuleMessageCollection) result; messages = resultsCollection.getMessagesAsArray(); else messages = new MuleMessage[1]; messages[0] = result; while (result !=null); return messages;
,但它一直在第一条消息上循环。【参考方案2】:
查看您的代码,您似乎需要像这样阅读它:
muleEventContext.requestEvent("MyQueue", -1);
如果你想过滤 id 你可以这样做:
<idempotent-message-filter idExpression="#[message:id]-#[header:foo]">
<simple-text-file-store directory="./idempotent"/>
</idempotent-message-filter>
【讨论】:
谢谢@Nikos。您能否提供一些关于我将如何在 mule 流程中配置它的额外信息?抱歉,我无法理解上述代码如何将所有消息从队列中取出。 那只是为了过滤 id。为了阅读它们,您需要一个自定义组件来循环浏览 jms。您可以像这样阅读它们:muleContext.client.request("jms://stagingQueue", 0) 直到它们全部消失。【参考方案3】:在您的 Mule-config xml 中:
<quartz:connector name="quartzConnector">
<receiver-threading-profile
maxThreadsActive="1" />
</quartz:connector>
<flow name="DelayedMessageProcessing">
<quartz:inbound-endpoint name="qEP6"
cronExpression="$some.cron.expression"
jobName="DelayedProcessing"
connector-ref="quartzConnector">
<jms:transaction action="ALWAYS_BEGIN" />
<quartz:event-generator-job />
</quartz:inbound-endpoint
<component class="com.something.myComponent" />
</flow>
.. 和 Java 组件:
package com.something;
import org.mule.api.MuleEventContext;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Callable;
public class MyComponent implements Callable
public Object onCall(final MuleEventContext muleEventContext) throws Exception
MuleMessage delayedMessage = fetchMessage(muleEventContext);
while (delayedMessage != null)
//You might have to copy properties from inbound to outbound scope here..
muleEventContext.dispatchEvent(delayedMessage, "some.jms.endpoint");
delayedMessage = fetchMessage(muleEventContext);
return null;
private MuleMessage fetchMessage(final MuleEventContext muleEventContext) throws MuleException
return muleEventContext.requestEvent("some.delayed.jms.endpoint", 3000);
【讨论】:
以上是关于Mule Quartz 调度器:处理多条消息的主要内容,如果未能解决你的问题,请参考以下文章