连接 Queue Manager 的工具类:
package com.quest.mq; import java.io.IOException; import java.util.Hashtable; import com.ibm.mq.MQException; import com.ibm.mq.MQQueueManager; import com.ibm.mq.pcf.PCFMessage; import com.ibm.mq.pcf.PCFMessageAgent; public class MQTool { public String queueManagerName = "QM"; public String hostname = "localhost"; public int port = 1411; public String channel = "SYSTEM.DEF.SVRCONN"; protected Hashtable<String, Object> properties = new Hashtable<String, Object>(); public MQTool(String queueManagerName, String hostname, int port, String channel){ this.queueManagerName = queueManagerName; this.hostname = hostname; this.port = port; this.channel = channel; try { queueManager = new MQQueueManager(queueManagerName, getConnectionProperties()); agent = new PCFMessageAgent(); agent.connect(queueManager); } catch (Exception e) { e.printStackTrace(); } } public PCFMessageAgent agent = null; public MQQueueManager queueManager = null; public MQQueueManager getQueueManager() { return queueManager; } public final boolean isNullOrEmpty(String input) { boolean result = false; if(input == null) { result = true; }else if(input.length() == 0) { result = true; } return result; } /** * Construct MQ queue manager connection properties * @param hostname * @param port * @param channel * @return */ private Hashtable<String, Object> getConnectionProperties() { properties.put("hostname", hostname); properties.put("port", port); properties.put("channel", channel); System.out.println("Connection informations: " + properties); System.out.println("Connect QM :" + queueManagerName); return properties; } /** * Get access to a command agent, based on a queue manager name * @param queueManagerName the queue manager name * @return */ public PCFMessageAgent getAgent() { return agent; } protected PCFMessage[] pcfInquire(PCFMessage queuePCF) throws MQException, IOException{ getAgent().connect(queueManager); PCFMessage[] responses = getAgent().send(queuePCF); getAgent().disconnect(); return responses; } }?
?
对一条 Queue 放入消息、取出消息:
import java.io.IOException; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.MQTopic; import com.ibm.mq.constants.CMQC; import com.ibm.mq.constants.MQConstants; import com.quest.mq.MQTool; public class MQTest { /** * @param args * @throws MQException * @throws IOException * @throws InterruptedException */ public static void main(String[] args) throws MQException, IOException, InterruptedException { String queueManagerName = "QM1"; String hostname = "127.0.0.1"; int port = 1421; String channel = "SYSTEM.DEF.SVRCONN"; MQTool mqTool = new MQTool(queueManagerName, hostname, port, channel); MQQueueManager queueManager = mqTool.getQueueManager(); //放入消息 MQQueue queue1 = queueManager.accessQueue("apple2", CMQC.MQOO_OUTPUT); MQMessage message1 = new MQMessage(); message1.writeString("33333"); queue1.put(message1); //不消费消息 MQQueue queue2 = queueManager.accessQueue("apple2", MQConstants.MQOO_BROWSE | MQConstants.MQOO_INQUIRE); MQGetMessageOptions getMsgOption = new MQGetMessageOptions(); getMsgOption.options = MQConstants.MQGMO_BROWSE_NEXT; for(int i=0; i< queue2.getCurrentDepth(); i++){ MQMessage message = new MQMessage(); queue2.get(message, getMsgOption); System.out.println(message.readStringOfCharLength(message.getDataLength())); } //消费消息 MQQueue queue3 = queueManager.accessQueue("apple2", CMQC.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_INQUIRE); int depth = queue3.getCurrentDepth(); for(int i=0; i < depth; i++){ MQMessage message = new MQMessage(); queue3.get(message); System.out.println(message.readStringOfCharLength(message.getDataLength())+ "----"); System.out.println("==="); } } }
?
?