WSO2 ESB 5.0.0 配置消息存储

Posted 菠萝蚊鸭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WSO2 ESB 5.0.0 配置消息存储相关的知识,希望对你有一定的参考价值。

WSO2 ESB 5.0.0 配置消息存储

一、配置数据源

1、添加数据源

配置 --> 数据源 --> 添加数据源

2、添加存储数据的表

CREATE TABLE jdbc_message_store(
	indexId BIGINT( 20 ) NOT NULL AUTO_INCREMENT ,
	msg_id VARCHAR( 200 ) NOT NULL ,
	message BLOB NOT NULL ,
	PRIMARY KEY ( indexId )
);

二、配置接口

1、添加 Message Store



MsgStore.xml

<?xml version="1.0" encoding="UTF-8"?>
<messageStore class="org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore" name="MsgStore" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="store.jdbc.dsName">mysql</parameter>
    <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
    <parameter name="store.jdbc.table">jdbc_message_store</parameter>
</messageStore>

2、在 API 中添加 Store

3、发布应用

4、Postman 测试

未接到消息时为0.

postman发送报文

再次刷新消息存储,消息变为1了!

数据库表也存储了这条消息

三、查看存储的消息

1、查看消息内容

不能直接查看 message 内容,需要另外使用程序查看,因为 blob 存储的是对象。查看源码它是使用 ObjectInputStream 读取消息的。

使用 Eclipse Java 新建一个工程

导入 WSO2 包:WSO2ESB_HOME\\repository\\components\\plugins\\synapse-core_2.1.7.wso2v7.jar
JDBC包:mysql-connector-java-5.1.26.jar
fastjson包:fastjson-1.2.15.jar

Main.java

package com;

import java.util.List;
import java.util.Map;

import org.apache.synapse.message.store.impl.commons.StorableMessage;

import com.alibaba.fastjson.JSON;

public class Main 

	public static void main(String[] args) 
		// TODO 自动生成的方法存根

		String[] field =  "indexId", "msg_id", "message" ;
		String sqlString = "SELECT indexId,msg_id,message FROM jdbc_message_store;";
		List<Object> msgList = Utils.getMysqlInfo(sqlString, field);

		String indexId = "", msg_id = "";
		StorableMessage message = null;

		// 行数
		for (int i = 0; i < (msgList.size() / field.length); i++) 
			// 列数
			for (int j = 0; j < field.length; j++) 
				if (j == 0) 
					indexId = (String) msgList.get(i * field.length + j);
				
				if (j == 1) 
					msg_id = (String) msgList.get(i * field.length + j);
				
				if (j == 2) 
					message = (StorableMessage)msgList.get(i * field.length + j);
				
			
			//System.out.println(indexId + "\\t" + msg_id);
			Map map = message.getAxis2message().getProperties();
			System.out.println(JSON.toJSON(map));
			Map map2 = message.getSynapseMessage().getProperties();
			System.out.println(JSON.toJSON(map2));
		

	



Utils.java

package com;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.synapse.message.store.impl.commons.StorableMessage;

public class Utils 

	private static Connection conn;

	/**
	 * 数据库连接
	 * 
	 * @return
	 */
	public static Connection getsourcedata() 
		String dburl = "";
		String username = "";
		String password = "";

		dburl = "jdbc:mysql://localhost:3306/Carbon_WSO2?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8";
		username = "wxhntmy";
		password = "123456";
		try 
			Class.forName("com.mysql.jdbc.Driver");
			conn = DriverManager.getConnection(dburl, username, password);
			System.out.println("数据库连接成功!");
		 catch (Exception e) 
			e.printStackTrace();
		
		return conn;
	

	/**
	 * 查询数据库获取结果
	 * 
	 * @param sql   "SQL语句"
	 * @param field "字段" 可传多值,String[]
	 * @return
	 */
	public static List<Object> getMysqlInfo(String sql, String... field) 
		List<Object> respon = new ArrayList<>();
		PreparedStatement pstmt = null;
		Connection con = Utils.getsourcedata();
//		getLogger("supplier_receive_java","getMysqlInfo_<field>",field);
		try 
			pstmt = con.prepareStatement(sql);
			ResultSet result = pstmt.executeQuery();
			System.out.println("utils 最终运行SQL: " + pstmt);
			while (result.next()) 
				for (String string : field) 
					if ("message".equals(string)) 
						byte[] msgObj = result.getBytes(string);

						if (msgObj != null) 
							ObjectInputStream ios = null;
							try 
								ios = new ObjectInputStream(new ByteArrayInputStream(msgObj));
								Object msg = ios.readObject();
								if (msg instanceof StorableMessage) 
						              StorableMessage jdbcMsg = (StorableMessage)msg;
						              //System.out.println(jdbcMsg);
						              respon.add(jdbcMsg);
						        
							 catch (Exception e) 
								e.printStackTrace();
							 finally 
								try 
									ios.close();
								 catch (IOException e) 
									e.printStackTrace();
								
							
							continue;
						

					 else 
						respon.add(result.getString(string));
					
				
			
			pstmt.close();
			con.close();
		 catch (SQLException e) 
			e.printStackTrace();
			try 
				con.close();
			 catch (SQLException e1) 
				e1.printStackTrace();
			
		
		return respon;
	



2、查看消息


message.getAxis2message().getProperties()

message.getSynapseMessage().getProperties(),这里可以看到我们设置的属性

以上是关于WSO2 ESB 5.0.0 配置消息存储的主要内容,如果未能解决你的问题,请参考以下文章

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费

为 WSO2 ESB 5.0.0 集群配置 MySQL 数据库