持久订阅 ActiveMQ

Posted

技术标签:

【中文标题】持久订阅 ActiveMQ【英文标题】:Durable Subscription ActiveMQ 【发布时间】:2016-01-14 19:13:54 【问题描述】:

我正在尝试为我的消息设置持久订阅者,以便即使在服务器重新启动后它们也会在主题中持续存在。

但是在配置过程中我遇到了与 xml 相关的错误:

这是我的配置 xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:int-jme="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="order" value="1" />
        <property name="messageConverters">
            <list>
                <!-- Default converters -->
                <bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
                <bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <!-- Define a channel to communicate out to a JMS Destination -->
    <int:channel id="topicChannel"/>

    <!-- Define the ActiveMQ connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--
        Define an adaptor that route topicChannel messages to the myTopic topic; the outbound-channel-adapter
        automagically fines the configured connectionFactory bean (by naming convention
      -->
    <int-jms:outbound-channel-adapter channel="topicChannel"
                                      destination-name="topic.myTopic"
                                      pub-sub-domain="true" />

    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="getPayloadChannel"
                                            subscription-durable="true"
                                            durable-subscription-name="myDurableSubscription"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

    <int:channel id="getPayloadChannel" />

    <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />

</beans>

但在message-driven-channel-adapter 中,以下属性会报错:

上面写着:

在这一行发现多个注释:

cvc-complex-type.3.2.2:属性 'subscription-durable' 不允许出现在元素 'int-jms:message-driven-channel- 适配器”。 cvc-complex-type.3.2.2:属性“durable-subscription-name”不允许出现在元素“int-jms:message-driven-channel-”中 适配器”。

但在更多示例中,我可以看到以下属性工作正常。

subscription-durable="true"

durable-subscription-name="myDurableSubscription"

那我的配置可能有什么问题。

编辑: POM.xml 中的 Spring 集成依赖项

<!-- Spring Integration -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jms</artifactId>
            <version>4.2.0.RELEASE</version>
        </dependency>

编辑:

请看附图:

另请参阅我的日志,这表明目标方法已被调用,这应该在客户端使用消息时发生。 请帮忙。

【问题讨论】:

检查DTD是否支持这些属性 @Saravana 感谢您的快速回复。但我不熟悉 XML 操作,也没有任何想法。你能帮我吗? @Vihar 你能帮我解决这个问题吗? 【参考方案1】:

您使用的是哪个版本的 Spring Integration? 4 年前的 2.1.0.RELEASE 版本中添加了对持久订阅的支持。

当前版本是 4.2.0.RELEASE。

编辑

我忘了提到您需要一个客户端 ID 才能进行持久订阅。

您查看日志消息了吗?这似乎很清楚......

14:12:39.557 WARN [jmsIn.container-1][org.springframework.jms.listener.DefaultMessageListenerContainer] 目标“topic://topic.demo”的 JMS 消息侦听器调用程序设置失败 - 尝试恢复。原因:如果没有在 Connection 上指定唯一的 clientID,则无法创建持久订阅者

将 clientId 添加到您的连接工厂...

    <property name="clientId" value="myClient"/>

【讨论】:

我只使用 4.2.0.RELEASE 我也想知道 Spring 文档本身在这里提到了这一点 docs.spring.io/spring-integration/reference/htmlsingle/… 我通过添加 pom.xml 中使用的 spring 集成依赖项更新了问题 哦 - 看起来您没有使用支持 Spring 的 IDE(STS、IDEA 等);该错误仅在IDE中,在运行时会正常。 Basic eclipse 不知道如何找到正确的模式。我们通常建议使用无版本架构,但如果您不能使用支持 spring 的 IDE,请将版本添加到架构中 - spring-integration-jms-4.2.xsd - 您也不应该使用旧的核心 spring 架构 (2.5)。 我刚刚添加了你提到的架构,之前的错误消失了,但现在还有一个错误是Referenced file contains errors (http://www.springframework.org/schema/integration/spring-integration-4.2.xsd). For more information, right click on the message in the Problems View and select "Show Details..."

以上是关于持久订阅 ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQActiveMQ 传输协议

用于持久订阅的 stomp 协议常规序列

activemq订阅发布模式(非持久订阅)

持久订阅 ActiveMQ

分布式系列之ActiveMqActiveMq入门示例

RabbitMQ 发布订阅持久化