ActiveMQ Artemis prefixes "jms.topic." to all topic names defined on Spring Boot Client

【Title】: ActiveMQ Artemis prefixes "jms.topic." to all topic names defined on Spring Boot Client 【Posted】: 2021-12-01 17:24:00 【Question】:

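I am using ActiveMQ Artemis 2.18.0 with version 2.5.5 of the spring-boot-starter-artemis dependency on a Spring Boot client. In my use case, clients need to communicate with each other via topics. The problem is that the string jms.topic. is prepended to every topic defined on the client. For example, the topic foo.sendInfo becomes jms.topic.foo.sendInfo.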

The broker.xml file is shown below. The acceptor used by the Spring Boot client is the netty-ssl-acceptor on port 61617.

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>

      <persistence-enabled>true</persistence-enabled>

      <journal-type>NIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <!--
       This value was determined through a calculation.
       Your system could perform 1.18 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 844000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->
      <journal-buffer-timeout>844000</journal-buffer-timeout>

      <journal-max-io>1</journal-max-io>

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>90</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <page-sync-timeout>844000</page-sync-timeout>

      <acceptors>

         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
         <!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
                                      as duplicate detection requires applicationProperties to be parsed on the server. -->
         <!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
                                       default: 102400, -1 would mean to disable large message control -->

         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->


         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>
         
         <!-- SSL Acceptor -->
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

         <acceptor name ="mqtt+ssl">tcp://0.0.0.0:8883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>

      </acceptors>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="admins, users"/>
            <permission type="deleteNonDurableQueue" roles="admins, users"/>
            <permission type="createDurableQueue" roles="admins, users"/>
            <permission type="deleteDurableQueue" roles="admins, users"/>
            <permission type="createAddress" roles="admins, users"/>
            <permission type="deleteAddress" roles="admins, users"/>
            <permission type="consume" roles="admins, users"/>
            <permission type="browse" roles="admins, users"/>
            <permission type="send" roles="admins, users"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="admins"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>
   </core>
</configuration>

The connection factory configuration on the Spring Boot client is shown below.

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;


@Configuration
@EnableJms
public class MQTTConfig {

    @Value("${JMS_BROKER_TRUSTSTORE}")
    private String pathToTrustStore;

    @Value("${JMS_BROKER_KEYSTORE}")
    private String pathToKeystore;

    @Value("${JMS_BROKER_TRUSTSTORE_PASSWORD}")
    private String truststorePassword;

    @Value("${JMS_BROKER_KEYSTORE_PASSWORD}")
    private String keystorePassword;

    // Connects to the SSL acceptor on port 61617 defined in broker.xml
    @Bean
    public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
        ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61617?&" + "sslEnabled=true&" +
                "trustStorePath=" + pathToTrustStore + "&trustStorePassword=changeit");
        artemisConnectionFactory.setUser("user");
        artemisConnectionFactory.setPassword("password");
        return artemisConnectionFactory;
    }

    /**
     * Initialise {@link JmsTemplate} as required
     */
    @Bean
    public JmsTemplate jmsTemplate() throws JMSException {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
        jmsTemplate.setExplicitQosEnabled(true);

        //setting pubSubDomain to true configures JmsTemplate to work with topics instead of queues
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

    /**
     * Initialise {@link DefaultJmsListenerContainerFactory} as required
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(artemisSSLConnectionFactory());
        //setting pubSubDomain to true configures the DefaultJmsListenerContainerFactory to work with topics instead of queues
        factory.setPubSubDomain(true);
        return factory;
    }
}

Below is the POM file, showing only the relevant dependencies.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
    <version>2.5.5</version>
</dependency>

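The code snippet below shows a producer that publishes to the topic server.weatherForecast and a consumer that subscribes to the same topic. Messages are exchanged between this producer and consumer without any problem, because jms.topic. is prepended to every topic defined on the Spring Boot client. However, when I subscribe to the MQTT messages with an external tool, no messages arrive for the topic defined in the tool unless the subscribed topic is changed from server.weatherForecast to jms.topic.server.weatherForecast.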

import org.json.JSONObject; // assumption: the original snippet does not show which JSON library is used
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class SamplePC {

    @Autowired
    private JmsTemplate jmsTemplate;

    //producer that is called by a cron job
    public void tester() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("serialNumber", "105");
        jmsTemplate.convertAndSend("server/forecast", jsonObject.toString().toCharArray());
    }

    //consumer (a message from the producer should be received here, but nothing arrives)
    @JmsListener(destination = "server/forecast")
    private void consumeWeatherForecastRequest(char[] incomingMessage) {
        //some logic
        jmsTemplate.convertAndSend("someTopic", "someMessage");
    }
}

With TRACE logging enabled for RemotingConnectionImpl, I see that in CreateSessionResponseMessage the serverVersion property has the value 131, whereas in CreateSessionMessage the version property has the value 127.

How can I make sure that jms.topic. is not prepended to topic names?

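A minimal reproducible example can be downloaded from this GitHub repository. I tried to log the prefix in the code but found no way to do so; all logs simply show the topic name without the prefix. However, subscribing from an external client to the topic being published to reveals the prefix: when subscribing to both topicName and jms.topic.topicName, it is clear that messages are delivered to the latter. I have noticed that some clients parse "." as "/", so if "." does not work, try the other.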
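The external-subscriber check described above can be made systematic by listing every name the topic might appear under. The following is a minimal sketch, not part of any Artemis or Spring API: the class and method names are hypothetical, and the "." to "/" mapping is only the assumption mentioned in the question, since the exact translation depends on the tool and the protocol in use.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative helper only; the names here are hypothetical. */
final class TopicNameCandidates {

    /** Prefix observed in the question in front of every topic name. */
    static final String LEGACY_TOPIC_PREFIX = "jms.topic.";

    /** Assumption: the external tool uses '/' where the JMS side uses '.'. */
    static String toSlashStyle(String dottedName) {
        return dottedName.replace('.', '/');
    }

    /** Returns the names worth trying from an external subscriber. */
    static List<String> candidates(String jmsTopicName) {
        String prefixed = LEGACY_TOPIC_PREFIX + jmsTopicName;
        return Arrays.asList(
                jmsTopicName,
                prefixed,
                toSlashStyle(jmsTopicName),
                toSlashStyle(prefixed));
    }

    public static void main(String[] args) {
        // Print each candidate so it can be pasted into the external tool
        for (String name : candidates("foo.sendInfo")) {
            System.out.println(name);
        }
    }
}
```

Subscribing to each candidate in turn shows which name the broker actually routes on, which is how the prefix was spotted in the first place.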

【Comments】:

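@JustinBertram Updated the post with a link to the GitHub repository.

For what it's worth, the sample project does not include any details on how to actually build and run the client. Also, the SSL configuration in broker.xml will not work because all the artifacts are missing (and they reference locations on your local disk). The project would also be a bit simpler if you just disabled security, since it is not needed to reproduce the problem.

【Answer 1】: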

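I took your reproducer and managed to reproduce the problem you are seeing, where the client uses jms.topic.test.topic. However, as soon as I added multicastPrefix=jms.topic. to the "artemis" acceptor in broker.xml, the problem went away. The broker now strips the prefix sent by the client and uses test.topic instead.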

You did set multicastPrefix=jms.topic. on the "netty-ssl-acceptor" acceptor, but your client is not actually using that acceptor.
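Why the acceptor matters can be modelled in a few lines. The sketch below is a simplified illustration of per-acceptor prefix stripping, assuming the behaviour described in this answer; it is not the broker's actual implementation, and the class, enum, and method names are hypothetical.

```java
/** Illustrative model only: NOT the broker's actual implementation. */
final class AcceptorPrefixSketch {

    enum RoutingType { ANYCAST, MULTICAST, UNSPECIFIED }

    /** Result of resolving a client-supplied address on one acceptor. */
    static final class Resolved {
        final String address;
        final RoutingType routingType;

        Resolved(String address, RoutingType routingType) {
            this.address = address;
            this.routingType = routingType;
        }
    }

    private final String anycastPrefix;   // null when the acceptor URL does not set it
    private final String multicastPrefix; // null when the acceptor URL does not set it

    AcceptorPrefixSketch(String anycastPrefix, String multicastPrefix) {
        this.anycastPrefix = anycastPrefix;
        this.multicastPrefix = multicastPrefix;
    }

    /** Strips a configured prefix, if present, and derives the routing type from it. */
    Resolved resolve(String clientAddress) {
        if (multicastPrefix != null && clientAddress.startsWith(multicastPrefix)) {
            return new Resolved(clientAddress.substring(multicastPrefix.length()), RoutingType.MULTICAST);
        }
        if (anycastPrefix != null && clientAddress.startsWith(anycastPrefix)) {
            return new Resolved(clientAddress.substring(anycastPrefix.length()), RoutingType.ANYCAST);
        }
        // No prefix configured or matched: the address is used exactly as the client sent it
        return new Resolved(clientAddress, RoutingType.UNSPECIFIED);
    }

    public static void main(String[] args) {
        AcceptorPrefixSketch withPrefixes = new AcceptorPrefixSketch("jms.queue.", "jms.topic.");
        AcceptorPrefixSketch withoutPrefixes = new AcceptorPrefixSketch(null, null);
        System.out.println(withPrefixes.resolve("jms.topic.test.topic").address);
        System.out.println(withoutPrefixes.resolve("jms.topic.test.topic").address);
    }
}
```

In this model the same client address yields different broker-side names depending on which acceptor the connection arrives on, so setting the prefix on an acceptor the client never connects to has no effect.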

I also ran mvn dependency:tree to find out why your application is using the ActiveMQ Artemis 1.3.0 client. Here is (part of) its output:

[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:1.3.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:1.3.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.9.Final:compile
[INFO]       |  +- org.apache.activemq:artemis-commons:jar:1.3.0:compile
[INFO]       |  |  +- commons-beanutils:commons-beanutils:jar:1.9.2:compile
[INFO]       |  |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       |  |  \- com.google.guava:guava:jar:18.0:compile
[INFO]       |  \- io.netty:netty-all:jar:4.0.32.Final:compile
[INFO]       +- org.apache.activemq:artemis-selector:jar:1.3.0:compile
[INFO]       \- javax.inject:javax.inject:jar:1:compile

So it appears that the dependency on org.apache.activemq:artemis-jms-client:jar:1.3.0 comes directly from org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5, which is really strange because it has clearly defined a dependency on org.apache.activemq:artemis-jms-client:jar:2.17.0. However, if I change the <parent> to use 2.5.5 instead of 1.4.1.RELEASE, the problem goes away, e.g.:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

Here is what mvn dependency:tree outputs now (in part):

[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO]    +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO]    +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO]    \- org.apache.activemq:artemis-jms-client:jar:2.17.0:compile
[INFO]       +- org.apache.activemq:artemis-core-client:jar:2.17.0:compile
[INFO]       |  +- org.jgroups:jgroups:jar:3.6.13.Final:compile
[INFO]       |  +- org.apache.johnzon:johnzon-core:jar:1.2.14:compile
[INFO]       |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport-native-kqueue:jar:osx-x86_64:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-http:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-buffer:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-transport:jar:4.1.68.Final:compile
[INFO]       |  |  \- io.netty:netty-resolver:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-handler-proxy:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec:jar:4.1.68.Final:compile
[INFO]       |  +- io.netty:netty-codec-socks:jar:4.1.68.Final:compile
[INFO]       |  \- io.netty:netty-common:jar:4.1.68.Final:compile
[INFO]       +- org.apache.activemq:artemis-commons:jar:2.17.0:compile
[INFO]       |  +- org.jboss.logging:jboss-logging:jar:3.4.2.Final:compile
[INFO]       |  \- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO]       |     \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO]       \- org.apache.activemq:artemis-selector:jar:2.17.0:compile
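The client version can also be checked at runtime rather than through Maven. The following is a minimal sketch using only standard reflection; the class name ClientJarProbe is hypothetical, and the Implementation-Version value is only available if the jar's manifest carries one. The jar location usually includes the version in the file name.

```java
import java.security.CodeSource;

/** Illustrative diagnostic only; the class name is hypothetical. */
final class ClientJarProbe {

    /** Reports where a class was loaded from, without initializing it. */
    static String describe(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClientJarProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            String location = (source == null || source.getLocation() == null)
                    ? "unknown location"
                    : source.getLocation().toString();
            Package pkg = type.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();
            return className + ": " + location
                    + " (Implementation-Version: " + (version == null ? "unknown" : version) + ")";
        } catch (ClassNotFoundException e) {
            return className + ": not on classpath";
        }
    }

    public static void main(String[] args) {
        // The connection factory class imported by the client configuration in the question
        System.out.println(describe("org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory"));
    }
}
```

Calling describe(...) once at application startup and logging the result makes a mismatch between the expected and the resolved client jar visible without inspecting the build.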

【Comments】:

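My bad, I forgot to update the acceptor used in the GitHub example. Perhaps Spring Boot 1.4.1 only works with older versions of Artemis; I will look into migrating to 2.5.5 and post an update once that is done. Thanks for all the help, Justin.

After adding the appropriate multicastPrefix, I had no problems using Spring Boot 1.4.1 with ActiveMQ Artemis 2.18.0. You should be able to do the same.

【Answer 2】: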

You have defined "anycastPrefix=jms.queue;multicastPrefix=jms.topic." on your SSL acceptor. You should remove them. Another solution is to set EnableAmq1Prefix to false on your connection factory (but I think that is the default).

【Comments】:

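The anycastPrefix and multicastPrefix settings are designed (at least in part) to facilitate backward compatibility for older clients (e.g. ActiveMQ Artemis 1.x clients). They allow the broker to strip the defined prefixes and use them to determine routing semantics. Why should they be removed? Also, the enable1xPrefixes property is not available on the legacy 1.x client used here.

You are right, I did not see that a legacy client was used in this case.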
