// queue.java -- from "Java and C# intercommunication based on MSMQ and JNI" (Java source, 303 lines)
package ionic.Msmq;
/**
 * Java-side handle for an MSMQ message queue. All real work is delegated to
 * the native {@code JNIMSMQ} library through the {@code native*} methods
 * declared in this class.
 *
 * <p>Company: ZZsoft.
 *
 * <p>NOTE(review): the private fields and the native method names are
 * presumably bound by name from the native library; do not rename them
 * without changing the native side as well.
 *
 * @version 1.2
 * @author ionic, Guo Fan
 */
public class Queue {

    /** Access mode: open the queue for receiving only. */
    public static final int ACCESS_RECEIVE = 0x01;

    /** Access mode: open the queue for sending only. */
    public static final int ACCESS_SEND = 0x02;

    /** Access mode: open the queue for both receiving and sending. */
    public static final int ACCESS_BOTH = 0x03;

    /** Error code reported for illegal arguments (MQ_INVALID_PARAMETER). */
    private static final int MQ_INVALID_PARAMETER = 0xC00E0006;

    /** Value of the {@code ReadOrPeek} flag that leaves the message in the queue. */
    private static final int MODE_PEEK = 0;

    /** Value of the {@code ReadOrPeek} flag used by the {@code Receive} methods. */
    private static final int MODE_RECEIVE = 1;

    /**
     * First argument handed to {@code nativeReceive} (its {@code length}
     * parameter). NOTE(review): the exact meaning is defined by the native
     * side -- presumably a buffer length; confirm before changing.
     */
    private static final int RECEIVE_LENGTH = 128;

    // Static initializer: loads the native DLL and runs its init routine.
    // NOTE(review): the return code of nativeInit() is ignored, as in the
    // original code; confirm whether a non-zero value should be treated as fatal.
    static {
        System.loadLibrary("JNIMSMQ");
        nativeInit();
    }

    // According to the original author's note, the native open call causes
    // this slot to be set; it is never read from Java code.
    private int _queueSlot = 0;
    private String _name;             // queue name as passed by the caller
    private String _formatName;       // set to the placeholder "unknown" on open
    private String _label;            // queue label
    private boolean _isTransactional; // flag recorded by Create(); false after a plain open

    // Data of the most recently retrieved message. These are never assigned
    // from Java code -- presumably nativeReceive() fills them in.
    private String _lastMessageRetrieved_MessageString;
    private String _lastMessageRetrieved_MessageLabel;
    private String _lastMessageRetrieved_CorrelationId;

    /**
     * Opens a queue for both receiving and sending.
     *
     * @param queueName full path of the message queue
     * @throws MessageQueueException if the queue cannot be opened
     */
    public Queue(String queueName)
        throws MessageQueueException
    {
        _init(queueName, ACCESS_BOTH);
    }

    /**
     * Opens a queue with an explicit access mode.
     *
     * @param queueName full path of the message queue
     * @param access one of {@link #ACCESS_RECEIVE}, {@link #ACCESS_SEND} or
     *        {@link #ACCESS_BOTH}
     * @throws MessageQueueException if the access mode is not one of the
     *         values above, {@code queueName} is {@code null}, or the native
     *         open call reports a non-zero return code
     * @see MessageQueueException
     */
    public Queue(String queueName, int access)
        throws MessageQueueException
    {
        _init(queueName, access);
    }

    /**
     * Shared open logic used by both constructors.
     *
     * @param queueName full path of the message queue
     * @param access one of {@link #ACCESS_RECEIVE}, {@link #ACCESS_SEND} or
     *        {@link #ACCESS_BOTH}
     * @throws MessageQueueException if the arguments are invalid or the
     *         native open call fails
     * @see MessageQueueException
     */
    private void _init(String queueName, int access)
        throws MessageQueueException
    {
        // Never hand a null name to native code; report it the same way as
        // an illegal access mode.
        if (queueName == null) {
            throw new MessageQueueException("Cannot Open Queue", MQ_INVALID_PARAMETER);
        }

        // The native open method causes _queueSlot to be set.
        int rc;
        switch (access) {
            case ACCESS_RECEIVE:
                rc = nativeOpenQueueForReceive(queueName);
                break;
            case ACCESS_SEND:
                rc = nativeOpenQueueForSend(queueName);
                break;
            case ACCESS_BOTH:
                rc = nativeOpenQueue(queueName);
                break;
            default: // illegal access mode
                rc = MQ_INVALID_PARAMETER;
                break;
        }
        if (rc != 0) {
            throw new MessageQueueException("Cannot Open Queue", rc);
        }

        _name = queueName;
        _formatName = "unknown";
        _label = "need to set this";
        _isTransactional = false;
    }

    /**
     * Creates a queue through the native library and opens it.
     *
     * <p>The queue is opened with the name {@code "DIRECT=OS:" + queuePath}.
     * According to the original author this method is meant for local
     * private queues only; write a different entry point if more is needed.
     * An earlier, disabled variant of this method chose {@code DIRECT=TCP}
     * instead when the path started with a digit from 1 to 9 (assumed to be
     * an IP address).
     *
     * @param queuePath full path of the message queue
     * @param queueLabel label for the new queue
     * @param isTransactional passed to the native create call as 1 or 0 and
     *        recorded on the returned object
     * @return a {@code Queue} opened on the newly created queue
     * @throws MessageQueueException if {@code queuePath} is {@code null}, or
     *         creating or opening the queue fails
     */
    public static Queue Create(
        String queuePath,
        String queueLabel,
        boolean isTransactional
    ) throws MessageQueueException
    {
        if (queuePath == null) {
            throw new MessageQueueException("Cannot Create queue.", MQ_INVALID_PARAMETER);
        }
        int rc = nativeCreateQueue(queuePath, queueLabel, isTransactional ? 1 : 0);
        if (rc != 0) {
            throw new MessageQueueException("Cannot Create queue.", rc);
        }
        Queue q = new Queue("DIRECT=OS:" + queuePath);
        // Replace the placeholders set by _init() with what the caller gave us.
        q._name = queuePath;
        q._label = queueLabel;
        q._isTransactional = isTransactional;
        return q;
    }

    /**
     * Deletes a message queue.
     *
     * @param queuePath full path of the message queue
     * @throws MessageQueueException if {@code queuePath} is {@code null} or
     *         the native delete call reports a non-zero return code
     */
    public static void Delete(String queuePath)
        throws MessageQueueException
    {
        if (queuePath == null) {
            throw new MessageQueueException("Cannot Delete Queue", MQ_INVALID_PARAMETER);
        }
        int rc = nativeDeleteQueue(queuePath);
        if (rc != 0) {
            throw new MessageQueueException("Cannot Delete Queue", rc);
        }
    }

    /**
     * Sends a {@code Message}, forwarding its body, label, correlation id
     * and transaction flag to the native send call.
     *
     * @param msg the message to send; neither it nor its body may be {@code null}
     * @throws MessageQueueException if the native send call reports a
     *         non-zero return code
     * @throws NullPointerException if {@code msg} or its body is {@code null}
     * @see Message
     */
    public void Send(Message msg)
        throws MessageQueueException
    {
        if (msg == null) {
            throw new NullPointerException("msg must not be null");
        }
        // Read the body once so the text and its length always agree.
        String body = msg.getMessage();
        if (body == null) {
            throw new NullPointerException("msg.getMessage() must not be null");
        }
        int rc = nativeSend(
            body,
            body.length(),
            msg.getLabel(),
            msg.getCorrelationId(),
            msg.getTransactionFlag()
        );
        if (rc != 0) {
            throw new MessageQueueException("Cannot Send", rc);
        }
    }

    /**
     * Sends a plain string, with an empty label, an empty correlation id and
     * a transaction flag of 0.
     *
     * @param s the string to send; must not be {@code null}
     * @throws MessageQueueException if the native send call reports a
     *         non-zero return code
     * @throws NullPointerException if {@code s} is {@code null}
     */
    public void Send(String s)
        throws MessageQueueException
    {
        if (s == null) {
            throw new NullPointerException("s must not be null");
        }
        int rc = nativeSend(s, s.length(), "", "", 0);
        if (rc != 0) {
            throw new MessageQueueException("Cannot send", rc);
        }
    }

    /**
     * Receives a message, passing the given timeout to the native call.
     *
     * @param timeout timeout value handed to the native receive call
     * @return the received message
     * @throws MessageQueueException if the native receive call reports a
     *         non-zero return code
     * @see Message
     */
    public Message Receive(int timeout)
        throws MessageQueueException
    {
        return ReceiveEx(timeout, MODE_RECEIVE);
    }

    /**
     * Receives a message using a timeout value of 0.
     *
     * @return the received message
     * @throws MessageQueueException if the native receive call reports a
     *         non-zero return code
     */
    public Message Receive()
        throws MessageQueueException
    {
        return ReceiveEx(0, MODE_RECEIVE);
    }

    /**
     * Returns a copy of the first message without removing it from the
     * queue, using a timeout value of 0.
     *
     * @return the peeked message
     * @throws MessageQueueException if the native receive call reports a
     *         non-zero return code
     */
    public Message Peek()
        throws MessageQueueException
    {
        return ReceiveEx(0, MODE_PEEK);
    }

    /**
     * Returns a copy of a message without removing it from the queue.
     *
     * @param timeout timeout value handed to the native receive call
     * @return the peeked message
     * @throws MessageQueueException if the native receive call reports a
     *         non-zero return code
     */
    public Message Peek(int timeout)
        throws MessageQueueException
    {
        return ReceiveEx(timeout, MODE_PEEK);
    }

    /**
     * Closes the connection to the message queue.
     *
     * @throws MessageQueueException if the native close call reports a
     *         non-zero return code
     */
    public void Close()
        throws MessageQueueException
    {
        int rc = nativeClose();
        if (rc != 0) {
            throw new MessageQueueException("Cannot close", rc);
        }
    }

    /** @return the queue name recorded when the queue was opened or created */
    public String getName() { return _name; }

    /** @return the queue label; a placeholder text unless the queue came from {@code Create} */
    public String getLabel() { return _label; }

    /** @return the format name; currently always the placeholder {@code "unknown"} */
    public String getFormatName() { return _formatName; }

    /** @return the transactional flag recorded by {@code Create}; {@code false} otherwise */
    public boolean isTransactional() { return _isTransactional; }

    /**
     * Common implementation behind the {@code Receive} and {@code Peek} methods.
     *
     * @param timeout timeout value handed to the native receive call
     * @param ReadOrPeek {@code 1} as used by {@code Receive}, {@code 0} as
     *        used by {@code Peek}
     * @return a {@code Message} built from the last-retrieved message fields;
     *         its final constructor argument is always 0
     * @throws MessageQueueException if the native receive call reports a
     *         non-zero return code
     * @see Message
     */
    private Message ReceiveEx(int timeout, int ReadOrPeek)
        throws MessageQueueException
    {
        int rc = nativeReceive(RECEIVE_LENGTH, timeout, ReadOrPeek);
        if (rc != 0) {
            throw new MessageQueueException("Cannot Receive", rc);
        }
        return new Message(
            _lastMessageRetrieved_MessageString,
            _lastMessageRetrieved_MessageLabel,
            _lastMessageRetrieved_CorrelationId,
            0
        );
    }

    // Native functions provided by the JNIMSMQ library.
    private static native int nativeInit();
    private static native int nativeCreateQueue(String queuePath, String queueLabel, int isTransactional);
    private static native int nativeDeleteQueue(String queuePath);
    private native int nativeOpenQueue(String queueString);
    private native int nativeOpenQueueForSend(String queueString);
    private native int nativeOpenQueueForReceive(String queueString);
    private native int nativeReceive(int length, int timeout, int ReadOrPeek);
    private native int nativeSend(String messageString, int length, String label, String correlationString, int transactionFlag);
    private native int nativeClose();
}
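// Code-viewer page residue (not part of the original source) -- keyboard shortcuts:
//   copy code: Ctrl + C
//   search code: Ctrl + F
//   full-screen mode: F11
//   increase font size: Ctrl + =
//   decrease font size: Ctrl + -
//   show shortcuts: ?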