如何正确设置IBM WebSphere MQ 实现群发消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何正确设置IBM WebSphere MQ 实现群发消息相关的知识,希望对你有一定的参考价值。

参考的是IBM的教程 MQ集群的使用
http://www.ibm.com/developerworks/cn/websphere/library/techarticles/loulijun/MQCluster/MQCluster.html
我参考的这个部分:2.1 利用群集简化配置并进行负载均衡

现在的问题是建立了共享队列INPUTQ 后,在QMGRI里面压根看不到共享队列。。
谁能告诉我为什么么?
或者如何才能实现真正的群发消息,
往一个队列管理器里的队列里发消息,
其他的所有对立管理器的本地队列都能收到消息

另外,我之前发的一个100分的问题,怎么到现在还没审核下来啊???有人遇到这个问题么?
能解决问题,可再追加分。。
好了,上面的问题解决了。。
不过现在对WebSphere MQ的功能产生了怀疑,
应用程序只能主动去询问MQ,指定的队列中有消息否。要是这样的话,那不是要轮询指定的队列?

难道不能让MQ将消息直接推送到应用程序么?

沙发上,这个算答案么??
有没有人直接我回答我:是或者不是?
应用程序是不是只能采取轮询的方式去MQ上获取消息。
虽然貌似取消息的那个函数是阻塞的,可以设置等待时间,
但是时间到了,还没有消息的话,也只能再次询问并等待消息(即轮询)
是这样的么?

参考技术A 1.现在的问题是建立了共享队列INPUTQ 后,在QMGRI里面压根看不到共享队列。。
谁能告诉我为什么么?

大哥你怎么看的?QL CLUSTER属性设置对了吗?
DEF QL(YOURQNAME) CLUSTER(REPOS_NAME)
你看的是QCLUSTER吗?
DIS QC(*)

2.往一个队列管理器里的队列里发消息,
其他的所有对立管理器的本地队列都能收到消息
太简单了,注意RFHUTIL里面有个BIND属性,不要设置成OPEN,必须是NOT_FIXED;或者多次使用amqsputc多次打开,不要一次拼命放消息,都可以看到分发效果,默认负载均衡的算法是ROUND_ROBIN

3.好了,上面的问题解决了。。
不过现在对WebSphere MQ的功能产生了怀疑,

我真想捧腹大笑

4.应用程序只能主动去询问MQ,指定的队列中有消息否。要是这样的话,那不是要轮询指定的队列?难道不能让MQ将消息直接推送到应用程序么?

去看看TRIGGER, MDB有关的资料,我不想费口舌了。当你学会基础之前STOP MAKING JUDGEMENTS.

IBM websphere MQ 消息发送与获取

一. 所需依赖包,安装 IBM websphere MQ 后,在安装目录下的 java 目录内

import java.io.IOException;
import java.util.Properties;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.constants.MQConstants;

/**
 * 客户端模式开发
 * 
 * @author Geely
 *
 */
public class MQTest1 {

    public static void main(String[] args) throws MQException, IOException {
        
        // 发送消息给队列
        put();

        // 从队列读取消息
        get();
        
        // 获取队列深度
        getDepth();
        
    }

    @SuppressWarnings("unchecked")
    static void put() throws MQException, IOException {

        // 配置MQ服务器连接参数
        MQEnvironment.hostname = "127.0.0.1";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "QM_ACK";
        // MQEnvironment.userID = "";
        // MQEnvironment.password = "";
        // 设置队列管理器字符集 编码
        MQEnvironment.CCSID = 1381;

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java");

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);

        // 创建实例,连接队列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties);

        // 以可写的方式访问队列管理器已定义的队列QUEUE1,当然也可以创建队列
        MQQueue putQueue = queueManager.accessQueue("QUEUE_RECV", CMQC.MQOO_OUTPUT);

        // 新建并发送消息给队列
        MQMessage myMessage = new MQMessage();
        myMessage.expiry = -1;    // 设置消息用不过期
        String name = "MePlusPlus‘s 博客";
        myMessage.writeUTF(name);

        // 使用默认的消息选项
        MQPutMessageOptions pmo = new MQPutMessageOptions();
        // 发送消息
        putQueue.put(myMessage, pmo);
        putQueue.close();

        // 断开连接
        queueManager.disconnect();
    }

    @SuppressWarnings("unchecked")
    static void get() throws MQException, IOException {

        // 配置MQ服务器连接参数
        MQEnvironment.hostname = "127.0.0.1";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "QM_ACK";
        // MQEnvironment.userID = "";
        // MQEnvironment.password = "";
        // 设置队列管理器字符集 编码
        MQEnvironment.CCSID = 1381;

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java");

        // 设置应用名称,方便服务器MQ 查看应用连接
        MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES_BINDINGS);

        // 创建实例,连接队列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties);

        // 打开队列.
        int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE;

        // 以可读的方式访问队列管理器已定义的队列QUEUE1
        // MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV",CMQC.MQOO_INPUT_AS_Q_DEF);
        MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null);

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        // Get messages under sync point control.
        // 在同步点控制下获取消息.
        //gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
        // Wait if no messages on the Queue.
        // 如果在队列上没有消息则等待.
        //gmo.options = gmo.options + CMQC.MQGMO_WAIT;
        // Fail if QeueManager Quiescing.
        // 如果队列管理器停顿则失败.
        //gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING;
        // Sets the time limit for the wait.
        // 设置等待的时间限制.
        gmo.waitInterval = 3000;

        // 从队列读取消息
        MQMessage theMessage = new MQMessage();
        getQueue.get(theMessage, gmo);

        String name = theMessage.readUTF();

        System.out.println(name);
        getQueue.close();

        // 断开连接
        queueManager.disconnect();
    }
    
    static void getDepth() throws MQException, IOException {

        Properties props  = new Properties();
        props .put("hostname", "127.0.0.1");
        props .put("port", 1414); //端口号
        props .put("channel", "QM_ACK"); //服务器连接通道
        props .put("CCSID", 1381);
        props .put("transport", "MQSeries");
        
        // 创建实例,连接队列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",props );

        // 打开队列.
        int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE;
        // 以可读的方式访问队列管理器已定义的队列QUEUE1
        MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null);

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        // Get messages under sync point control.
        // 在同步点控制下获取消息.
        // gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
        // Wait if no messages on the Queue.
        // 如果在队列上没有消息则等待.
        // gmo.options = gmo.options + CMQC.MQGMO_WAIT;
        // Fail if QeueManager Quiescing.
        // 如果队列管理器停顿则失败.
        // gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING;

        // Sets the time limit for the wait.
        // 设置等待的时间限制.
        gmo.waitInterval = 3000;
        
        int depth = getQueue.getCurrentDepth();
        System.out.println("该队列当前的深度为:"+depth);
        System.out.println("===========================");
        while(depth-->0)
        {
            MQMessage msg = new MQMessage();// 要读的队列的消息
            getQueue.get(msg, gmo);
            System.out.println("消息的大小为:"+msg.getDataLength());
            System.out.println("消息的内容:\n"+msg.readUTF());
            System.out.println("---------------------------");
        }
        System.out.println("Finish!!!");

        getQueue.close();

        // 断开连接
        queueManager.disconnect();
        
    }

}

 

以上是关于如何正确设置IBM WebSphere MQ 实现群发消息的主要内容,如果未能解决你的问题,请参考以下文章

如何在 IBM Websphere MQ 中创建指向队列的别名主题?

IBM websphere MQ 消息发送与获取

IBM WebSphere MQ 请求/回复场景

IBM WebSphere MQ安装集成

IBM WebSphere MQ安装集成

IBM Websphere MQ 基本实验操作