是否有任何 MQ 服务器可以嵌入在 Java 进程中运行?
Posted
技术标签:
【中文标题】是否有任何 MQ 服务器可以嵌入在 Java 进程中运行?【英文标题】:Are there any MQ servers that can run embedded in a Java process? 【发布时间】:2012-12-29 19:53:21 【问题描述】:我正在为我团队的一个应用研究排队解决方案。理想情况下,我们希望可以配置为轻量级的进程内代理(用于线程之间的低吞吐量消息传递)和外部代理。是否有可以执行此操作的 MQ 服务器?大多数似乎需要设置为外部实体。 ZeroMQ 似乎最接近进程内解决方案,但它似乎更像是“类固醇上的 UDP 套接字”,我们需要可靠的交付。
【问题讨论】:
我认为***.com/questions/2507536/lightweight-jms-broker 的答案包含有趣的信息(例如ffmq 建议)。 ActiveMQ 是另一个较重的候选者,但它也是可嵌入的。 就像@fvu 所说,ActiveMQ 比 ZeroMQ 重一些,但它作为一个嵌入式进程运行得非常好。如果您使用的是 Spring,那么设置起来非常容易。 ZeroMQ 在提供可靠传输的 TCP(而非 UDP)之上运行。但是,您指的是持久队列吗? IE。备份到光盘? 我们不关心消息本身的持久化,但我们确实希望传递尽可能可靠。性能不是问题(还)。我假设这意味着队列应该是持久的。 @fvu 感谢您的输入,我将使用 ActiveMQ。您能否重新发布作为答案,以便我接受? 【参考方案1】:就像我们说的ActiveMQ
比ZeroMQ
重一点,但它作为嵌入式进程运行得非常好。
这里有一个简单的例子,Spring
和 ActiveMQ
。
将用于测试队列的消息侦听器:
public class TestMessageListener implements MessageListener
private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class);
@Override
public void onMessage(Message message)
/* Receive the text message */
if (message instanceof TextMessage)
try
String text = ((TextMessage) message).getText();
System.out.println("Message reception from the JMS queue : " + text);
catch (JMSException e)
logger.error("Error : " + e.getMessage());
else
/* Handle non text message */
ActiveMQ
上下文配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61617</value>
</property>
</bean>
<bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="jmsQueueConnectionFactory" />
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="messageQueue" />
</bean>
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="pooledJmsQueueConnectionFactory" />
<property name="pubSubDomain" value="false"/>
</bean>
<bean id="testMessageListener" class="com.example.jms.TestMessageListener" />
<bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" />
<property name="destination" ref="QueueDestination" />
<property name="messageListener" ref="testMessageListener" />
<property name="concurrentConsumers" value="5" />
<property name="acceptMessagesWhileStopping" value="false" />
<property name="recoveryInterval" value="10000" />
<property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>
</beans>
JUnit
测试:
@ContextConfiguration(locations = "classpath:/activeMQ-context.xml")
public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests
@Autowired
private JmsTemplate template;
@Autowired
private ActiveMQDestination destination;
@Test
public void testJMSFactory()
/* sending a message */
template.convertAndSend(destination, "Hi");
/* receiving a message */
Object msg = template.receive(destination);
if (msg instanceof TextMessage)
try
System.out.println(((TextMessage) msg).getText());
catch (JMSException e)
System.out.println("Error : " + e.getMessage());
要添加到 pom.xml
的依赖项:
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>$org.springframework-version</version>
</dependency>
<!-- ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.6.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.6.0</version>
</dependency>
【讨论】:
我们在 ActiveMQ 中看到的问题(以及为什么我目前正在快速搜索替代方案): (a) 当您关闭它时,它不会停止其所有线程。 (b) 它的许多 API 在构建时自动启动服务,同时还具有类似 start() 的方法,这使得 API 不清楚并可能有助于 (a)。 (c) 多线程的许多问题,迄今为止,这些问题已通过他们添加更多同步块来“修复”。 (d) 可疑的安全性,尤其是在发现模式下使用时。 (专业提示:设置一个流氓消息队列服务器,在办公室度过愉快的时光!) 你知道线程关闭问题是否已经解决了吗?你还在用这个解决方案吗?【参考方案2】:WebSphere MQ 客户端能够执行multicast pub/sub。这提供了绕过队列管理器的客户端到客户端功能,尽管需要队列管理器来建立连接。
【讨论】:
以上是关于是否有任何 MQ 服务器可以嵌入在 Java 进程中运行?的主要内容,如果未能解决你的问题,请参考以下文章