IBM websphere MQ 消息发送与获取

Posted 如若

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IBM websphere MQ 消息发送与获取相关的知识,希望对你有一定的参考价值。

一. 所需依赖包,安装 IBM websphere MQ 后,在安装目录下的 java 目录内

import java.io.IOException;
import java.util.Properties;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.constants.MQConstants;

/**
 * 客户端模式开发
 * 
 * @author Geely
 *
 */
public class MQTest1 {

    public static void main(String[] args) throws MQException, IOException {
        
        // 发送消息给队列
        put();

        // 从队列读取消息
        get();
        
        // 获取队列深度
        getDepth();
        
    }

    @SuppressWarnings("unchecked")
    static void put() throws MQException, IOException {

        // 配置MQ服务器连接参数
        MQEnvironment.hostname = "127.0.0.1";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "QM_ACK";
        // MQEnvironment.userID = "";
        // MQEnvironment.password = "";
        // 设置队列管理器字符集 编码
        MQEnvironment.CCSID = 1381;

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java");

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);

        // 创建实例,连接队列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties);

        // 以可写的方式访问队列管理器已定义的队列QUEUE1,当然也可以创建队列
        MQQueue putQueue = queueManager.accessQueue("QUEUE_RECV", CMQC.MQOO_OUTPUT);

        // 新建并发送消息给队列
        MQMessage myMessage = new MQMessage();
        myMessage.expiry = -1;    // 设置消息用不过期
        String name = "MePlusPlus‘s 博客";
        myMessage.writeUTF(name);

        // 使用默认的消息选项
        MQPutMessageOptions pmo = new MQPutMessageOptions();
        // 发送消息
        putQueue.put(myMessage, pmo);
        putQueue.close();

        // 断开连接
        queueManager.disconnect();
    }

    @SuppressWarnings("unchecked")
    static void get() throws MQException, IOException {

        // 配置MQ服务器连接参数
        MQEnvironment.hostname = "127.0.0.1";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "QM_ACK";
        // MQEnvironment.userID = "";
        // MQEnvironment.password = "";
        // 设置队列管理器字符集 编码
        MQEnvironment.CCSID = 1381;

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java");

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES_BINDINGS);

        // 创建实例,连接队列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties);

        // 打开队列.
        int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE;

        // 以可读的方式访问队列管理器已定义的队列QUEUE1
        // MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV",CMQC.MQOO_INPUT_AS_Q_DEF);
        MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null);

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        // Get messages under sync point control.
        // 在同步点控制下获取消息.
        //gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
        // Wait if no messages on the Queue.
        // 如果在队列上没有消息则等待.
        //gmo.options = gmo.options + CMQC.MQGMO_WAIT;
        // Fail if QeueManager Quiescing.
        // 如果队列管理器停顿则失败.
        //gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING;
        // Sets the time limit for the wait.
        // 设置等待的时间限制.
        gmo.waitInterval = 3000;

        // 从队列读取消息
        MQMessage theMessage = new MQMessage();
        getQueue.get(theMessage, gmo);

        String name = theMessage.readUTF();

        System.out.println(name);
        getQueue.close();

        // 断开连接
        queueManager.disconnect();
    }
    
    static void getDepth() throws MQException, IOException {

        Properties props  = new Properties();
        props .put("hostname", "127.0.0.1");
        props .put("port", 1414); //端口号
        props .put("channel", "QM_ACK"); //服务器连接通道
        props .put("CCSID", 1381);
        props .put("transport", "MQSeries");
        
        // 创建实例,连接队列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",props );

        // 打开队列.
        int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE;
        // 以可读的方式访问队列管理器已定义的队列QUEUE1
        MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null);

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        // Get messages under sync point control.
        // 在同步点控制下获取消息.
        // gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
        // Wait if no messages on the Queue.
        // 如果在队列上没有消息则等待.
        // gmo.options = gmo.options + CMQC.MQGMO_WAIT;
        // Fail if QeueManager Quiescing.
        // 如果队列管理器停顿则失败.
        // gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING;

        // Sets the time limit for the wait.
        // 设置等待的时间限制.
        gmo.waitInterval = 3000;
        
        int depth = getQueue.getCurrentDepth();
        System.out.println("该队列当前的深度为:"+depth);
        System.out.println("===========================");
        while(depth-->0)
        {
            MQMessage msg = new MQMessage();// 要读的队列的消息
            getQueue.get(msg, gmo);
            System.out.println("消息的大小为:"+msg.getDataLength());
            System.out.println("消息的内容:\n"+msg.readUTF());
            System.out.println("---------------------------");
        }
        System.out.println("Finish!!!");

        getQueue.close();

        // 断开连接
        queueManager.disconnect();
        
    }

}

 

以上是关于IBM websphere MQ 消息发送与获取的主要内容,如果未能解决你的问题,请参考以下文章

IBM WebSphere MQ请求/回复方案

IBM WebSphere MQ 请求/回复场景

使用 ASCII 编码通过 WebSphere MQ 向 SWIFT 发送消息

第十二章 IBM WebSphere MQ检索邮件

IBM Websphere MQ - MQGET 不会从队列中移除消息

IBM Websphere MQ 基本实验操作