是否有任何 MQ 服务器可以嵌入在 Java 进程中运行?

Posted

技术标签:

【中文标题】是否有任何 MQ 服务器可以嵌入在 Java 进程中运行?【英文标题】:Are there any MQ servers that can run embedded in a Java process? 【发布时间】:2012-12-29 19:53:21 【问题描述】:

我正在为我团队的一个应用研究排队解决方案。理想情况下,我们希望可以配置为轻量级的进程内代理(用于线程之间的低吞吐量消息传递)和外部代理。是否有可以执行此操作的 MQ 服务器?大多数似乎需要设置为外部实体。 ZeroMQ 似乎最接近进程内解决方案,但它似乎更像是“类固醇上的 UDP 套接字”,我们需要可靠的交付。

【问题讨论】:

我认为***.com/questions/2507536/lightweight-jms-broker 的答案包含有趣的信息(例如ffmq 建议)。 ActiveMQ 是另一个较重的候选者,但它也是可嵌入的。 就像@fvu 所说,ActiveMQ 比 ZeroMQ 重一些,但它作为一个嵌入式进程运行得非常好。如果您使用的是 Spring,那么设置起来非常容易。 ZeroMQ 在提供可靠传输的 TCP(而非 UDP)之上运行。但是,您指的是持久队列吗? IE。备份到光盘? 我们不关心消息本身的持久化,但我们确实希望传递尽可能可靠。性能不是问题(还)。我假设这意味着队列应该是持久的。 @fvu 感谢您的输入,我将使用 ActiveMQ。您能否重新发布作为答案,以便我接受? 【参考方案1】:

就像我们说的ActiveMQZeroMQ 重一点,但它作为嵌入式进程运行得非常好。 这里有一个简单的例子,SpringActiveMQ

将用于测试队列的消息侦听器:

public class TestMessageListener implements MessageListener 

    private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class);

    @Override
    public void onMessage(Message message) 

        /* Receive the text message */
        if (message instanceof TextMessage) 

            try 
                String text = ((TextMessage) message).getText();
                System.out.println("Message reception from the JMS queue : " + text);

             catch (JMSException e) 
                logger.error("Error : " + e.getMessage());
            

         else 
            /* Handle non text message */
        
    

ActiveMQ上下文配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61617</value>
        </property>
    </bean>

    <bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <constructor-arg ref="jmsQueueConnectionFactory" />
    </bean>

    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="messageQueue" />
    </bean>

    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="pooledJmsQueueConnectionFactory" />
       <property name="pubSubDomain" value="false"/>
    </bean>

    <bean id="testMessageListener" class="com.example.jms.TestMessageListener" />

    <bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" />
        <property name="destination" ref="QueueDestination" />
        <property name="messageListener" ref="testMessageListener" />
        <property name="concurrentConsumers" value="5" />
        <property name="acceptMessagesWhileStopping" value="false" />
        <property name="recoveryInterval" value="10000" />
        <property name="cacheLevelName" value="CACHE_CONSUMER" /> 
    </bean>

</beans>

JUnit 测试:

@ContextConfiguration(locations = "classpath:/activeMQ-context.xml")
public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests 

    @Autowired
    private JmsTemplate template;

    @Autowired
    private ActiveMQDestination destination;

    @Test
    public void testJMSFactory() 
        /* sending a message */
        template.convertAndSend(destination, "Hi");

        /* receiving a message */
        Object msg = template.receive(destination);
        if (msg instanceof TextMessage) 
            try 
                System.out.println(((TextMessage) msg).getText());
             catch (JMSException e) 
                System.out.println("Error : " + e.getMessage());
            
        
    

要添加到 pom.xml 的依赖项:

<!-- Spring -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>$org.springframework-version</version>
</dependency>

<!-- ActiveMQ -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.6.0</version>
    <scope>compile</scope>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.6.0</version>
</dependency>

【讨论】:

我们在 ActiveMQ 中看到的问题(以及为什么我目前正在快速搜索替代方案): (a) 当您关闭它时,它不会停止其所有线程。 (b) 它的许多 API 在构建时自动启动服务,同时还具有类似 start() 的方法,这使得 API 不清楚并可能有助于 (a)。 (c) 多线程的许多问题,迄今为止,这些问题已通过他们添加更多同步块来“修复”。 (d) 可疑的安全性,尤其是在发现模式下使用时。 (专业提示:设置一个流氓消息队列服务器,在办公室度过愉快的时光!) 你知道线程关闭问题是否已经解决了吗?你还在用这个解决方案吗?【参考方案2】:

WebSphere MQ 客户端能够执行multicast pub/sub。这提供了绕过队列管理器的客户端到客户端功能,尽管需要队列管理器来建立连接。

【讨论】:

以上是关于是否有任何 MQ 服务器可以嵌入在 Java 进程中运行?的主要内容,如果未能解决你的问题,请参考以下文章

活动 MQ 嵌入式代理;在生产中实用且可靠?

在 Spring Java 应用程序中嵌入 openfire

MongoDB嵌入Java

带有服务器端输入的Java Linux守护进程/服务

消息队列(mq)是啥?

通过JAVA从MQ读取消息的时候报错及解决