一. 所需依赖包,安装 IBM websphere MQ 后,在安装目录下的 java 目录内
import java.io.IOException; import java.util.Properties; import com.ibm.mq.MQC; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.constants.CMQC; import com.ibm.mq.constants.MQConstants; /** * 客户端模式开发 * * @author Geely * */ public class MQTest1 { public static void main(String[] args) throws MQException, IOException { // 发送消息给队列 put(); // 从队列读取消息 get(); // 获取队列深度 getDepth(); } @SuppressWarnings("unchecked") static void put() throws MQException, IOException { // 配置MQ服务器连接参数 MQEnvironment.hostname = "127.0.0.1"; MQEnvironment.port = 1414; MQEnvironment.channel = "QM_ACK"; // MQEnvironment.userID = ""; // MQEnvironment.password = ""; // 设置队列管理器字符集 编码 MQEnvironment.CCSID = 1381; // 设置应用名称,方便服务器MQ 查看应用连接 MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java"); // 设置应用名称,方便服务器MQ 查看应用连接 MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); // 创建实例,连接队列管理器 MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties); // 以可写的方式访问队列管理器已定义的队列QUEUE1,当然也可以创建队列 MQQueue putQueue = queueManager.accessQueue("QUEUE_RECV", CMQC.MQOO_OUTPUT); // 新建并发送消息给队列 MQMessage myMessage = new MQMessage(); myMessage.expiry = -1; // 设置消息用不过期 String name = "MePlusPlus‘s 博客"; myMessage.writeUTF(name); // 使用默认的消息选项 MQPutMessageOptions pmo = new MQPutMessageOptions(); // 发送消息 putQueue.put(myMessage, pmo); putQueue.close(); // 断开连接 queueManager.disconnect(); } @SuppressWarnings("unchecked") static void get() throws MQException, IOException { // 配置MQ服务器连接参数 MQEnvironment.hostname = "127.0.0.1"; MQEnvironment.port = 1414; MQEnvironment.channel = "QM_ACK"; // MQEnvironment.userID = ""; // MQEnvironment.password = ""; // 设置队列管理器字符集 编码 MQEnvironment.CCSID = 1381; // 设置应用名称,方便服务器MQ 查看应用连接 MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java"); // 设置应用名称,方便服务器MQ 查看应用连接 MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES_BINDINGS); // 创建实例,连接队列管理器 MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties); // 打开队列. int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE; // 以可读的方式访问队列管理器已定义的队列QUEUE1 // MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV",CMQC.MQOO_INPUT_AS_Q_DEF); MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null); MQGetMessageOptions gmo = new MQGetMessageOptions(); // Get messages under sync point control. // 在同步点控制下获取消息. //gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT; // Wait if no messages on the Queue. // 如果在队列上没有消息则等待. //gmo.options = gmo.options + CMQC.MQGMO_WAIT; // Fail if QeueManager Quiescing. // 如果队列管理器停顿则失败. //gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING; // Sets the time limit for the wait. // 设置等待的时间限制. gmo.waitInterval = 3000; // 从队列读取消息 MQMessage theMessage = new MQMessage(); getQueue.get(theMessage, gmo); String name = theMessage.readUTF(); System.out.println(name); getQueue.close(); // 断开连接 queueManager.disconnect(); } static void getDepth() throws MQException, IOException { Properties props = new Properties(); props .put("hostname", "127.0.0.1"); props .put("port", 1414); //端口号 props .put("channel", "QM_ACK"); //服务器连接通道 props .put("CCSID", 1381); props .put("transport", "MQSeries"); // 创建实例,连接队列管理器 MQQueueManager queueManager = new MQQueueManager("QM",props ); // 打开队列. int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE; // 以可读的方式访问队列管理器已定义的队列QUEUE1 MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null); MQGetMessageOptions gmo = new MQGetMessageOptions(); // Get messages under sync point control. // 在同步点控制下获取消息. // gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT; // Wait if no messages on the Queue. // 如果在队列上没有消息则等待. // gmo.options = gmo.options + CMQC.MQGMO_WAIT; // Fail if QeueManager Quiescing. // 如果队列管理器停顿则失败. // gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING; // Sets the time limit for the wait. // 设置等待的时间限制. gmo.waitInterval = 3000; int depth = getQueue.getCurrentDepth(); System.out.println("该队列当前的深度为:"+depth); System.out.println("==========================="); while(depth-->0) { MQMessage msg = new MQMessage();// 要读的队列的消息 getQueue.get(msg, gmo); System.out.println("消息的大小为:"+msg.getDataLength()); System.out.println("消息的内容:\n"+msg.readUTF()); System.out.println("---------------------------"); } System.out.println("Finish!!!"); getQueue.close(); // 断开连接 queueManager.disconnect(); } }