Messages not sent to reply queue on an RTE
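Asked: 2017-09-28 06:48:43

In our current application, which runs on JBoss EAP 6.2, we have a number of batch jobs that are triggered by remote EJB invocations. To centralize the notification logic for all of these jobs, we decided to route every invocation through an MDB by passing a serialized message. The expected flow is as follows:

1. The batch job client sends a message to a remote queue.
2. An MDB listens on this remote queue, processes the message and invokes the EJB.
3. A DLQ is configured to handle the notification once all retries are exhausted.
4. A notification should also be sent on every retry. To avoid too many notifications, the retry interval is sufficiently long.

To handle the last point, I tried setting a reply queue in the JMSReplyTo header. To simulate the flow above, I created the following MDB implementations...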
**Main MDB:**
@MessageDriven(name = "MiddleManMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/test"),
    @ActivationConfigProperty(propertyName = "connectorClassName", propertyValue = "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"),
    @ActivationConfigProperty(propertyName = "connectionParameters", propertyValue = "host=localhost;port=5445"),
    @ActivationConfigProperty(propertyName = "user", propertyValue = "queueuser"),
    @ActivationConfigProperty(propertyName = "password", propertyValue = "queuepassword")
})
public class MiddleManMDB implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(MiddleManMDB.class);

    @Resource(name = "java:/JmsXA")
    private ConnectionFactory connectionFactory;

    /*
     * (non-Javadoc)
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                LOGGER.info("Received text message --> {}", ((TextMessage) message).getText());
            }
            throw new JMSException("thrown exception");
        } catch (Exception e) {
            sendToReplyQueue(e.getMessage(), message);
            LOGGER.info("Throwing exception to simulate retry...");
            throw new RuntimeException(e);
        }
    }

    private void sendToReplyQueue(String errorMessage, Message message) {
        Context context = null;
        Connection conn = null;
        LOGGER.info("Sending exception details to reply queue...");
        try {
            context = new InitialContext();
            conn = connectionFactory.createConnection();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination jmsReplyTo = message.getJMSReplyTo();
            MessageProducer replyProducer = session.createProducer(jmsReplyTo);
            replyProducer.send(jmsReplyTo, session.createTextMessage(errorMessage));
        } catch (NamingException | JMSException e) {
            e.printStackTrace();
        } finally {
            // close connection and context
        }
    }
}
**Reply MDB:**
@MessageDriven(name = "ReplyMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/reply")
})
public class ReplyMDB implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReplyMDB.class);

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                LOGGER.info("Received reply message --> " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            LOGGER.error("Error in reply queue...", e);
        }
    }
}
**Dead letter MDB:**
@MessageDriven(name = "DeadLetterMDB", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/dead")
})
public class DeadLetterMDB implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterMDB.class);

    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("Message has arrived in dead letter queue");
            LOGGER.info("Current delivery count - {}", message.getIntProperty("JMSXDeliveryCount"));
            if (message instanceof TextMessage) {
                LOGGER.info("Received text message --> {}", ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
**Client:**
public static void main(String[] args) {
    Connection connection = null;
    Context context = null;
    try {
        // create context and connection factory
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = (Destination) context.lookup("jms/queue/test");
        Destination replyDest = (Destination) context.lookup("jms/queue/reply");
        MessageProducer producer = session.createProducer(destination);
        connection.start();
        TextMessage message = session.createTextMessage("Hello World");
        message.setJMSReplyTo(replyDest);
        producer.send(message);
    } catch (NamingException | JMSException e) {
        e.printStackTrace();
    } finally {
        // close context and connection
    }
}
**Relevant entries in standalone-full.xml:**
<address-settings>
    <address-setting match="jms.queue.testQueue">
        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
        <redelivery-delay>1000</redelivery-delay>
        <max-delivery-attempts>3</max-delivery-attempts>
        <max-size-bytes>10485760</max-size-bytes>
        <address-full-policy>BLOCK</address-full-policy>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
    </address-setting>
    <address-setting match="jms.queue.replyQueue">
        <redelivery-delay>1000</redelivery-delay>
        <max-delivery-attempts>3</max-delivery-attempts>
        <max-size-bytes>10485760</max-size-bytes>
        <address-full-policy>BLOCK</address-full-policy>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
    </address-setting>
    <address-setting match="jms.queue.DLQ">
        <redelivery-delay>1000</redelivery-delay>
        <max-delivery-attempts>3</max-delivery-attempts>
        <max-size-bytes>10485760</max-size-bytes>
        <address-full-policy>BLOCK</address-full-policy>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
    </address-setting>
</address-settings>
<jms-destinations>
    <jms-queue name="testQueue">
        <entry name="queue/test"/>
        <entry name="java:jboss/exported/jms/queue/test"/>
    </jms-queue>
    <jms-queue name="replyQueue">
        <entry name="queue/reply"/>
        <entry name="java:jboss/exported/jms/queue/reply"/>
    </jms-queue>
    <jms-queue name="DLQ">
        <entry name="queue/dead"/>
        <entry name="java:jboss/exported/jms/queue/dead"/>
    </jms-queue>
    <jms-topic name="testTopic">
        <entry name="topic/test"/>
        <entry name="java:jboss/exported/jms/topic/test"/>
    </jms-topic>
</jms-destinations>
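With the flow above in the MDB, no message is ever received on the reply queue. All three queues are deployed on the same server.
I guess the reason is the following lines: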
sendToReplyQueue(e.getMessage(), message);
LOGGER.info("Throwing exception to simulate retry...");
throw new RuntimeException(e);
Since the send is asynchronous and I am throwing an RTE (to trigger a retry), the message never gets sent. Is there a way to work around this?
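Answer 1: "I guess the reason is the following lines..."
You can try commenting out the RTE. Also add more log statements to trace what happens. Check whether the reply destination is set correctly:
message.setJMSReplyTo(replydestination);
LOGGER.info("Reply to: " + message.getJMSReplyTo());
or whether the message is actually sent to the reply queue:
replyProducer.send(jmsReplyTo, session.createTextMessage(errorMessage));
LOGGER.info("exception details sent to reply queue...");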
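
Beyond the logging suggested above, one possible explanation worth checking is that the reply is not lost because the send is asynchronous, but because it takes part in the same transaction as the failed delivery. This assumes that the connection factory bound at java:/JmsXA enlists its sessions in the MDB's container-managed transaction, which the post does not confirm. Under that assumption, throwing the RuntimeException rolls back the transaction and the pending reply with it. The sketch below is not JMS code: InMemoryQueue, SketchSession and repliesAfterFailedDelivery are hypothetical names for a plain-Java model of that behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactedSendSketch {

    /** Hypothetical in-memory stand-in for queue/reply. */
    static final class InMemoryQueue {
        final List<String> delivered = new ArrayList<>();
    }

    /** Hypothetical session: a transacted one buffers sends until commit(). */
    static final class SketchSession {
        private final InMemoryQueue queue;
        private final boolean transacted;
        private final List<String> pending = new ArrayList<>();

        SketchSession(InMemoryQueue queue, boolean transacted) {
            this.queue = queue;
            this.transacted = transacted;
        }

        void send(String text) {
            if (transacted) {
                pending.add(text);          // held back until the transaction outcome is known
            } else {
                queue.delivered.add(text);  // visible to consumers immediately
            }
        }

        void commit() {
            queue.delivered.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();                // buffered sends are discarded
        }
    }

    /** Mimics onMessage: send a reply, then fail so the "container" rolls back. */
    static int repliesAfterFailedDelivery(boolean sendJoinsTransaction) {
        InMemoryQueue replyQueue = new InMemoryQueue();
        SketchSession session = new SketchSession(replyQueue, sendJoinsTransaction);
        try {
            session.send("thrown exception");
            throw new RuntimeException("simulate retry");
        } catch (RuntimeException e) {
            session.rollback();             // assumed container reaction to the failed delivery
        }
        return replyQueue.delivered.size();
    }

    public static void main(String[] args) {
        System.out.println("send inside transaction:  " + repliesAfterFailedDelivery(true));
        System.out.println("send outside transaction: " + repliesAfterFailedDelivery(false));
    }
}
```

If this turns out to be the cause, the reply would have to be sent outside the delivery transaction, for example through a connection factory that is not XA-enlisted or from a separate bean method marked @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW). Both options are untested suggestions for this setup.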