ActiveMq整理之java应用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMq整理之java应用相关的知识,希望对你有一定的参考价值。
一、JMS
更多介绍参考
“http://baike.baidu.com/link?url=LNCEOGgqEX-uSKuRJooyG1RSfS7CTWDKYT8OOouhxLk_yWNN-0wNSWq7KjNQ259a9pfL95janJi8v8-drvdHqa”
1、1背景
当前,CORBA、DCOM、RMI等RPC中间件技术已广泛应用于各个领域。但是面对规模和复杂度越来越高的分布式系统,这些技术也显示出其局限性:
1、同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果才能继续执行;
2、客户和服务对象的生命周期紧密耦合:客户进程和服务对象进行都必须正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常。
3、点对点通信:客户的一次调用只发送给某个单独的目标对象。
面向消息的中间件(MessageOriented Middleward,MOM)较好的解决了以上问题。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再把消息转发给接收者。这种模式下,发送和接收是异步的,发送者无需等待,二者的生命周期也未必相同,发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行,一对多通信,对于一个消息可以有多个接收者。
1、2简介
JMS即java消息服务(javamessage service)应用程序接口,是一个java平台中关于面向消息中间件(MOM)的API,用在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS是一种与厂商无关的API,用来访问消息收发系统消息,它类似于JDBC(java database connectitity)。这里,JDBC是可以用来访问许多不同关系数据库的API,而JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都提供JMS,包括IBM的MQSeries、BEA的 Weblogic JMS service、 Progress 的 SonicMQ、微软的MSMQ。JMS使使用者可以通过消息收发服务(有时称为消息中介服务或路由器)从一个JMS客户机向另一个JMS客户机发送消息。消息是JMS中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本、可序列化的对象、属性集合、字节流、原始值流,还有无有效负载的消息。
1、3JMS体系架构
JMS由以下元素组成:
JMS提供者
连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
JMS客户
生产或消费基于消息的Java的应用程序或对象。
JMS生产者
创建并发送消息的JMS客户。
JMS消费者
接收消息的JMS客户。
JMS消息
包括可以在JMS客户之间传递的数据的对象
JMS队列
一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题
一种支持发送消息给多个订阅者的机制。
1、4JMS对象模型
1)连接工厂。连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
4)JMS目的。JMS目的(Destination),又称为消息队列,是实际的消息源。
5)JMS生产者和消费者。生产者(MessageProducer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
6)JMS消息通常有两种类型:
①点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。
②发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
1、5模型
Java消息服务应用程序结构支持两种模型:
点对点或队列模型
发布者/订阅者模型
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:
只有一个消费者将获得消息。
生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
每一个成功处理的消息都由接收者签收。
发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:
多个消费者可以获得消息
在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
使用Java语言,JMS提供了将应用与提供数据的传输层相分离的方式。同一组Java类可以通过JNDI中关于提供者的信息,连接不同的JMS提供者。这一组类首先使用一个连接工厂以连接到队列或主题,然后发送或发布消息。在接收端,客户接收或订阅这些消息。
1、6传递方式
JMS有两种传递消息的方式。标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。如果一个JMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。所以默认的消息传递方式是非持久性的。即使使用非持久性消息可能降低内务和需要的存储器,并且这种传递方式只有当你不需要接收所有的消息时才使用。
虽然JMS规范并不需要JMS供应商实现消息的优先级路线,但是它需要递送加快的消息优先于普通级别的消息。JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。举例来说: topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8,10000); //Pub-Sub 或 queueSender.send(message,DeliveryMode.PERSISTENT, 8, 10000);//P2P 这个代码片断,有两种消息模型,映射递送方式是持久的,优先级为加快型,生存周期是10000 (以毫秒度量)。如果生存周期设置为零,这则消息将永远不会过期。当消息需要时间限制否则将使其无效时,设置生存周期是有用的。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage -- Java原始值的数据流
· MapMessage--一套名称-值对
· TextMessage--一个字符串对象
· ObjectMessage--一个序列化的 Java对象
· BytesMessage--一个未解释字节的数据流
1、7应用程序
ConnectionFactory接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
Session 接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。
MessageConsumer 接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
消息接口非常灵活,并提供了许多方式来定制消息的内容。
二、ActiveMq
2、1简介
2、2操作
2、3java代码操作
2、3、1代码创建顺序
第一步,建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,均使用默认即可,默认端口为”tcp://localhost:61616”。
第二步,通过ConnectionFactory工厂对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的。
第三步,通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用事物,参数配置2为签收模式,一般设置为自动签收。
第四步,通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue,即队列;在PUB/SUB模式,Destination被称作Topic,即主题,在程序中可使用多个Queue和Topic。
第五步,需要通过session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer。
第六步,在生产端,可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)。
第七步,使用JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法进行接收数据。最后不要忘记关闭connection连接。
2、3、2send端
package org.mbox.test01; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Send { public staticvoid main(String[] args) throws JMSException { ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); Connectionconnection = connectionFactory.createConnection(); connection.start(); Sessionsession = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destinationdesination = session.createQueue("queue1"); MessageProducermessageProducer = session.createProducer(desination); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i =0; i < 5; i++) { TextMessagetextMessage = session.createTextMessage(); textMessage.setText("发送消息内容,第"+i+"条。"); messageProducer.send(textMessage); } if(connection!= null){ connection.close(); } } }
2、3、3Receive
package org.mbox.test01; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive { public staticvoid main(String[] args) throws JMSException { ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616"); Connectionconnection = connectionFactory.createConnection(); connection.start(); Sessionsession = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destinationdestination = session.createQueue("queue1"); MessageConsumermessageConsumer = session.createConsumer(destination); while(true){ //此种receive是阻塞的,直到接收到消息才往下执行 TextMessagemsg = (TextMessage) messageConsumer.receive(); if(msg== null){ break; } System.out.println("接收到的内容:"+msg.getText()); } if(connection!= null){ connection.close(); } } }
再点queue1就是空的了。
再次运行消费端,就等于是两个消费者了。
2、4activemq的安全机制
设置安全机制,只有符合认证的用户才能进行发送和获取消息,需在activemq.xml里添加安全验证配置!
改造新加内容为
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUserusername="lly" password="lly"groups="users,admins"/> <authenticationUserusername="test" password="test"groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins> 重启activemq服务,并且修改发送端和接收端的代码分别为,用户名可以不一致: ConnectionFactory connectionFactory = newActiveMQConnectionFactory( "lly", "lly", "tcp://localhost:61616"); ConnectionFactory connectionFactory = newActiveMQConnectionFactory( "test", "test", "tcp://localhost:61616");
2、5connection方法使用
在成功创建正确的connectionFactory之后,下一步是创建一个连接,它是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,connection的目的是“利用JMS提供者封装开放的连接”,以及表示“客户端与提供者服务例程之间的开放TCP/IP套接字”。该文档还指出connection应该是进行客户端身份验证的地方等等。
当一个connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个connection可以建立一个或多个session。
当一个程序执行完成后,必须关闭之前创建的connection,否则activemq不能释放资源,关闭一个connection同样也关闭了session、MessageProducer和MessageCustomer。
2、6session方法的使用
一旦从ConnectionFactory中获得一个connection,必须从connection中创建一个或多个session。Session是一个发送或接收消息的线程,可以使用session创建MessageProducer、MessageCustomer和Message。
Session可以被事物化,也可以不被事物化,通常,可以通过向connection上的适当方法传递一个布尔参数对此进行设置。
SessioncreateSession(boolean transacted, int acknowledgeMode);
其中transacted是使用事物标识,acknowledgeMode为签收模式。
结束事物有两种方法:提交或者回滚。当一个事物提交,消息被处理。如果事物中有一个步骤失败,事物就回滚,这个事物中的已经执行的动作将被撤销。在发送消息最后也必须要使用session.commit()方法表示提交事物。
签收模式有三种形式:
Session.AUTO_ACKNOWLEDGE,当客户端从receive或onmessage成功返回时,session自动签收客户端的这条信息的收条。
Session.CLIENT_ACKNOWLEDGE,客户端通过调用消息(Message)的acknowledgeMode方法接收消息。在这种情况下,签收发生在session层面:签收一个已消费的消息会自动的签收这个session所有已消费消息的收条。
Session.DUPS_OK_ACKNOWLEDGE,指示session不必确保对传送消息的签收。它可能引起消息的重复,但是降低了session的开销,所以只有客户端能容忍重复的消息,才可使用。
对上述三种形式的理解:
Session.AUTO_ACKNOWLEDGE:procedure产生了一个消息发送给MQ制定队列queue1中,customer去MQ取消息,连接上以后,如果MQ有数据,通过TCP把数据返回给customer,一旦customer接收到以后,会自动再次向MQ发送确认消息(自动签收机制,不用写代码给其确认收到消息),但是这条确认收到的信息是看不到的。
Session.CLIENT_ACKNOWLEDGE:customer接收到MQ消息以后,手工调用knowledge方法给MQ发送确认消息,customer已经接收到消息了。
Session.DUPS_OK_ACKNOWLEDGE:没有签收机制,可能会有消息重复,但是降低了session的开销。
2、6、1测试开启session和AUTO_ACKNOWLEDGE
2、6、2测试开启session和CLIENT_ACKNOWLEDGE(推荐)
2、6、3测试开启session和DUPS_OK_ACKNOWLEDGE
2、7MessageProducer
2、7、1测试优先级第一部分(不开启事物、不持久化)
2、7、2测试优先级第一部分(开启事物、持久化)
2、7、3测试优先级第一部分:上述二者测试结果
并没有按照优先级获取到消息。
2、7、4测试优先级第二部分
2、8MessageCustomer、Message
2、8、1不设置过滤器
2、8、1、1未处理前的控制器
2、8、1、2生产者
package org.mbox.test03; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { privateConnectionFactory connectionFactory; privateConnection connection; private Sessionsession; privateMessageProducer messageProceducer; publicProducer(){ try { this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.messageProceducer= this.session.createProducer(null); } catch(JMSException e) { e.printStackTrace(); } } public voidsend1(){ try { Destinationdestination = this.session.createQueue("first"); MapMessagemsg1 = this.session.createMapMessage(); //下述的Property属性是针对消费端的过滤器的 msg1.setString("name","张三"); msg1.setString("age","21"); // msg1.setStringProperty("color","blue"); // msg1.setIntProperty("sal",1000); // int id =1; // msg1.setInt("id",id); // Stringreceiver = id%2 == 0 ? "A":"B"; // msg1.setStringProperty("receiver",receiver); MapMessagemsg2 = this.session.createMapMessage(); msg2.setString("name","李四"); msg2.setString("age","22"); // msg2.setStringProperty("color","blue"); // msg2.setIntProperty("sal",1200); // id = 2; // msg2.setInt("id",id); // receiver= id%2 == 0 ? "A":"B"; // msg2.setStringProperty("receiver",receiver); MapMessagemsg3 = this.session.createMapMessage(); msg3.setString("name","王五"); msg3.setString("age","23"); // msg3.setStringProperty("color","blue"); // msg3.setIntProperty("sal",1300); // id = 3; // msg3.setInt("id",id); // receiver= id%2 == 0 ? "A":"B"; // msg3.setStringProperty("receiver",receiver); MapMessagemsg4 = this.session.createMapMessage(); msg4.setString("name","赵六"); msg4.setString("age","24"); // msg4.setStringProperty("color","oragne"); // msg4.setIntProperty("sal",1400); // id = 4; // msg4.setInt("id",id); // receiver= id%2 == 0 ? "A":"B"; // msg4.setStringProperty("receiver",receiver); this.messageProceducer.send(destination,msg1, DeliveryMode.NON_PERSISTENT, 2, 1000*60*10L); this.messageProceducer.send(destination,msg2, DeliveryMode.NON_PERSISTENT, 4, 1000*60*10L); this.messageProceducer.send(destination,msg3, DeliveryMode.NON_PERSISTENT, 6, 1000*60*10L); this.messageProceducer.send(destination,msg4, DeliveryMode.NON_PERSISTENT, 8, 1000*60*10L); } catch (JMSExceptione) { e.printStackTrace(); } } public voidsend2(){ try { Destinationdestination = this.session.createQueue("first"); TextMessagemsg = this.session.createTextMessage("我是一个字符串内容"); this.messageProceducer.send(destination,msg, DeliveryMode.NON_PERSISTENT, 9, 1000*60*10L); } catch(JMSException e) { e.printStackTrace(); } } public staticvoid main(String[] args) { Producerproducer = new Producer(); producer.send1(); } }
2、8、1、3消费者
package org.mbox.test03; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Customer { public finalString SELECTOR_1 = "color = ‘blue‘"; public finalString SELECTOR_2 = "color = ‘blue‘ and sal > 1000"; public finalString SELECTOR_3 = "receiver = ‘A‘ "; privateConnectionFactory connectionFactory; privateConnection connection; private Sessionsession; privateMessageConsumer messageConsumer; privateDestination destination; publicCustomer(){ try { this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.destination= this.session.createQueue("first"); //不设置过滤器 this.messageConsumer= this.session.createConsumer(destination); //设置过滤器 // this.messageConsumer= this.session.createConsumer(destination,SELECTOR_2); this.receiver(); } catch(JMSException e) { e.printStackTrace(); } } public voidreceiver(){ try { this.messageConsumer.setMessageListener(newListener()); } catch(JMSException e) { e.printStackTrace(); } } class Listenerimplements MessageListener{ //自动调用onMessage方法 @Override public voidonMessage(Message message) { try { //判断数据类型 if(messageinstanceof MapMessage){ MapMessagemsg = (MapMessage)message; System.out.println(msg.toString()); System.out.println(msg.getString("name")); System.out.println(msg.getString("age")); } if(messageinstanceof TextMessage){ } } catch(JMSException e) { e.printStackTrace(); } } } public staticvoid main(String[] args) { newCustomer(); } }
#执行结果
ActiveMQMapMessage {commandId = 5, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:1,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591920, timestamp = 1483464991920,arrival = 0, brokerInTime = 1483464991924, brokerOutTime = 1483465021382, correlationId= null, replyTo = null, persistent = false, type = null, priority = 2, groupID= null, groupSequence = 0, targetConsumerId = null, compressed = false, userID= null, content = [email protected],marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }
张三
21
ActiveMQMapMessage {commandId = 6, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:2,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591922, timestamp = 1483464991922,arrival = 0, brokerInTime = 1483464991924, brokerOutTime = 1483465021383,correlationId = null, replyTo = null, persistent = false, type = null, priority= 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = [email protected],marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }
李四
22
ActiveMQMapMessage {commandId = 7, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:3,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591922, timestamp = 1483464991922,arrival = 0, brokerInTime = 1483464991925, brokerOutTime = 1483465021384,correlationId = null, replyTo = null, persistent = false, type = null, priority= 6, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = [email protected],marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }
王五
23
ActiveMQMapMessage {commandId = 8, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:4,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591922, timestamp = 1483464991922,arrival = 0, brokerInTime = 1483464991925, brokerOutTime = 1483465021384,correlationId = null, replyTo = null, persistent = false, type = null, priority= 8, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = [email protected],marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }
赵六
24
2、8、2设置过滤器
2、8、2、1未处理前的控制器
2、8、2、2生产者
package org.mbox.test03; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { privateConnectionFactory connectionFactory; privateConnection connection; private Sessionsession; privateMessageProducer messageProceducer; publicProducer(){ try { this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.messageProceducer= this.session.createProducer(null); } catch(JMSException e) { e.printStackTrace(); } } public voidsend1(){ try { Destinationdestination = this.session.createQueue("first"); MapMessagemsg1 = this.session.createMapMessage(); //下述的Property属性是针对消费端的过滤器的 msg1.setString("name","张三"); msg1.setString("age","21"); msg1.setStringProperty("color","blue"); msg1.setIntProperty("sal",1000); int id =1; msg1.setInt("id",id); Stringreceiver = id%2 == 0 ? "A":"B"; msg1.setStringProperty("receiver",receiver); MapMessagemsg2 = this.session.createMapMessage(); msg2.setString("name","李四"); msg2.setString("age","22"); msg2.setStringProperty("color","blue"); msg2.setIntProperty("sal",1200); id = 2; msg2.setInt("id",id); receiver= id%2 == 0 ? "A":"B"; msg2.setStringProperty("receiver",receiver); MapMessagemsg3 = this.session.createMapMessage(); msg3.setString("name","王五"); msg3.setString("age","23"); msg3.setStringProperty("color","blue"); msg3.setIntProperty("sal",1300); id = 3; msg3.setInt("id",id); receiver= id%2 == 0 ? "A":"B"; msg3.setStringProperty("receiver",receiver); MapMessagemsg4 = this.session.createMapMessage(); msg4.setString("name","赵六"); msg4.setString("age","24"); msg4.setStringProperty("color","oragne"); msg4.setIntProperty("sal",1400); id = 4; msg4.setInt("id",id); receiver= id%2 == 0 ? "A":"B"; msg4.setStringProperty("receiver",receiver); this.messageProceducer.send(destination,msg1, DeliveryMode.NON_PERSISTENT, 2, 1000*60*10L); this.messageProceducer.send(destination,msg2, DeliveryMode.NON_PERSISTENT, 4, 1000*60*10L); this.messageProceducer.send(destination,msg3, DeliveryMode.NON_PERSISTENT, 6, 1000*60*10L); this.messageProceducer.send(destination,msg4, DeliveryMode.NON_PERSISTENT, 8, 1000*60*10L); } catch(JMSException e) { e.printStackTrace(); } } public voidsend2(){ try { Destinationdestination = this.session.createQueue("first"); TextMessagemsg = this.session.createTextMessage("我是一个字符串内容"); this.messageProceducer.send(destination,msg, DeliveryMode.NON_PERSISTENT, 9, 1000*60*10L); } catch(JMSException e) { e.printStackTrace(); } } public staticvoid main(String[] args) { Producerproducer = new Producer(); producer.send1(); } }
2、8、2、3消费者
package org.mbox.test03; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Customer { public finalString SELECTOR_1 = "color = ‘blue‘"; public finalString SELECTOR_2 = "color = ‘blue‘ and sal > 1000"; public finalString SELECTOR_3 = "receiver = ‘A‘ "; privateConnectionFactory connectionFactory; privateConnection connection; private Sessionsession; privateMessageConsumer messageConsumer; privateDestination destination; publicCustomer(){ try { this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.destination= this.session.createQueue("first"); // //不设置过滤器 // this.messageConsumer= this.session.createConsumer(destination); //设置过滤器,过滤性能不确定,所以推荐用ROCKETMQ this.messageConsumer= this.session.createConsumer(destination,SELECTOR_2); this.receiver(); } catch (JMSExceptione) { e.printStackTrace(); } } public voidreceiver(){ try { this.messageConsumer.setMessageListener(newListener()); } catch(JMSException e) { e.printStackTrace(); } } class Listenerimplements MessageListener{ //自动调用onMessage方法 @Override public voidonMessage(Message message) { try { //判断数据类型 if(messageinstanceof MapMessage){ MapMessagemsg = (MapMessage)message; System.out.println(msg.toString()); System.out.println(msg.getString("name")); System.out.println(msg.getString("age")); } if(messageinstanceof TextMessage){ } } catch(JMSException e) { e.printStackTrace(); } } } public staticvoid main(String[] args) { newCustomer(); } }
#控制台输出结果
ActiveMQMapMessage {commandId = 6, responseRequired =false, messageId = ID:XT-201605212138-56953-1483465211306-1:1:1:1:2,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56953-1483465211306-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465811545, timestamp = 1483465211545,arrival = 0, brokerInTime = 1483465211547, brokerOutTime = 1483465242258,correlationId = null, replyTo = null, persistent = false, type = null, priority= 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = [email protected],marshalledProperties = [email protected],dataStructure = null, redeliveryCounter = 0, size = 0, properties ={receiver=A, color=blue, sal=1200}, readOnlyProperties = true, readOnlyBody =true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }
李四
22
ActiveMQMapMessage {commandId = 7, responseRequired =false, messageId = ID:XT-201605212138-56953-1483465211306-1:1:1:1:3,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56953-1483465211306-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465811545, timestamp = 1483465211545,arrival = 0, brokerInTime = 1483465211547, brokerOutTime = 1483465242261,correlationId = null, replyTo = null, persistent = false, type = null, priority= 6, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = [email protected],marshalledProperties = [email protected],dataStructure = null, redeliveryCounter = 0, size = 0, properties = {receiver=B,color=blue, sal=1300}, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }
王五
23
2、9持久化mysql
#运行代码 package org.mbox.test01; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Send { public staticvoid main(String[] args) throws JMSException { // ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( // ActiveMQConnectionFactory.DEFAULT_USER, // ActiveMQConnectionFactory.DEFAULT_PASSWORD, // "tcp://localhost:61616"); ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( "lly", "lly", "tcp://localhost:61616"); Connectionconnection = connectionFactory.createConnection(); connection.start(); Sessionsession = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); Destinationdesination = session.createQueue("queue1"); MessageProducermessageProducer = session.createProducer(desination); // messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // for (int i =0; i < 5; i++) { TextMessagetextMessage = session.createTextMessage(); textMessage.setText("发送消息内容,第1条。优先级设置1"); //第一个参数目的地 //第二个参数消息 //第三个参数是否持久化 //第四个参数优先级 //第五个参数消息在MQ上的存放有效期 messageProducer.send(desination,textMessage, DeliveryMode.PERSISTENT, 1 , 1000*60*2); // } TextMessagetextMessage2 = session.createTextMessage(); textMessage2.setText("发送消息内容,第2条。优先级设置9"); messageProducer.send(desination,textMessage2, DeliveryMode.PERSISTENT, 9 , 1000*60*2); TextMessagetextMessage3 = session.createTextMessage(); textMessage3.setText("发送消息内容,第3条。优先级设置6"); messageProducer.send(desination,textMessage3, DeliveryMode.PERSISTENT, 6 , 1000*60*2); TextMessagetextMessage4 = session.createTextMessage(); textMessage4.setText("发送消息内容,第4条。优先级设置8"); messageProducer.send(desination,textMessage4, DeliveryMode.PERSISTENT, 8 , 1000*60*2); TextMessagetextMessage5 = session.createTextMessage(); textMessage5.setText("发送消息内容,第5条。优先级设置3"); messageProducer.send(desination,textMessage5, DeliveryMode.PERSISTENT, 3 , 1000*60*2); session.commit(); if(connection!= null){ connection.close(); } } }
2、11、3例子
2、11、3、1发布端
package org.mbox.test04; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Publish { privateConnectionFactory connectionFactory; privateConnection connection; private Sessionsession; privateMessageProducer messageProducer; publicPublish(){ try { this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.messageProducer= this.session.createProducer(null); } catch(JMSException e) { e.printStackTrace(); } } public voidsendMessage(){ try { Destinationdestination = this.session.createTopic("topic1"); TextMessagetextMessage = this.session.createTextMessage("我是内容"); this.messageProducer.send(destination,textMessage); } catch(JMSException e) { e.printStackTrace(); } } public staticvoid main(String[] args) { Publishpublish = new Publish(); publish.sendMessage(); } }
2、11、3、2消费端
package org.mbox.test03; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Customer { public finalString SELECTOR_1 = "color = ‘blue‘"; public finalString SELECTOR_2 = "color = ‘blue‘ and sal > 1000"; public finalString SELECTOR_3 = "receiver = ‘A‘ "; privateConnectionFactory connectionFactory; privateConnection connection; private Sessionsession; privateMessageConsumer messageConsumer; privateDestination destination; publicCustomer(){ try { this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616"); this.connection= this.connectionFactory.createConnection(); this.connection.start(); this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.destination= this.session.createQueue("first"); // //不设置过滤器 // this.messageConsumer= this.session.createConsumer(destination); //设置过滤器,过滤性能不确定,所以推荐用ROCKETMQ this.messageConsumer= this.session.createConsumer(destination,SELECTOR_2); this.receiver(); } catch(JMSException e) { e.printStackTrace(); } } public voidreceiver(){ try { this.messageConsumer.setMessageListener(newListener()); } catch(JMSException e) { e.printStackTrace(); } } class Listenerimplements MessageListener{ //自动调用onMessage方法 @Override public void onMessage(Messagemessage) { try { //判断数据类型 if(messageinstanceof MapMessage){ MapMessagemsg = (MapMessage)message; System.out.println(msg.toString()); System.out.println(msg.getString("name")); System.out.println(msg.getString("age")); } if(messageinstanceof TextMessage){ } } catch(JMSException e) { e.printStackTrace(); } } } public staticvoid main(String[] args) { newCustomer(); } }
本文出自 “你可以选择不平凡” 博客,请务必保留此出处http://ylcodes01.blog.51cto.com/5607366/1901430
以上是关于ActiveMq整理之java应用的主要内容,如果未能解决你的问题,请参考以下文章