WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
Posted 菠萝蚊鸭
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅相关的知识,希望对你有一定的参考价值。
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
在此示例中,在 ActiveMQ 中创建主题,然后在 WSO2 ESB 中添加充当发布者和订阅者的代理服务。
一、在 ESB 中配置 ActiveMQ
二、配置发布者
1、在 ActiveMQ 创建主题
在 ActiveMQ 创建名为 SimplePublishSubscribeService 的主题
2、在 ESB 中添加代理服务
添加一个名为 PublishSubscribe 的代理服务并将其配置为发布到主题 SimplePublishSubscribeService。 可以使用管理控制台将代理服务添加到 ESB,方法是在设计视图中构建代理服务,或者将 XML 配置复制到源视图中。 或者,可以将名为 PublishSubscribe.xml 的 XML 文件添加到 <ESB_HOME>/repository/deployment/server/synapse-configs/default/proxy-services
。 下面给出了定义代理服务的示例 XML 代码段。 请注意,地址 URI 指定了用于配置 JMS 传输的属性。
URI:jms:/主题名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&transport.jms.DestinationType=topic
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<endpoint>
<address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://192.168.131.128:61616&transport.jms.DestinationType=topic"/>
</endpoint>
<inSequence>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
</proxy>
在 ESB 发布 PublishSubscribe 代理服务后,ESB 会在 ActiveMQ 创建名为 PublishSubscribe 的队列。
三、配置订阅者
接下来,配置两个订阅 JMS 主题 SimplePublishSubscribeService 的代理服务,以便每当该主题收到消息时,它就会发送到这些订阅代理服务。 以下是这些代理服务的示例配置。
1、订阅者 1 配置
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<property expression="json-eval($.)" name="jsondata" scope="default" type="STRING"/>
<log level="custom">
<property name="Subscriber1" value="I am Subscriber1"/>
</log>
<class name="com.SimplePublishSubscribeService1">
<property name="property_name" value="jsondata"/>
</class>
<payloadFactory media-type="json">
<format>$1</format>
<args>
<arg evaluator="xml" expression="get-property("datainfos")"/>
</args>
</payloadFactory>
<drop/>
</inSequence>
<outSequence>
<send/>
</outSequence>
<faultSequence/>
</target>
<parameter name="transport.jms.DestinationType">topic</parameter>
<parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
<parameter name="transport.jms.ContentType">
<rules>
<jmsProperty>contentType</jmsProperty>
<default>application/json</default>
</rules>
</parameter>
<parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>
指定主题消息的类型,json/xml/text 等。
特别特别注意!!!!!!transport.jms.ConnectionFactory 配置的是 JMS 传输侦听器中的 parameter 名字
ClassMediator
package com;
import org.apache.synapse.MessageContext;
import org.apache.synapse.mediators.AbstractMediator;
public class SimplePublishSubscribeService1 extends AbstractMediator
private String property_name;
public String getProperty_name()
return property_name;
public void setProperty_name(String property_name)
this.property_name = property_name;
public boolean mediate(MessageContext context)
// TODO Implement your mediation logic here
String topic_data = (String) context.getProperty(property_name);
System.out.println("订阅者1接收到的数据:" + topic_data);
context.setProperty("datainfos", topic_data);
return true;
2、订阅者 2 配置
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<log level="custom">
<property name="Subscriber2" value="I am Subscriber2"/>
</log>
<drop/>
</inSequence>
<outSequence>
<send/>
</outSequence>
<faultSequence/>
</target>
<parameter name="transport.jms.DestinationType">topic</parameter>
<parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
<parameter name="transport.jms.ContentType">
<rules>
<jmsProperty>contentType</jmsProperty>
<default>application/json</default>
</rules>
</parameter>
<parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>
指定主题消息的类型,json/xml/text 等。
特别特别注意!!!!!!transport.jms.ConnectionFactory 配置的是 JMS 传输侦听器中的 parameter 名字
四、发布 ESB 代理服务
发布 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 代理服务。
订阅者发布成功!
五、测试消息发布与订阅
在本实例中,ESB 中创建了名为 PublishSubscribe 的代理服务,该服务是作用是监听 ActiveMQ 的 PublishSubscribe 队列,如果该队列有消息,则把消息发布到 ActiveMQ 中的 SimplePublishSubscribeService 主题,代理服务 SimplePublishSubscribeService1 和 SimplePublishSubscribeService1 订阅了主题 SimplePublishSubscribeService,如果收到主题发布的消息,则会打印出日志。
测试方法:向 ESB 的 PublishSubscribe 代理服务发送消息,或者向 ActiveMQ 的 PublishSubscribe 队列发送消息,或者向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息, SimplePublishSubscribeService1 和 SimplePublishSubscribeService1 都会收到所订阅主题的消息。
向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息
WSO2 ESB 收到订阅消息
六、示例代码
库文件 <AMQ_HOME>/activemq-all-x.x.x.jar
1、主题消息发布者
package topic.producer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
public class TopicMsgProducer
private static final String DEFAULT_BROKER_HOST = "192.168.131.128";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
private static final String USER_NAME = "user";
private static final String PASSWORD = "user";
private static final String TOPIC_NAME = "SimplePublishSubscribeService";
public static void main(String[] args)
new TopicMsgProducer().send();
public void send()
// 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection conn = null;
try
factory.setBrokerURL(defaultURL);
factory.setUserName(USER_NAME);
factory.setPassword(PASSWORD);
// 创建连接
conn = factory.createConnection();
conn.start();
// 创建会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建地点
Topic topic = session.createTopic(TOPIC_NAME);
// 创建生产者
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
TextMessage tmsg = session.createTextMessage();
JSONObject jsonObject = new JSONObject();
jsonObject.put("text", "Hello, CalvinChan!");
for(int i = 0; i < 10; i++)
tmsg.setText(jsonObject.toJSONString());
producer.send(tmsg);
System.out.println("发送的消息:" + tmsg.getText());
Thread.sleep(2000);
catch (JMSException | InterruptedException e)
e.printStackTrace();
finally
try
if (conn != null)
conn.close();
catch (Throwable ignore)
2、主题消息订阅者
①、Consumer
package topic.consumer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicMsgConsumer
private static final String DEFAULT_BROKER_HOST = "192.168.131.128";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
private static final String USER_NAME = "user";
private static final String PASSWORD = "user";
private static final String TOPIC_NAME = "SimplePublishSubscribeService";
private static final String CLIENT_ID = "calvinchan";
public static void main(String[] args)
// TODO 自动生成的方法存根
new TopicMsgConsumer().receive();
public void receive()
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection conn = null;
try
factory.setBrokerURL(defaultURL);
factory.setUserName(USER_NAME);
factory.setPassword(PASSWORD);
conn = factory.createConnection();
conn.setClientID(CLIENT_ID);
conn.start();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 订阅发布模式的 Topic对象 不是Destination
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber subsriber = session.createDurableSubscriber(topic, CLIENT_ID);
subsriber.setMessageListener(new MessageListenerCallBack(session));
while (true);
catch (JMSException e)
e.printStackTrace();
finally
if (conn != null)
try
conn.close();
catch (JMSException e)
e.printStackTrace();
②、MessageListenerCallBack
package topic.consumer;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
public class MessageListenerCallBack implements MessageListener
private Session session;
private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public MessageListenerCallBack(Session session)
this.session = session;
@Override
public void onMessage(Message message)
// TODO 自动生成的方法存根
TextMessage textMessage = (TextMessage) message;
try
if (textMessage != null)
// 接收到消息
System.out.println(&以上是关于WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅的主要内容,如果未能解决你的问题,请参考以下文章
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅