WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

Posted 菠萝蚊鸭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅相关的知识,希望对你有一定的参考价值。

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

在此示例中,在 ActiveMQ 中创建主题,然后在 WSO2 ESB 中添加充当发布者和订阅者的代理服务。

一、在 ESB 中配置 ActiveMQ

在 ESB 中配置 ActiveMQ

二、配置发布者

1、在 ActiveMQ 创建主题

在 ActiveMQ 创建名为 SimplePublishSubscribeService 的主题

2、在 ESB 中添加代理服务

添加一个名为 PublishSubscribe 的代理服务并将其配置为发布到主题 SimplePublishSubscribeService。 可以使用管理控制台将代理服务添加到 ESB,方法是在设计视图中构建代理服务,或者将 XML 配置复制到源视图中。 或者,可以将名为 PublishSubscribe.xml 的 XML 文件添加到 <ESB_HOME>/repository/deployment/server/synapse-configs/default/proxy-services。 下面给出了定义代理服务的示例 XML 代码段。 请注意,地址 URI 指定了用于配置 JMS 传输的属性。

URI:jms:/主题名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&amp;transport.jms.DestinationType=topic

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://192.168.131.128:61616&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

在 ESB 发布 PublishSubscribe 代理服务后,ESB 会在 ActiveMQ 创建名为 PublishSubscribe 的队列。

三、配置订阅者

接下来,配置两个订阅 JMS 主题 SimplePublishSubscribeService 的代理服务,以便每当该主题收到消息时,它就会发送到这些订阅代理服务。 以下是这些代理服务的示例配置。

1、订阅者 1 配置

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property expression="json-eval($.)" name="jsondata" scope="default" type="STRING"/>
            <log level="custom">
                <property name="Subscriber1" value="I am Subscriber1"/>
            </log>
            <class name="com.SimplePublishSubscribeService1">
                <property name="property_name" value="jsondata"/>
            </class>
            <payloadFactory media-type="json">
                <format>$1</format>
                <args>
                    <arg evaluator="xml" expression="get-property(&quot;datainfos&quot;)"/>
                </args>
            </payloadFactory>
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>

指定主题消息的类型,json/xml/text 等。

特别特别注意!!!!!!transport.jms.ConnectionFactory 配置的是 JMS 传输侦听器中的 parameter 名字

ClassMediator

package com;

import org.apache.synapse.MessageContext; 
import org.apache.synapse.mediators.AbstractMediator;

public class SimplePublishSubscribeService1 extends AbstractMediator  

	private String property_name;

	public String getProperty_name() 
		return property_name;
	

	public void setProperty_name(String property_name) 
		this.property_name = property_name;
	
	
	
	public boolean mediate(MessageContext context)  
		// TODO Implement your mediation logic here 
		
		String topic_data = (String) context.getProperty(property_name);
		
		System.out.println("订阅者1接收到的数据:" + topic_data);
		
		context.setProperty("datainfos", topic_data);
		
		return true;
	

2、订阅者 2 配置

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <log level="custom">
                <property name="Subscriber2" value="I am Subscriber2"/>
            </log>
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>

指定主题消息的类型,json/xml/text 等。

特别特别注意!!!!!!transport.jms.ConnectionFactory 配置的是 JMS 传输侦听器中的 parameter 名字

四、发布 ESB 代理服务

发布 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 代理服务。

订阅者发布成功!

五、测试消息发布与订阅

在本实例中,ESB 中创建了名为 PublishSubscribe 的代理服务,该服务是作用是监听 ActiveMQ 的 PublishSubscribe 队列,如果该队列有消息,则把消息发布到 ActiveMQ 中的 SimplePublishSubscribeService 主题,代理服务 SimplePublishSubscribeService1 和 SimplePublishSubscribeService1 订阅了主题 SimplePublishSubscribeService,如果收到主题发布的消息,则会打印出日志。
测试方法:向 ESB 的 PublishSubscribe 代理服务发送消息,或者向 ActiveMQ 的 PublishSubscribe 队列发送消息,或者向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息, SimplePublishSubscribeService1 和 SimplePublishSubscribeService1 都会收到所订阅主题的消息。

向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息

WSO2 ESB 收到订阅消息

六、示例代码

库文件 <AMQ_HOME>/activemq-all-x.x.x.jar

1、主题消息发布者

package topic.producer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.alibaba.fastjson.JSONObject;

public class TopicMsgProducer 

	private static final String DEFAULT_BROKER_HOST = "192.168.131.128";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;

	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";

	private static final String TOPIC_NAME = "SimplePublishSubscribeService";

	public static void main(String[] args) 
		new TopicMsgProducer().send();
	

	public void send() 
		// 创建连接工厂
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

		Connection conn = null;
		try 

			factory.setBrokerURL(defaultURL);
			factory.setUserName(USER_NAME);
			factory.setPassword(PASSWORD);

			// 创建连接
			conn = factory.createConnection();
			conn.start();
			// 创建会话
			Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 创建地点
			Topic topic = session.createTopic(TOPIC_NAME);
			// 创建生产者
			MessageProducer producer = session.createProducer(topic);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

			TextMessage tmsg = session.createTextMessage();
			
			JSONObject jsonObject = new JSONObject();
			jsonObject.put("text", "Hello, CalvinChan!");
			
			for(int i = 0; i < 10; i++) 
				tmsg.setText(jsonObject.toJSONString());
				producer.send(tmsg);
				System.out.println("发送的消息:" + tmsg.getText());
				
				Thread.sleep(2000);
			
			
		 catch (JMSException | InterruptedException e) 
			e.printStackTrace();
		 finally 
			try 
				if (conn != null)
					conn.close();
			 catch (Throwable ignore) 
			
		
	



2、主题消息订阅者

①、Consumer

package topic.consumer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicMsgConsumer 

	private static final String DEFAULT_BROKER_HOST = "192.168.131.128";
	private static final String DEFAULT_BROKER_PORT = "61616";
	private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
	
	private static final String USER_NAME = "user";
	private static final String PASSWORD = "user";
	
	private static final String TOPIC_NAME = "SimplePublishSubscribeService";
	
	private static final String CLIENT_ID = "calvinchan";
	
	
	public static void main(String[] args) 
		// TODO 自动生成的方法存根
		new TopicMsgConsumer().receive();
	

	public void receive() 
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
		Connection conn = null;
		try 
			factory.setBrokerURL(defaultURL);
			factory.setUserName(USER_NAME);
			factory.setPassword(PASSWORD);
			
			conn = factory.createConnection();
			conn.setClientID(CLIENT_ID);
			conn.start();
			Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
			// 订阅发布模式的 Topic对象 不是Destination
			Topic topic = session.createTopic(TOPIC_NAME);
			TopicSubscriber subsriber = session.createDurableSubscriber(topic, CLIENT_ID);
			subsriber.setMessageListener(new MessageListenerCallBack(session));
			while (true);
		 catch (JMSException e) 
			e.printStackTrace();
		 finally 
			if (conn != null) 
				try 
					conn.close();
				 catch (JMSException e) 
					e.printStackTrace();
				
			
		
	


②、MessageListenerCallBack

package topic.consumer;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MessageListenerCallBack implements MessageListener 

	private Session session;
	
	private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

	public MessageListenerCallBack(Session session) 
		this.session = session;
	
	
	@Override
	public void onMessage(Message message) 
		// TODO 自动生成的方法存根
		TextMessage textMessage = (TextMessage) message;
		
		try 
			if (textMessage != null)  
				// 接收到消息
				System.out.println(&

以上是关于WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅的主要内容,如果未能解决你的问题,请参考以下文章

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 MySQL 数据源

WSO2 ESB 5.0.0 配置 MySQL 数据源