// mqoperator.java
// (web code viewer residue, not part of the program: "Font size:" control)
package com.mq;
import com.ibm.mq.MQC;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQException;
import com.ibm.mq.pcf.*;
import java.io.IOException;
/**
 * Convenience wrapper around the IBM MQ base Java classes: connect to a queue
 * manager, open one queue, put/get string messages, list queue names via PCF,
 * inquire queue depth, and commit/back out/close/disconnect.
 *
 * <p>MQ failures are never thrown to the caller. Instead the completion code
 * and reason code of the most recent {@link MQException} are stored and can be
 * read with {@link #getCompletionCode()} and {@link #getReasonCode()}. These
 * codes are not cleared by later successful calls.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state, so
 * it is presumably meant to be used from one thread at a time - confirm.
 *
 * @author wangdi
 * @version 1.0
 */
public class MQOperator {
    /** Segment size constant; not referenced anywhere inside this class. */
    public static final int SEGMENT_SIZE = 102400;

    /** CCSID set on outgoing/incoming message descriptors and on the PCF agent. */
    private static final int MESSAGE_CCSID = 1381;

    // Queue manager connection; null until connQManager succeeds.
    private MQQueueManager m_qMgr = null;
    // Queue opened by openQueue; null when no queue is open.
    private MQQueue m_queue = null;
    // Queue manager name last passed to connQManager or listQueueName.
    String qManagerName = null;
    // Queue name last passed to openQueue.
    String qName = null;
    // Queue names returned by the last successful listQueueName call.
    String[] QNAME = null;
    // Text of the last message handed to putMessage.
    String m_msg = null;
    // Text of the last message read by getMessage.
    String inMessage = null;
    // Current queue depth from the last successful getCurQueueDepth call.
    long curQueueDepth = -1;
    // Maximum queue depth from the last successful getMaxQueueDepth call
    // (or the value given to setMaxQueueDepth).
    long maxQueueDepth = -1;
    // Completion code of the most recent MQException.
    int ComCode = 0;
    // Reason code of the most recent MQException.
    int ReaCode = 0;
    // Queue count; only changed through setNumber.
    int number = 0;

    /**
     * Creates an operator that is not yet connected to any queue manager.
     */
    public MQOperator() {
    }

    /**
     * Returns the stored completion code.
     *
     * @return the completion code of the most recent recorded MQ failure, or
     *         the value last passed to {@link #setCompletionCode(int)}
     */
    public int getCompletionCode() {
        return ComCode;
    }

    /**
     * Stores a completion code.
     *
     * @param code completion code to store
     */
    public void setCompletionCode(int code) {
        ComCode = code;
    }

    /**
     * Returns the stored reason code.
     *
     * @return the reason code of the most recent recorded MQ failure, or the
     *         value last passed to {@link #setReasonCode(int)}
     */
    public int getReasonCode() {
        return ReaCode;
    }

    /**
     * Stores a reason code.
     *
     * @param code reason code to store
     */
    public void setReasonCode(int code) {
        ReaCode = code;
    }

    /**
     * Returns the stored queue count.
     *
     * @return the value last passed to {@link #setNumber(int)} (initially 0)
     */
    public int getNumber() {
        return number;
    }

    /**
     * Stores a queue count.
     *
     * @param num queue count to store
     */
    public void setNumber(int num) {
        number = num;
    }

    /**
     * Connects to a queue manager and remembers the connection.
     *
     * <p>On failure the completion and reason codes are recorded and also
     * printed to standard output, and the stored connection is cleared.
     *
     * @param qManagerName name of the queue manager to connect to
     * @return the connected queue manager, or {@code null} if the connection
     *         failed
     */
    public MQQueueManager connQManager(String qManagerName) {
        this.qManagerName = qManagerName;
        try {
            m_qMgr = new MQQueueManager(qManagerName);
        } catch (MQException mqEx) {
            recordFailure(mqEx);
            System.out.println(getCompletionCode());
            System.out.println(getReasonCode());
            m_qMgr = null;
        }
        return m_qMgr;
    }

    /**
     * Opens a queue on the connected queue manager for input, output and
     * inquire, failing if the queue manager is quiescing.
     *
     * <p>A previously opened queue is not closed by this method; callers that
     * still hold its reference can keep using it.
     *
     * @param qName name of the queue to open
     * @return the opened queue, or {@code null} if there is no connection or
     *         the open failed (the failure codes are recorded)
     */
    public MQQueue openQueue(String qName) {
        if (m_qMgr == null) {
            // No connection: report failure instead of handing back a queue
            // that may have been opened on an earlier connection.
            return null;
        }
        int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT
                | MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING;
        this.qName = qName;
        try {
            m_queue = m_qMgr.accessQueue(qName, openOptions, qManagerName,
                    null, null);
        } catch (MQException mqEx) {
            recordFailure(mqEx);
            // Do not keep (or return) a queue from an earlier open; otherwise
            // later puts/gets would silently go to the wrong queue.
            m_queue = null;
        }
        return m_queue;
    }

    /**
     * Puts a string message on the open queue.
     *
     * @param msg message text; {@code null} is rejected
     * @return 1 on success, 0 on failure (no open queue/connection, null
     *         message, I/O error, or MQ error whose codes are recorded)
     */
    public int putMessage(String msg) {
        if (m_queue == null || m_qMgr == null || msg == null) {
            return 0;
        }
        // Put options: defaults plus logical-order processing.
        MQPutMessageOptions pmo = new MQPutMessageOptions();
        pmo.options = MQC.MQPMO_NONE | MQC.MQPMO_LOGICAL_ORDER;

        MQMessage outMsg = new MQMessage();
        outMsg.format = MQC.MQFMT_STRING;
        outMsg.characterSet = MESSAGE_CCSID;
        m_msg = msg;
        try {
            outMsg.writeString(m_msg);
            // Pass the options explicitly; put(outMsg) alone ignores them.
            m_queue.put(outMsg, pmo);
            return 1;
        } catch (IOException ioEx) {
            System.err.println(ioEx);
        } catch (MQException mqEx) {
            recordFailure(mqEx);
        }
        return 0;
    }

    /**
     * Gets the next message from the open queue and reads one line of text
     * from it.
     *
     * <p>NOTE(review): {@code readLine()} stops at the first line terminator,
     * so a multi-line payload appears to be returned only up to that point -
     * confirm this is intended.
     *
     * @return the text read, or {@code null} if there is no open
     *         queue/connection or the get failed (MQ failure codes are
     *         recorded)
     */
    public String getMessage() {
        // Clear the previous result so a failed get cannot return an old
        // message as if it were new.
        inMessage = null;
        if (m_queue == null || m_qMgr == null) {
            return null;
        }
        // Only deliver complete messages whose groups and segments have all
        // arrived.
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.options = MQC.MQGMO_COMPLETE_MSG | MQC.MQGMO_ALL_MSGS_AVAILABLE
                | MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;

        MQMessage inMsg = new MQMessage();
        inMsg.format = MQC.MQFMT_STRING;
        inMsg.characterSet = MESSAGE_CCSID;
        try {
            // Pass the options explicitly; get(inMsg) alone ignores them.
            m_queue.get(inMsg, gmo);
            inMessage = inMsg.readLine();
        } catch (IOException ioEx) {
            System.err.println(ioEx);
        } catch (MQException mqEx) {
            recordFailure(mqEx);
        }
        return inMessage;
    }

    /**
     * Lists queue names on a queue manager by sending a PCF
     * "inquire queue names" command with the name filter "*".
     *
     * <p>The given name also replaces the stored queue manager name, which
     * {@link #openQueue(String)} uses afterwards.
     *
     * @param QMgrName name of the queue manager to query (surrounding
     *                 whitespace is trimmed before connecting)
     * @return the queue names, or {@code null} if the name is {@code null} or
     *         the inquiry failed (MQ failure codes are recorded)
     */
    public String[] listQueueName(String QMgrName) {
        qManagerName = QMgrName;
        // Forget the previous result so a failure cannot return names that
        // belong to an earlier call.
        QNAME = null;
        if (QMgrName == null) {
            return null;
        }
        PCFMessageAgent agent = null;
        boolean failed = false;
        try {
            agent = new PCFMessageAgent(qManagerName.trim());
            agent.setCharacterSet(MESSAGE_CCSID);
            PCFMessage request = new PCFMessage(CMQCFC.MQCMD_INQUIRE_Q_NAMES);
            request.addParameter(CMQC.MQCA_Q_NAME, "*");
            request.addParameter(CMQC.MQIA_Q_TYPE, CMQC.MQOT_ALL); // all queue types
            PCFMessage[] response = agent.send(request);
            if (response != null && response.length > 0) {
                QNAME = (String[]) response[0]
                        .getParameterValue(CMQCFC.MQCACF_Q_NAMES);
            }
        } catch (MQException mqEx) {
            failed = true;
            recordFailure(mqEx);
        } catch (IOException ioEx) {
            failed = true;
            System.err.println(ioEx);
        } finally {
            // Always release the agent, including when send() failed.
            if (agent != null) {
                try {
                    agent.disconnect();
                } catch (MQException mqEx) {
                    // Keep the codes of the primary failure if there was one.
                    if (!failed) {
                        recordFailure(mqEx);
                    }
                }
            }
        }
        return QNAME;
    }

    /**
     * Inquires the current depth of the open queue and caches it.
     *
     * @return the current queue depth, or -1 if there is no open
     *         queue/connection or the inquiry failed (codes are recorded)
     */
    public long getCurQueueDepth() {
        int[] value = new int[1];
        if (!inquireInt(MQC.MQIA_CURRENT_Q_DEPTH, value)) {
            return -1;
        }
        curQueueDepth = value[0];
        return curQueueDepth;
    }

    /**
     * Inquires the maximum depth of the open queue and caches it.
     *
     * @return the maximum queue depth, or -1 if there is no open
     *         queue/connection or the inquiry failed (codes are recorded)
     */
    public long getMaxQueueDepth() {
        int[] value = new int[1];
        if (!inquireInt(MQC.MQIA_MAX_Q_DEPTH, value)) {
            return -1;
        }
        maxQueueDepth = value[0];
        return maxQueueDepth;
    }

    /**
     * Overwrites the locally cached maximum queue depth. This only assigns the
     * field; it does not send anything to the queue manager.
     *
     * @param depth value to cache
     */
    public void setMaxQueueDepth(long depth) {
        maxQueueDepth = depth;
    }

    /**
     * Commits the current unit of work on the connected queue manager.
     * Does nothing when there is no connection; MQ failures are recorded.
     */
    public void commit() {
        if (m_qMgr == null) {
            return;
        }
        try {
            m_qMgr.commit();
        } catch (MQException mqEx) {
            recordFailure(mqEx);
        }
    }

    /**
     * Backs out the current unit of work on the connected queue manager.
     * Does nothing when there is no connection; MQ failures are recorded.
     */
    public void rollback() {
        if (m_qMgr == null) {
            return;
        }
        try {
            m_qMgr.backout();
        } catch (MQException mqEx) {
            recordFailure(mqEx);
        }
    }

    /**
     * Closes the open queue. Does nothing when no queue is open; MQ failures
     * are recorded.
     */
    public void closeQueue() {
        if (m_queue == null) {
            return;
        }
        try {
            m_queue.close();
        } catch (MQException mqEx) {
            recordFailure(mqEx);
        }
    }

    /**
     * Disconnects from the queue manager. Does nothing when there is no
     * connection; MQ failures are recorded.
     */
    public void disConnection() {
        if (m_qMgr == null) {
            return;
        }
        try {
            m_qMgr.disconnect();
        } catch (MQException mqEx) {
            recordFailure(mqEx);
        }
    }

    /** Stores the completion and reason codes of an MQ failure via the public setters. */
    private void recordFailure(MQException mqEx) {
        setCompletionCode(mqEx.completionCode);
        setReasonCode(mqEx.reasonCode);
    }

    /**
     * Inquires one integer attribute of the open queue into {@code result[0]}.
     * Returns false (recording MQ failure codes) when nothing is open or the
     * inquiry fails.
     */
    private boolean inquireInt(int selector, int[] result) {
        if (m_qMgr == null || m_queue == null) {
            return false;
        }
        int[] selectors = { selector };
        byte[] charAttrs = new byte[0];
        try {
            m_queue.inquire(selectors, result, charAttrs);
            return true;
        } catch (MQException mqEx) {
            recordFailure(mqEx);
            return false;
        }
    }
}
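/*
 * Web code viewer residue (not part of the program) - keyboard shortcut help:
 *   Copy code:           Ctrl + C
 *   Search code:         Ctrl + F
 *   Full-screen mode:    F11
 *   Toggle theme:        Ctrl + Shift + D
 *   Show shortcuts:      ?
 *   Increase font size:  Ctrl + =
 *   Decrease font size:  Ctrl + -
 */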