WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

Posted 菠萝蚊鸭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费相关的知识,希望对你有一定的参考价值。

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

        WSO2 ESB 的 Java 消息服务 (JMS) 传输可以轻松地向任何实现 JMS 规范的 JMS 服务的队列和主题发送和接收消息。

        JMS 传输实现来自 WS-Commons Transports 项目,它利用 JNDI 连接到各种 JMS 代理。 因此,WSO2 ESB 可以与任何提供 JNDI 支持的 JMS 代理一起工作。 所有相关类都打包到axis2-transport-jms-<version>.jar 中,org.apache.axis2.transport.jms.JMSListener 和org.apache.axis2.transport.jms.JMSSender 类分别充当传输接收器和发送器。

        JMS 传输实现需要一个活动的 JMS 服务器实例才能接收和发送消息。 建议使用 WSO2 Message Broker 或 Apache ActiveMQ,但也支持其他,例如 Apache Qpid 和 Tibco。

一、在 ESB 中配置 ActiveMQ

1、复制库文件

①、ESB 配置

将以下库文件从 <AMQ_HOME>/lib 目录复制到 <ESB_HOME>/repository/components/lib 目录。

ActiveMQ 5.8.0 及以上

  • activemq-broker-5.8.0.jar
  • activemq-client-5.8.0.jar
  • activemq-kahadb-store-5.8.0.jar
  • geronimo-jms_1.1_spec-1.1.1.jar
  • geronimo-j2ee-management_1.1_spec-1.0.1.jar
  • geronimo-jta_1.0.1B_spec-1.0.1.jar
  • hawtbuf-1.9.jar
  • Slf4j-api-1.6.6.jar
  • activeio-core-3.1.4.jar(在 <AMQ_HOME>/lib/optional 文件夹中)

低版本的 ActiveMQ

  • activemq-core-5.5.1.jar
  • geronimo-j2ee-management_1.0_spec-1.0.jar
  • geronimo-jms_1.1_spec-1.1.1.jar

ActiveMQ 应该在启动 ESB 之前启动并运行

②、ActiveMQ 配置

修改<AMQ_HOME>/conf/jetty.xml配置文件,不修改的话除本机外无法访问管理控制台,找到 jettyPortbean,将 host 属性从原来的 127.0.0.1 改为 0.0.0.0,修改完成后保存。

2、在 ESB 中配置 JMS 传输侦听器和发送器

①、设置 JMS 侦听器

要启用 JMS 传输侦听器,请取消注释 <ESB_HOME>/repository/conf/axis2/axis2.xml 文件中与 ActiveMQ 相关的以下侦听器配置。

<!--Uncomment this and configure as appropriate for JMS transport support,after setting up your JMS environment (e.g. ActiveMQ)-->
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myTopicConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://ActiveMQ服务器地址:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
    </parameter>
    <parameter name="myQueueConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://ActiveMQ服务器地址:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://ActiveMQ服务器地址:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>

②、设置 JMS 发送器

        要启用 JMS 传输发送器,请取消注释 <ESB_HOME>/repository/conf/axis2/axis2.xml 文件中的以下配置。
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

        上述配置并没有解决 ActiveMQ 消息代理的瞬时故障问题。假设由于某种原因 ActiveMQ 宕机了一段时间后又恢复了。 ESB 不会重新连接到 ActiveMQ,而是在向 ESB 发送请求时抛出一些错误,直到重新启动。 为了解决这个问题,需要在 java.naming.provider.url 的位置添加以下配置:failover:tcp://localhost:61616

        这只会确保发生重新连接。

二、向 JMS 队列发送消息

        HTTP 客户端向 ESB 代理服务发送请求,代理服务会把 HTTP 客户端的请求体转发到 ActiveMQ 中指定的队列。(注意,该 HTTP 请求是没有响应消息的)

1、创建消息中介构件


2、REST API 配置文件

        在 inSequence 中,该 OUT_ONLY 属性设置为 true 以指示消息交换是单向的。

<?xml version="1.0" encoding="UTF-8"?>
<api context="/JMS_Queue" name="JMS_Queue" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST GET">
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <send>
                <endpoint>
                    <address uri="jms:/ordersQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://192.168.131.128:61616&amp;transport.jms.DestinationType=queue"/>
                </endpoint>
            </send>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </resource>
</api>

3、AddressEndpoint 配置

URI:jms:/队列名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&amp;transport.jms.DestinationType=queue

4、测试配置

使用 postman 向 api 发送消息,注意,这个是没有响应消息的!!!

在 ActiveMQ 管理控制台可以看到待消费的消息

消费消息

三、监听 JMS 队列中的消息

        ESB 在 ActiveMQ 中创建一个和代理服务同名的队列,同时监听该队列中的消息,如果有生产者向该队列生产消息,ESB 会把队列中的消息转发到代理服务指定的 URL。

1、创建消息中介构件


2、代理服务配置文件

OUT_ONLY 属性设置为 true 以指示消息交换是单向的。

将传输设置为 jms 来使代理服务成为 JMS 侦听器。 一旦为代理服务启用了 JMS 传输,ESB 就会开始侦听与代理服务同名的 JMS 队列。

此示例配置,ESB 会侦听名为 JMSConsumerProxy 的 JMS 队列。 要使代理服务侦听不同的 JMS 队列,请使用目标队列的名称定义 transport.jms.Destination 参数。

在此示例配置, ActiveMQ生产的消息将被发送到后端服务,ESB 不会等待服务的响应。

发布代理服务后,ESB 会在 ActiveMQ 中创建一个和代理服务同名的队列

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="JMSConsumerProxy" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <address uri="http://www.calvinchan.wso2.org:8280/testAPI"/>
        </endpoint>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

3、测试配置



ActiveMQ 控制台可以看到被监听的队列

四、示例程序

1、消息生产者

库文件 <AMQ_HOME>/activemq-all-x.x.x.jar

package queue.producer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer 

	private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
	
	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";

	private static final String QUEUE_NAME = "PublishSubscribe";

	public static void main(String[] args) throws Exception 

		// 连接工厂
		// 使用默认用户名、密码、路径
		// 因为:底层实现:final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" +
		// DEFAULT_BROKER_PORT;
		// 所以:路径 tcp://host:61616
		// 1 创建连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		
		connectionFactory.setBrokerURL(defaultURL);
		connectionFactory.setUserName(USER_NAME);
		connectionFactory.setPassword(PASSWORD);
		
		// 2 创建连接
		Connection connection = connectionFactory.createConnection();
		// 3 打开连接
		connection.start();
		// 4 创建会话
		// 第一个参数:是否开启事务
		// 第二个参数:消息是否自动确认
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 创建队列
		Queue queue = session.createQueue(QUEUE_NAME);
		// 5 创建生产者
		MessageProducer producer = session.createProducer(queue);
		
		
		for (int i = 0; i < 10; i++) 
			// 6 创建消息
			Message message = session.createTextMessage("\\r\\n"
					+ "    \\"TEST\\": \\"发布订阅消息\\"\\r\\n"
					+ "");
			producer.send(message);
			
			Thread.sleep(2000);
		
		// 8 关闭消息
		//session.commit();
		producer.close();
		session.close();
		connection.close();
		System.out.println("消息生产成功");
	

2、消息消费者

①、Consumer

package queue.consumer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer 

	private static final String DEFAULT_BROKER_HOST = "ActiveMQ服务器地址";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
	
	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";
	
	private static final String QUEUE_NAME = "PublishSubscribe";

	public static void main(String[] args) throws Exception 

		// 创建连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		
		connectionFactory.setBrokerURL(defaultURL);
		connectionFactory.setUserName(USER_NAME);
		connectionFactory.setPassword(PASSWORD);
		
		// 创建连接
		Connection connection = connectionFactory.createConnection();
		// 开启连接
		connection.start();
		// 创建会话
		/**
		 * 第一个参数,是否使用事务 如果设置true,操作消息队列后,必须使用 session.commit();
		 * 如果设置false,操作消息队列后,不使用session.commit();
		 */
		Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		// 创建队列
		Queue queue = session.createQueue(QUEUE_NAME);
		// 创建消费者
		MessageConsumer consumer = session.createConsumer(queue);
		try 
			consumer.setMessageListener(new MessageListenerCallBack(session));
		 catch (Exception e) 
			// TODO: handle exception
			// 关闭连接
			session.close();
			connection.close();
			System.out.println("消费结束0");
			e.printStackTrace();
		
	


②、MessageListenerCallBack

package queue.consumer;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.*;

public class MessageListenerCallBack implements MessageListener 

	private Session session;
	
	private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

	public MessageListenerCallBack(Session session) 
		this.session = session;
	

	@Override
	public void onMessage(Message message) 
		// TODO 自动生成的方法存根
		TextMessage textMessage = (TextMessage) message;
		try 
			if (textMessage != null)  
				// 接收到消息
				System.out.println("["+ simpleDateFormat.format(new Date()) + "] 接收到消息:" + textMessage.getText());
				message.acknowledge();
			 else 
				//没有接收到消息,通知producer重新发送
				this.session.recover();
			
		 catch (JMSException e) 
			// TODO 自动生成的 catch 块
			e.printStackTrace();
		
	



以上是关于WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费的主要内容,如果未能解决你的问题,请参考以下文章

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 MySQL 数据源

WSO2 ESB 5.0.0 配置 MySQL 数据源