WSO2 ESB 5.0.0 配置消息存储
Posted 菠萝蚊鸭
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WSO2 ESB 5.0.0 配置消息存储相关的知识,希望对你有一定的参考价值。
WSO2 ESB 5.0.0 配置消息存储
一、配置数据源
1、添加数据源
配置 --> 数据源 --> 添加数据源
2、添加存储数据的表
CREATE TABLE jdbc_message_store(
indexId BIGINT( 20 ) NOT NULL AUTO_INCREMENT ,
msg_id VARCHAR( 200 ) NOT NULL ,
message BLOB NOT NULL ,
PRIMARY KEY ( indexId )
);
二、配置接口
1、添加 Message Store
MsgStore.xml
<?xml version="1.0" encoding="UTF-8"?>
<messageStore class="org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore" name="MsgStore" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="store.jdbc.dsName">mysql</parameter>
<parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
<parameter name="store.jdbc.table">jdbc_message_store</parameter>
</messageStore>
2、在 API 中添加 Store
3、发布应用
4、Postman 测试
未接到消息时为0.
postman发送报文
再次刷新消息存储,消息变为1了!
数据库表也存储了这条消息
三、查看存储的消息
1、查看消息内容
不能直接查看 message 内容,需要另外使用程序查看,因为 blob 存储的是对象。查看源码它是使用 ObjectInputStream 读取消息的。
使用 Eclipse Java 新建一个工程
导入 WSO2 包:WSO2ESB_HOME\\repository\\components\\plugins\\synapse-core_2.1.7.wso2v7.jar
JDBC包:mysql-connector-java-5.1.26.jar
fastjson包:fastjson-1.2.15.jar
Main.java
package com;
import java.util.List;
import java.util.Map;
import org.apache.synapse.message.store.impl.commons.StorableMessage;
import com.alibaba.fastjson.JSON;
public class Main
public static void main(String[] args)
// TODO 自动生成的方法存根
String[] field = "indexId", "msg_id", "message" ;
String sqlString = "SELECT indexId,msg_id,message FROM jdbc_message_store;";
List<Object> msgList = Utils.getMysqlInfo(sqlString, field);
String indexId = "", msg_id = "";
StorableMessage message = null;
// 行数
for (int i = 0; i < (msgList.size() / field.length); i++)
// 列数
for (int j = 0; j < field.length; j++)
if (j == 0)
indexId = (String) msgList.get(i * field.length + j);
if (j == 1)
msg_id = (String) msgList.get(i * field.length + j);
if (j == 2)
message = (StorableMessage)msgList.get(i * field.length + j);
//System.out.println(indexId + "\\t" + msg_id);
Map map = message.getAxis2message().getProperties();
System.out.println(JSON.toJSON(map));
Map map2 = message.getSynapseMessage().getProperties();
System.out.println(JSON.toJSON(map2));
Utils.java
package com;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.synapse.message.store.impl.commons.StorableMessage;
public class Utils
private static Connection conn;
/**
* 数据库连接
*
* @return
*/
public static Connection getsourcedata()
String dburl = "";
String username = "";
String password = "";
dburl = "jdbc:mysql://localhost:3306/Carbon_WSO2?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8";
username = "wxhntmy";
password = "123456";
try
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection(dburl, username, password);
System.out.println("数据库连接成功!");
catch (Exception e)
e.printStackTrace();
return conn;
/**
* 查询数据库获取结果
*
* @param sql "SQL语句"
* @param field "字段" 可传多值,String[]
* @return
*/
public static List<Object> getMysqlInfo(String sql, String... field)
List<Object> respon = new ArrayList<>();
PreparedStatement pstmt = null;
Connection con = Utils.getsourcedata();
// getLogger("supplier_receive_java","getMysqlInfo_<field>",field);
try
pstmt = con.prepareStatement(sql);
ResultSet result = pstmt.executeQuery();
System.out.println("utils 最终运行SQL: " + pstmt);
while (result.next())
for (String string : field)
if ("message".equals(string))
byte[] msgObj = result.getBytes(string);
if (msgObj != null)
ObjectInputStream ios = null;
try
ios = new ObjectInputStream(new ByteArrayInputStream(msgObj));
Object msg = ios.readObject();
if (msg instanceof StorableMessage)
StorableMessage jdbcMsg = (StorableMessage)msg;
//System.out.println(jdbcMsg);
respon.add(jdbcMsg);
catch (Exception e)
e.printStackTrace();
finally
try
ios.close();
catch (IOException e)
e.printStackTrace();
continue;
else
respon.add(result.getString(string));
pstmt.close();
con.close();
catch (SQLException e)
e.printStackTrace();
try
con.close();
catch (SQLException e1)
e1.printStackTrace();
return respon;
2、查看消息
message.getAxis2message().getProperties()
message.getSynapseMessage().getProperties(),这里可以看到我们设置的属性
以上是关于WSO2 ESB 5.0.0 配置消息存储的主要内容,如果未能解决你的问题,请参考以下文章
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费