php 监听activemq 消息队列,需要把监听程序的php文件一直打开么?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了php 监听activemq 消息队列,需要把监听程序的php文件一直打开么?相关的知识,希望对你有一定的参考价值。
监听(消费者)文件一直打开会不会有问题,能不能将监听文件添加到系统进程里呢?
需要起一个常驻进程来运行这个php文件,注意将最大运行时间设置为无限(set_time_limit(0))及日志写入,方便出现问题排查。理论上来说,PHP不适合做常驻进程。 参考技术A window下你可以把写个bat文件执行你的php文件。然后加入系统计划或者系统服务。linux下写个定时任务就行了,如何从 Web 应用程序监听消息队列? (Tomcat,ActiveMQ)
【中文标题】如何从 Web 应用程序监听消息队列? (Tomcat,ActiveMQ)【英文标题】:How to listen to a message queue from a web application? (Tomcat, ActiveMQ) 【发布时间】:2015-10-18 14:10:25 【问题描述】:我很高兴改进我在 Apache Tomcat 上运行的 Web 应用程序。添加了一个ActiveMQ JMS 服务器来发送和接收消息。
我已经可以发送和接收消息,但需要接收方的帮助。
我的网络应用应该如何持续监听一个队列来接收消息?
新消息到达,服务器应该对它们采取行动。例如:将数据添加到数据库或发送回消息。
我已经可以发送消息了。这是代码。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("clientQueue");
MessageProducer publisher = session.createProducer(queue);
connection.start();
Message message = null;
message = session.createTextMessage("Text Message");
publisher.send(message);
我已经可以在请求后收到一条消息(点击;-))
connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("serverQueue");
consumer = session.createConsumer(destination);
while (true)
Message message = consumer.receive(300000);
//Do message stuff
我应该如何让 web 应用不断地监听队列? 建议的方式是什么?
热烈感谢所有帮助。谢谢。
编辑 - 解决方案
来自DaveH的建议的当前工作解决方案
我添加了一个 ServletContextListener 来持续收听我的消息。
web.xml
<listener>
<listener-class>com.test.JMSContextListener</listener-class>
</listener>
听众:
public class JMSContextListener implements ServletContextListener
@Override
public void contextInitialized(ServletContextEvent arg0)
Thread thread = new Thread(new JMSConnector());
thread.start();
@Override
public void contextDestroyed(ServletContextEvent arg0)
//Nothing
连接:
public class JMSConnector implements Runnable
public void run()
try
Context context = new InitialContext();
QueueConnectionFactory factory = (QueueConnectionFactory) context.lookup("java:comp/env/jms/ConnectionFactory");
Connection connection = factory.createConnection();
Queue queue = (javax.jms.Queue) context.lookup("java:comp/env/jms/serverQueue");
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
//This MessageListener will do stuff with the message
MessageListenerImpl messageListener = new MessageListenerImpl();
consumer.setMessageListener(messageListener);
connection.start();
// Start connection or nothing will happen!!!
connection.start();
catch (JMSException ex)
//TODO
catch (NamingException ex)
//TODO
这是一个建议的方式还是应该改进?
热烈感谢所有帮助。谢谢。
【问题讨论】:
仅供参考:我没有使用 Spring... 对于Servlet容器3.x,可以用@WebListener注解监听器,无需在web.xml中声明。 (Mkyong) 【参考方案1】:在 Tomcat 中配置 JMS 队列: https://martinsdeveloperworld.wordpress.com/2013/03/03/apache-activemq-and-tomcat/
把activemq-all-5.xx.jar放到$TOMCAT_HOME/lib
在线程中循环监听,从@WebListener
开始:
contextDestroyed 时停止
@WebListener
@Slf4j
public class JmsMailListener implements ServletContextListener
private Thread listenerThread = null;
private QueueConnection connection;
@Override
public void contextInitialized(ServletContextEvent sce)
try
InitialContext initCtx = new InitialContext();
ActiveMQConnectionFactory connectionFactory =
(ActiveMQConnectionFactory) initCtx.lookup("java:comp/env/jms/ConnectionFactory");
connectionFactory.setTrustAllPackages(true);
connection = connectionFactory.createQueueConnection();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) initCtx.lookup("java:comp/env/jms/queue/MailQueue");
QueueReceiver receiver = queueSession.createReceiver(queue);
connection.start();
log.info("Listen JMS messages ...");
listenerThread = new Thread(() ->
try
while (!Thread.interrupted())
Message m = receiver.receive();
if (m instanceof ObjectMessage)
ObjectMessage om = (ObjectMessage) m;
MyObject myObject = (MyObject) om.getObject();
log.info("Received MyObject ", myObject);
...
catch (Exception e)
log.error("Receiving messages failed: " + e.getMessage(), e);
);
listenerThread.start();
catch (Exception e)
log.error("JMS failed: " + e.getMessage(), e);
@Override
public void contextDestroyed(ServletContextEvent sce)
if (connection != null)
try
connection.close();
catch (JMSException ex)
log.warn("Couldn't close JMSConnection: ", ex);
if (listenerThread != null)
listenerThread.interrupt();
【讨论】:
【参考方案2】:如果您的代码已经可以使用队列中的消息(看起来确实如此),那么我认为您的问题归结为如何让那段代码运行。
看来您没有使用任何框架,所以我认为我将采用可以从队列中检索消息并在应用程序服务器的单独线程中运行的代码。让该线程在应用程序服务器启动时启动,并在应用程序服务器关闭时自行整理。
在应用服务器启动时启动线程的最简单方法是引入一个 ServletContextListener(示例 here。)在 Context Listener 中,在单独的线程中启动您的队列侦听代码。
编辑:我使用了这个提议的解决方案并将上面的代码添加到问题中。
【讨论】:
这看起来很有希望。当我“点击”一个按钮时,我可以收到消息。让我试试这个方法,然后带着解决方案回到论坛。 你的代码不是我的意思。您需要在单独的线程中启动您的侦听器 - 我认为您所拥有的内容会导致应用服务器挂起。我建议让您的 JMSConnector 实现 Runnable 接口并在与您的 ServletContextListener 分开的线程中启动它 添加了一个线程,这似乎有效。如果您有更多建议,那么非常欢迎! 看起来更好。您可能还想在侦听器中实现 Context 销毁方法,以便您可以整齐地关闭队列。 不单独线程启动监听器怎么会导致应用服务器挂掉?【参考方案3】:我在我的 webApplication spring-mvc 和 JMS 模板中使用 activeMQ,方法如下。
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="messageSender" class="xyz.MessageSender"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destination" />
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="REQUEST_QUEUE" />
</bean>
<bean id="messageListener" class="xtz.MessageListener">
<property name="listenerId" value="1" />
</bean>
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="RESPONSE_QUEUE" ref="messageListener" method="messageReceived" />
</jms:listener-container>
Sender 和 Listener 类实现如下。
public class MessageSender
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage()
jmsTemplate.send(new MessageCreator()
public Message createMessage(Session session) throws JMSException
MapMessage message = session.createMapMessage();
message.setString("messageType", XXX);
message.setString("jsonMessage", XXXX);
return message;
);
public class MessageListener
private int listenerId;
@Override
public void messageReceived(Map<String, Object> message) throws Exception
//put your logic here
public int getListenerId()
return listenerId;
public void setListenerId(int listenerId)
this.listenerId = listenerId;
【讨论】:
【参考方案4】:我正在使用 spring 来监听队列。 定义监听器是这样的:
<jms:listener-container connection-factory="jmsConnectionFactoryLocal">
<jms:listener destination="QUEUE_NAME" ref="channelManagerSimulatorDefault"/>
</jms:listener-container>
jmsConnectionFactoryLocal 必须根据您的 MQ 创建。在我的例子中,它是 IBM WebsphereMQ,所以 jmsConnectionFactoryLocal 的定义是这样的:
<bean id="mqConnectionFactoryLocal" class="com.ibm.mq.jms.MQQueueConnectionFactory">
<property name="hostName">
<value>THE_MQ_SERVER_IP</value>
</property>
<property name="port">
<value>MQ_PORT</value>
</property>
<property name="queueManager">
<value>QUEUE_MANAGER_NAME</value>
</property>
<property name="transportType">
<value>1</value>
</property>
</bean>
<bean id="jmsConnectionFactoryLocal" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="mqConnectionFactoryLocal"/>
<property name="username" value="USER_NAME"/>
<property name="password" value="PASSWORD"/>
</bean>
您必须为 ActiveMQ 找到正确的 ConnectionFactory 实现并使用它。 listener 和 jmsConnectionFactory 是相同的,并且独立于 MQ 提供者。
【讨论】:
以上是关于php 监听activemq 消息队列,需要把监听程序的php文件一直打开么?的主要内容,如果未能解决你的问题,请参考以下文章
ActiveMQ 消息消费者不主动监听消息队列是不是有消息,只监听是是不是有消息进去消息队列