使用 Jboss 主题未重新传递消息

Posted

技术标签:

【中文标题】使用 Jboss 主题未重新传递消息【英文标题】:Message Not Redelivered Using Jboss Topic 【发布时间】:2013-08-31 15:52:11 【问题描述】:

我正在使用Jboss 4.0.2 GA

我正在使用jbossmq-destinations-service中定义的testTopic

<mbean code="org.jboss.mq.server.jmx.Topic"
     name="jboss.mq.destination:service=Topic,name=testTopic">
    <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
    <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
    <attribute name="SecurityConf">
      <security>
        <role name="guest" read="true" write="true"/>
        <role name="publisher" read="true" write="true" create="false"/>
        <role name="durpublisher" read="true" write="true" create="true"/>
      </security>
    </attribute>
    <attribute name="RedeliveryDelay">0</attribute>
  </mbean>

我为发布者和订阅者编写了相同的文件 ::

 package com.nagarro.client;

import javax.jms.*;
import javax.naming.*;

import java.io.*;
import java.util.Properties;

public class Chat implements javax.jms.MessageListener 
    private TopicSession pubSession;
    private TopicSession subSession;
    private TopicPublisher publisher;
    private TopicConnection connection;
    private String username;

    /* Constructor. Establish JMS publisher and subscriber */
    public Chat(String topicName, String username, String password)
            throws Exception 
        // Obtain a JNDI connection
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial",
                "org.jnp.interfaces.NamingContextFactory");
        properties.put("java.naming.factory.url.pkgs",
                "org.jboss.naming:org.jnp.interfaces");
        properties.setProperty(Context.PROVIDER_URL, "localhost:1099");
        // ... specify the JNDI properties specific to the vendor

        InitialContext jndi = new InitialContext(properties);

        // Look up a JMS connection factory
        TopicConnectionFactory conFactory = (TopicConnectionFactory) jndi
                .lookup("TopicConnectionFactory");

        // Create a JMS connection
        TopicConnection connection = conFactory.createTopicConnection();

        // Create two JMS session objects
        TopicSession pubSession = connection.createTopicSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        TopicSession subSession = connection.createTopicSession(false,
                Session.CLIENT_ACKNOWLEDGE);

        // Look up a JMS topic
        Topic chatTopic = (Topic) jndi.lookup(topicName);

        // Create a JMS publisher and subscriber
        TopicPublisher publisher = pubSession.createPublisher(chatTopic);
        TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);

        // Set a JMS message listener
        subscriber.setMessageListener(this);

        // Intialize the Chat application
        set(connection, pubSession, subSession, publisher, username);

        // Start the JMS connection; allows messages to be delivered
        connection.start();

    

    /* Initialize the instance variables */
    public void set(TopicConnection con, TopicSession pubSess,
            TopicSession subSess, TopicPublisher pub, String username) 
        this.connection = con;
        this.pubSession = pubSess;
        this.subSession = subSess;
        this.publisher = pub;
        this.username = username;
    

    /* Receive message from topic subscriber */
    public void onMessage(Message message) 
        try 
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println(text);
            if(textMessage!=null)
                throw new NullPointerException();
            
         catch (JMSException jmse) 
            jmse.printStackTrace();
        
    

    /* Create and send message using topic publisher */
    protected void writeMessage(String text) throws JMSException 
        TextMessage message = pubSession.createTextMessage();
        message.setText(username + " : " + text);
        publisher.publish(message);
    

    /* Close the JMS connection */
    public void close() throws JMSException 
        connection.close();
    

    /* Run the Chat client */
    public static void main(String[] args) 
        try 
            if (args.length != 3)
                System.out.println("Topic or username missing");

            // args[0]=topicName; args[1]=username; args[2]=password
            Chat chat = new Chat("topic/testTopic", "", "");

            // Read from command line
            BufferedReader commandLine = new java.io.BufferedReader(
                    new InputStreamReader(System.in));

            // Loop until the word "exit" is typed
            while (true) 
                String s = commandLine.readLine();
                if (s.equalsIgnoreCase("exit")) 
                    chat.close(); // close down connection
                    System.exit(0);// exit program
                 else
                    chat.writeMessage(s);
            
         catch (Exception e) 
            e.printStackTrace();
        
    

在上面的代码中,我使用的是Client_Acknowledge会话模式,但是即使我从Listener的onMessage方法中抛出NullPointerException,我也没有得到消息的重新传递。

如果需要进行任何配置更改才能重新交付,请告诉我。

【问题讨论】:

主题持久吗?无论如何,我不确定是否在 topic 的情况下重新交付(它不是队列!)。不要静默退出,而是尝试致电queueSession.recover()。我很确定在队列的情况下会有很大的不同:不调用recover() 意味着JBoss 等待超时(在超时期间这些消息不会被重新传递)。使用recover 意味着消息可以立即“重新传递”。 【参考方案1】:

对于主题,如果您在阅读消息后调用topicSession.recover(),消息将立即重新传递(例如,在 try/catch 块中,如果发生异常)。

使用 JBoss 7、非持久主题、Session.CLIENT_ACKNOWLEDGEsetMessageListener 的测试:

                topicSubscriber.setMessageListener(new MessageListener() 
                    @Override
                    public void onMessage(Message message) 
                        try 
                            throw new IllegalStateException("Test");
                         catch (Exception ex) 
                            try 
                                topicSession.recover();
                             catch (JMSException e) 
                                e.printStackTrace();
                            

                            ex.printStackTrace();
                        
                    
                );

导致立即重新发送 10 次,然后将消息放入“死信队列”。

【讨论】:

根据您的建议,在此代码行之后:TopicSession pubSession = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 我添加了一行,即pubSession.recover(); 另外我现在使用的是 non-durableTopic,但它不起作用。 :( 在您阅读邮件后致电recover()。因此,不要只是“什么都不做”(如System.exit()),而是在退出应用程序之前调用它。 我在另一篇文章中读到,您评论说您无法在 jboss 中运行它。 :) @SAM 我并不完美,JBoss 也不完美……哪个帖子? 我说的是这个:***.com/questions/18567247/…

以上是关于使用 Jboss 主题未重新传递消息的主要内容,如果未能解决你的问题,请参考以下文章

PubSub 最大传递尝试次数和死信主题

Firebase 云消息传递 - 检查现有或可用主题

MUI createTheme 未正确将主题传递给 MUI 组件

Android 屏幕锁定和 Firebase 主题消息传递

Firebase 云消息传递 - 有多个主题 - 没有消息重复

是否跨 Firebase 项目共享云消息传递主题?