JMS 2.0 - 如何接收来自共享消费者主题的消息?

Posted

技术标签:

【中文标题】JMS 2.0 - 如何接收来自共享消费者主题的消息?【英文标题】:JMS 2.0 - How to receive messages from topic with shared consumers? 【发布时间】:2019-04-21 13:16:51 【问题描述】:

我正在使用 ActiveMQ Artemis 和 JMS 2.0 来读取共享消费者的主题消息。我有两个问题:

    有没有办法使用xml格式的配置。 当我在消费者上设置消息侦听器时,是否必须使用while 循环?如果我不使用while (true) 循环,程序将在主题没有消息时终止。

SharedConsumer.java

public class SharedConsumer 
    @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
    ConnectionFactory connectionFactory;

    public String maxConnectionForJSON;

    public void readFromTopicAndSendToQueue()throws Exception
        Context initialContext = null;
        JMSContext jmsContext = null;
        int maxConnectionCount = 0;

        maxConnectionForJSON = "30";

        if (!StringUtils.isBlank(maxConnectionForJSON))
            try
                maxConnectionCount = Integer.parseInt(maxConnectionForJSON);
            catch (Exception e)
                //logging
            
        
        if (maxConnectionCount != 0) 
            try 
                List<JMSConsumer> jmsConsumerList = new ArrayList<>();
                initialContext = new InitialContext();

                Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");

                ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

                jmsContext = cf.createContext("admin", "admin");

                for (int i = 0; i < maxConnectionCount; i++)
                    JMSConsumer jmsConsumer = jmsContext.createSharedDurableConsumer(topic, "ct");
                    MessageListener listener = new Listener();
                    jmsConsumer.setMessageListener(listener);

                
                while (true) 
                    Thread.sleep(30000);
                
             catch (Exception e) 
                System.err.println(e.getMessage());
             finally 
                 if (initialContext != null) 
                     initialContext.close();
                 
                 if (jmsContext != null) 
                     jmsContext.close();
                 
            
        
    

    public static void main(final String[] args) throws Exception 
        SharedConsumer sharedConsumer = new SharedConsumer();
        sharedConsumer.readFromTopicAndSendToQueue();
    

SharedConsumerListener.java

public class Listener implements MessageListener 
    public static int count = 0;

    @Override
    public void onMessage(Message message) 
        System.out.println(message.toString() + "\ncount :" + count);
        count++;
    

我可以使用 xml 文件来读取 JMS 1.1 (ActiveMQ) 中的队列。我以为我们可以在 JMS 2.0 Artemis 中使用如下配置文件,但我错了。非常感谢 Justin Bertram 的帮助。

在 JMS 1.1 配置文件中

<bean id="brokerUrl" class="java.lang.String">
   <constructor-arg value="#appProperties.queueUrl"/>
</bean>

<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>

<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
   <constructor-arg ref="amqConnectionFactory"/>
   <property name="maxConnections" value="#appProperties.maxConnections"/>
   <property name="idleTimeout" value="#appProperties.idleTimeout"/>
   <property name="maximumActiveSessionPerConnection" value = "10"/> 

</bean>

<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
   <constructor-arg ref="connectionFactory1"/>
</bean>

<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
   <constructor-arg value="#appProperties.queueName"/>
</bean>

<task:executor id="mainExecutorForJSON" pool-size="#appProperties.mainExecutorForJSONPoolSize"
               queue-capacity="0" rejection-policy="CALLER_RUNS"/>

<int:channel id="jmsInChannelForJSON" >
    <int:dispatcher task-executor="mainExecutorForJSON"/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsInForJSON" destination="jSONNrtQueue" channel="jmsInChannelForJSON"
                                        concurrent-consumers="#appProperties.concurrentConsumerCountForJSON" />

<int:service-activator input-channel="jmsInChannelForJSON" ref="dataServiceJMS" />

【问题讨论】:

我不明白你想通过 xml 配置什么。 JMS 只是一个 Java API。你能澄清一下吗? 【参考方案1】:

简而言之,是的,一旦您设置了 JMS 使用者的消息侦听器,阻止程序终止是正常的。

当您创建 JMS 使用者并设置其消息侦听器时,JMS 客户端实现将在后台创建新线程,以从创建使用者并设置侦听器的线程异步侦听消息。因此,创建消费者和设置侦听器的线程将继续进行。在您的情况下,您需要以某种方式阻止线程退出并终止应用程序,因此您需要 while 循环。

【讨论】:

以上是关于JMS 2.0 - 如何接收来自共享消费者主题的消息?的主要内容,如果未能解决你的问题,请参考以下文章

如何从 MQTT 生产并在 ActiveMQ 中作为 MQTT 和 JMS 消费

Spring使用Spring和AMQP发送接收消息(上)

JMS 工作流 - 混合队列和主题

JMS 主题与选择器

一个 JMS 消费者停止监听活动的 mq 主题,而第二个没有

将最后发送的消息发送给 jms 主题的新消费者