activemq入门实例

Posted 哎喔别走

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activemq入门实例相关的知识,希望对你有一定的参考价值。

  首先了解下jms。

  JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

JMS对象模型包含如下几个要素:

1.ConnectionFactory 创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。

2. Destination Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。又称为消息队列,是实际的消息源

3. Connection Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。

4. Session (Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。ession是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。同样,也分QueueSession和TopicSession。

5. 生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。消息生产者分两种类型:QueueSender和TopicPublisher。消息消费者也分为两种类型:QueueReceiver和TopicSubscriber。

 

下面展示一个activemq的入门例子:

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Message;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsOwnTest {
public static void main(String[] args) throws Exception {
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
Thread.sleep(2000);
thread(new HelloWorldConsumer(), false);
}

public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}

public static class HelloWorldProducer implements Runnable {
public void run() {
try {
// 创建一个连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();

//创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建一个名称为firstQueue的消息队列
Destination destination = session.createQueue("firstQueue");

// 通过session创建指定队列的消息生产者
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 消息内容
String text = "我来了! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);

// 发送消息
System.out.println("发送消息内容: 【" + text + "】");
producer.send(message);

session.close();
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}

public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
try {

// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// 创建连接
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();

connection.setExceptionListener(this);

//创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建一个名称为firstQueue的消息队列
Destination destination = session.createQueue("firstQueue");

//通过session创建指定队列的消息消费者
MessageConsumer consumer = session.createConsumer(destination);

Message message = consumer.receive(1000);

if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("收到的消息: 【" + text + "】");
} else {
System.out.println("收到的消息: 【" + message + "】");
}

consumer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}

public synchronized void onException(JMSException ex) {
System.out.println("JMS异常.");
}
}
}

以上是关于activemq入门实例的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ入门系列二:入门代码实例(点对点模式)

ActiveMQ入门实例

ActiveMQ入门实例

ActiveMQ入门实例

ActiveMQ入门实例(转)

ActiveMQ 入门使用实例