ACTIVEMQ-发布者订阅者hello world示例
Posted
技术标签:
【中文标题】ACTIVEMQ-发布者订阅者hello world示例【英文标题】:ACTIVEMQ- publisher subscriber hello world example 【发布时间】:2012-02-19 09:56:46 【问题描述】:有两个程序:订阅者和发布者... 订阅者能够将消息放到主题上并且消息发送成功。 当我在浏览器上检查 activemq 服务器时,它显示 1 msg enqueued 。但是当我运行消费者代码时,它没有收到消息
这里是生产者代码:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class producer
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
// JMS messages are sent and received using a Session. We will
// create here a non-transactional session object. If you want
// to use transactions you should set the first parameter to 'true'
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("testt");
MessageProducer producer = session.createProducer(topic);
// We will send a small text message saying 'Hello'
TextMessage message = session.createTextMessage();
message.setText("HELLO JMS WORLD");
// Here we are sending the message!
producer.send(message);
System.out.println("Sent message '" + message.getText() + "'");
connection.close();
运行此代码后,控制台的输出为:
26 Jan, 2012 2:30:04 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Sent message 'HELLO JMS WORLD'
这里是消费者代码:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class consumer
// URL of the JMS server
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// Name of the topic from which we will receive messages from = " testt"
public static void main(String[] args) throws JMSException
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("testt");
MessageConsumer consumer = session.createConsumer(topic);
MessageListener listner = new MessageListener()
public void onMessage(Message message)
try
if (message instanceof TextMessage)
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
catch (JMSException e)
System.out.println("Caught:" + e);
e.printStackTrace();
;
consumer.setMessageListener(listner);
connection.close();
运行此代码后,它不显示任何内容。 有人可以帮我解决这个问题吗?
【问题讨论】:
猜你关闭连接太快了。在你的消费者开始消费之前,连接关闭,main方法结束! 【参考方案1】:您的问题是您的消费者正在运行,然后立即关闭。
尝试将其添加到您的消费者中:
consumer.setMessageListener(listner);
try
System.in.read();
catch (IOException e)
e.printStackTrace();
connection.close();
这将等到您按下一个键才停止。
其他需要考虑的事项:
使用 finally 块结束 Java 命名约定鼓励类的首字母使用大写【讨论】:
不,先生,您提到的上述代码也不起作用:(请帮助...严重卡在项目中!【参考方案2】:主要问题(除了应用程序关闭速度很快)是您发送到一个主题。主题不保留消息,因此如果您运行生成然后运行消费者的应用程序,消费者将不会收到任何内容,因为在发送消息时它没有订阅主题。如果您修复关闭问题,然后在一个终端中运行消费者,然后运行生产者,您应该会看到消费者收到的消息。如果您想要保留消息,那么您需要使用一个队列来保留消息,直到有人使用它。
【讨论】:
对不起朋友,这个错误得到了修复。我先执行发布者模块,然后是订阅者…但是它应该是订阅者模块,然后是发布者…谢谢你的建议:)【参考方案3】:只是一些:
使用队列而不是主题。当没有可用的消费者时,主题中的消息将被丢弃,它们不是持久化的。 在设置消息监听器后添加connection.start()。您应该在正确设置所有消费者/生产者后开始连接。 请稍等片刻,然后再次关闭连接。主题可能是你失败的最重要来源。
【讨论】:
【参考方案4】:您的 producer 类是正确的。运行流畅。
但是,您的消费者不正确,您必须对其进行修改。
首先,在创建connection对象后添加setClientID("any_string_value");
例如:Connection connection = connectionFactory.createConnection();
// need to setClientID value, any string value you wish
connection.setClientID("12345");
其次,使用 createDurableSubscriber() 方法而不是 createConsumer() 方法通过主题传输消息。
MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");
这是修改后的comsumer类:
package mq.test;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class consumer
// URL of the JMS server
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// Name of the topic from which we will receive messages from = " testt"
public static void main(String[] args) throws JMSException
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
// need to setClientID value, any string value you wish
connection.setClientID("12345");
try
connection.start();
catch(Exception e)
System.err.println("NOT CONNECTED!!!");
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("test_data");
//need to use createDurableSubscriber() method instead of createConsumer() for topic
// MessageConsumer consumer = session.createConsumer(topic);
MessageConsumer consumer = session.createDurableSubscriber(topic,
"SUB1234");
MessageListener listner = new MessageListener()
public void onMessage(Message message)
try
if (message instanceof TextMessage)
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message"
+ textMessage.getText() + "'");
catch (JMSException e)
System.out.println("Caught:" + e);
e.printStackTrace();
;
consumer.setMessageListener(listner);
//connection.close();
现在,您的代码将成功运行。
【讨论】:
以上是关于ACTIVEMQ-发布者订阅者hello world示例的主要内容,如果未能解决你的问题,请参考以下文章
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅