WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
Posted 菠萝蚊鸭
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费相关的知识,希望对你有一定的参考价值。
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
WSO2 ESB 的 Java 消息服务 (JMS) 传输可以轻松地向任何实现 JMS 规范的 JMS 服务的队列和主题发送和接收消息。
JMS 传输实现来自 WS-Commons Transports 项目,它利用 JNDI 连接到各种 JMS 代理。 因此,WSO2 ESB 可以与任何提供 JNDI 支持的 JMS 代理一起工作。 所有相关类都打包到axis2-transport-jms-<version>.jar
中,org.apache.axis2.transport.jms.JMSListene
r 和org.apache.axis2.transport.jms.JMSSender
类分别充当传输接收器和发送器。
JMS 传输实现需要一个活动的 JMS 服务器实例才能接收和发送消息。 建议使用 WSO2 Message Broker 或 Apache ActiveMQ,但也支持其他,例如 Apache Qpid 和 Tibco。
一、在 ESB 中配置 ActiveMQ
1、复制库文件
①、ESB 配置
将以下库文件从 <AMQ_HOME>/lib
目录复制到 <ESB_HOME>/repository/components/lib
目录。
ActiveMQ 5.8.0 及以上
- activemq-broker-5.8.0.jar
- activemq-client-5.8.0.jar
- activemq-kahadb-store-5.8.0.jar
- geronimo-jms_1.1_spec-1.1.1.jar
- geronimo-j2ee-management_1.1_spec-1.0.1.jar
- geronimo-jta_1.0.1B_spec-1.0.1.jar
- hawtbuf-1.9.jar
- Slf4j-api-1.6.6.jar
- activeio-core-3.1.4.jar(在
<AMQ_HOME>/lib/optional
文件夹中)
低版本的 ActiveMQ
- activemq-core-5.5.1.jar
- geronimo-j2ee-management_1.0_spec-1.0.jar
- geronimo-jms_1.1_spec-1.1.1.jar
ActiveMQ 应该在启动 ESB 之前启动并运行
②、ActiveMQ 配置
修改<AMQ_HOME>/conf/jetty.xml
配置文件,不修改的话除本机外无法访问管理控制台,找到 jettyPortbean,将 host 属性从原来的 127.0.0.1 改为 0.0.0.0,修改完成后保存。
2、在 ESB 中配置 JMS 传输侦听器和发送器
①、设置 JMS 侦听器
要启用 JMS 传输侦听器,请取消注释 <ESB_HOME>/repository/conf/axis2/axis2.xml
文件中与 ActiveMQ 相关的以下侦听器配置。
<!--Uncomment this and configure as appropriate for JMS transport support,after setting up your JMS environment (e.g. ActiveMQ)-->
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
<parameter name="myTopicConnectionFactory" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://ActiveMQ服务器地址:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
</parameter>
<parameter name="myQueueConnectionFactory" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://ActiveMQ服务器地址:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
<parameter name="default" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://ActiveMQ服务器地址:61616</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
</transportReceiver>
②、设置 JMS 发送器
要启用 JMS 传输发送器,请取消注释 <ESB_HOME>/repository/conf/axis2/axis2.xml
文件中的以下配置。
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
上述配置并没有解决 ActiveMQ 消息代理的瞬时故障问题。假设由于某种原因 ActiveMQ 宕机了一段时间后又恢复了。 ESB 不会重新连接到 ActiveMQ,而是在向 ESB 发送请求时抛出一些错误,直到重新启动。 为了解决这个问题,需要在 java.naming.provider.url
的位置添加以下配置:failover:tcp://localhost:61616
这只会确保发生重新连接。
二、向 JMS 队列发送消息
HTTP 客户端向 ESB 代理服务发送请求,代理服务会把 HTTP 客户端的请求体转发到 ActiveMQ 中指定的队列。(注意,该 HTTP 请求是没有响应消息的)
1、创建消息中介构件
2、REST API 配置文件
在 inSequence 中,该 OUT_ONLY 属性设置为 true 以指示消息交换是单向的。
<?xml version="1.0" encoding="UTF-8"?>
<api context="/JMS_Queue" name="JMS_Queue" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST GET">
<inSequence>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<send>
<endpoint>
<address uri="jms:/ordersQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://192.168.131.128:61616&transport.jms.DestinationType=queue"/>
</endpoint>
</send>
</inSequence>
<outSequence>
<send/>
</outSequence>
<faultSequence/>
</resource>
</api>
3、AddressEndpoint 配置
URI:jms:/队列名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&transport.jms.DestinationType=queue
4、测试配置
使用 postman 向 api 发送消息,注意,这个是没有响应消息的!!!
在 ActiveMQ 管理控制台可以看到待消费的消息
消费消息
三、监听 JMS 队列中的消息
ESB 在 ActiveMQ 中创建一个和代理服务同名的队列,同时监听该队列中的消息,如果有生产者向该队列生产消息,ESB 会把队列中的消息转发到代理服务指定的 URL。
1、创建消息中介构件
2、代理服务配置文件
OUT_ONLY 属性设置为 true 以指示消息交换是单向的。
将传输设置为 jms 来使代理服务成为 JMS 侦听器。 一旦为代理服务启用了 JMS 传输,ESB 就会开始侦听与代理服务同名的 JMS 队列。
此示例配置,ESB 会侦听名为 JMSConsumerProxy 的 JMS 队列。 要使代理服务侦听不同的 JMS 队列,请使用目标队列的名称定义 transport.jms.Destination 参数。
在此示例配置, ActiveMQ生产的消息将被发送到后端服务,ESB 不会等待服务的响应。
发布代理服务后,ESB 会在 ActiveMQ 中创建一个和代理服务同名的队列
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="JMSConsumerProxy" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<endpoint>
<address uri="http://www.calvinchan.wso2.org:8280/testAPI"/>
</endpoint>
<inSequence>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
</proxy>
3、测试配置
ActiveMQ 控制台可以看到被监听的队列
四、示例程序
1、消息生产者
库文件 <AMQ_HOME>/activemq-all-x.x.x.jar
package queue.producer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer
private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
private static final String USER_NAME = "user";
private static final String PASSWORD = "user";
private static final String QUEUE_NAME = "PublishSubscribe";
public static void main(String[] args) throws Exception
// 连接工厂
// 使用默认用户名、密码、路径
// 因为:底层实现:final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" +
// DEFAULT_BROKER_PORT;
// 所以:路径 tcp://host:61616
// 1 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(defaultURL);
connectionFactory.setUserName(USER_NAME);
connectionFactory.setPassword(PASSWORD);
// 2 创建连接
Connection connection = connectionFactory.createConnection();
// 3 打开连接
connection.start();
// 4 创建会话
// 第一个参数:是否开启事务
// 第二个参数:消息是否自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建生产者
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++)
// 6 创建消息
Message message = session.createTextMessage("\\r\\n"
+ " \\"TEST\\": \\"发布订阅消息\\"\\r\\n"
+ "");
producer.send(message);
Thread.sleep(2000);
// 8 关闭消息
//session.commit();
producer.close();
session.close();
connection.close();
System.out.println("消息生产成功");
2、消息消费者
①、Consumer
package queue.consumer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer
private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
private static final String USER_NAME = "user";
private static final String PASSWORD = "user";
private static final String QUEUE_NAME = "PublishSubscribe";
public static void main(String[] args) throws Exception
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(defaultURL);
connectionFactory.setUserName(USER_NAME);
connectionFactory.setPassword(PASSWORD);
// 创建连接
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 创建会话
/**
* 第一个参数,是否使用事务 如果设置true,操作消息队列后,必须使用 session.commit();
* 如果设置false,操作消息队列后,不使用session.commit();
*/
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消费者
MessageConsumer consumer = session.createConsumer(queue);
try
consumer.setMessageListener(new MessageListenerCallBack(session));
catch (Exception e)
// TODO: handle exception
// 关闭连接
session.close();
connection.close();
System.out.println("消费结束0");
e.printStackTrace();
②、MessageListenerCallBack
package queue.consumer;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.jms.*;
public class MessageListenerCallBack implements MessageListener
private Session session;
private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public MessageListenerCallBack(Session session)
this.session = session;
@Override
public void onMessage(Message message)
// TODO 自动生成的方法存根
TextMessage textMessage = (TextMessage) message;
try
if (textMessage != null)
// 接收到消息
System.out.println("["+ simpleDateFormat.format(new Date()) + "] 接收到消息:" + textMessage.getText());
message.acknowledge();
else
//没有接收到消息,通知producer重新发送
this.session.recover();
catch (JMSException e)
// TODO 自动生成的 catch 块
e.printStackTrace();
以上是关于WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费的主要内容,如果未能解决你的问题,请参考以下文章
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅