queue.java

来自「基于MSMQ和JNI的_Java和C#互通信实现」· Java 代码 · 共 303 行

JAVA
303
字号
package ionic.Msmq;
/**   
 *title: 提供对消息队列进行操作的各种接口 <br />
 *copy   中智软件<br />   
 *company ZZsoft<br />
 *@version 1.2     
 *@author ionic,郭帆   
 */ 
public class Queue {
	
	/**
	 * 消息队列的构造函数_1
	 * @param  queueName 消息队列的完整路径	 * 
	 * @throws MessageQueueException 
	 */
	public Queue(String queueName) 
		throws MessageQueueException
	{
		// TODO Auto-generated constructor stub
		_init(queueName,0x03);
	}

	/**
	 * 消息队列构造函数2
	 * @param queueName 消息队列的完整路径信息
	 * @param access 初始化类型 OpenForRecive,OpenForSend,OpenForBoth
	 * @throws MessageQueueException 
	 * @see MessageQueueException
	 */
	public Queue(String queueName,int access)
		throws MessageQueueException
	{
		_init(queueName,access);
	}
	
	/**
	 * 对代码进行重用
	 * @param queueName 消息队列的完整路径信息
	 * @param access 初始化类型 OpenForRecive,OpenForSend,OpenForBoth
	 * @throws MessageQueueException 
	 * @see MessageQueueException
	 */
	private void _init(String queueName,int access)
		throws MessageQueueException
	{
		//the openQueue native method causes the _queueSlot to be set.
		int rc = 0;
		switch(access){
		case 0x01://接收
			rc = nativeOpenQueueForReceive(queueName);
			break;
		case 0x02://发送
			rc = nativeOpenQueueForSend(queueName);
			break;
		case 0x03://接收+发送
			rc = nativeOpenQueue(queueName);
			break;
		default: //非法的参数,MQ_INVALID_PARAMETER
			rc = 0xC00E0006;
		}
		if( rc != 0 ) 
			throw new MessageQueueException("Cannot Open Queue",rc);
		
		_name = queueName;
		_formatName = "unknown";
		_label = "need to set this";
		_isTransactional = false;
	}	

	/**
	 * 在本地创建一个私有队列:仅提供私有队列创建,如需更多功能,请重写该接口
	 * @param queuePath 消息队列的完整路进信息
	 * @param queueLabel 消息队列的创建标签:包括是创建者等信息
	 * @param isTransactional ???暂时未理解
	 * @return 消息队列引用
	 * @throws MessageQueueException
	 */
	public static Queue Create(
			String queuePath,
			String queueLabel,
			boolean isTransactional
			)throws MessageQueueException
	{
		int rc = nativeCreateQueue( queuePath , queueLabel , (isTransactional)?1:0 );
		if( rc != 0 )
			throw new MessageQueueException("Cannot Create queue.",rc);
		
		Queue q = new Queue("DIRECT=OS:" + queuePath );
		q._name = queuePath;
		q._label = queueLabel;
		q._isTransactional = isTransactional;
		return q;
	}
	
	
	/* 隐藏于2007 6 .8 
	  public static Queue Create(String queuePath, String queueLabel, boolean isTransactional) 
	    throws  MessageQueueException
	  {
	    int rc= nativeCreateQueue( queuePath,  queueLabel,  (isTransactional)?1:0);
	    if (rc!=0) 
	      throw new  MessageQueueException("Cannot create queue.", rc);
	    // DIRECT=OS  ?  or DIRECT=TCP ?
	    String a1= "OS";
	    char[] c= queuePath.toCharArray();
	    if ((c[0]>='1')
		&& (c[0]<='9')) a1= "TCP"; // assume ip address

	    Queue q= new Queue("DIRECT=" + a1 + ":" + queuePath); 
	    q._name= queuePath; 
	    //    q._formatName=queueFormatName;
	    q._label=queueLabel;
	    q._isTransactional= isTransactional; 
	    return q;
	  }
	*/
	
	/**
	 * 删除一个消息队列
	 * @param queuePath 消息队列的完整路径信息
	 */
	public static void Delete(String queuePath)
		throws MessageQueueException
	{
		int rc = nativeDeleteQueue(queuePath);
		if( rc != 0)
			throw new MessageQueueException("Cannot Delete Queue",rc);
	}
	
	/**
	 * 发送复杂Message对象消息
	 * @param msg 消息 @see Message
	 * @throws MessageQueueException
	 */
	public void Send(Message msg)
		throws MessageQueueException
	{
		int rc = nativeSend(
					msg.getMessage(),
					msg.getMessage().length(),
					msg.getLabel(),
					msg.getCorrelationId(),
					msg.getTransactionFlag()
					);
		
		if(rc != 0)
			throw new MessageQueueException("Cannot Send",rc);
	}
	
	/**
	 * 发送简单的字符串消息
	 * @param s 要发送的字符串s
	 * @throws MessageQueueException
	 */
	public void Send(String s)
		throws MessageQueueException
	{
		int rc = nativeSend(
				s,
				s.length(),
				"",
				"",
				0
				);
		if( rc != 0)
			throw new MessageQueueException("Cannot send",rc);
	}
	
	/**
	 * 接受消息1
	 * @param timeout 超时等待时间
	 * @return Message消息对象 @see Message
	 * @throws MessageQueueException
	 */
	public Message Receive(int timeout)
		throws MessageQueueException
	{
		return ReceiveEx(timeout,1); 
	}
	
	/**
	 * 默认的接受消息
	 * @return Message对象
	 * @throws MessageQueueException
	 */
	public Message Receive()
		throws MessageQueueException
	{
		return ReceiveEx(0,1); 
	}
	
	/**
	 * 取得第一条消息的副本,但是不删除消息
	 * @return Message消息对象
	 * @throws MessageQueueException
	 */
	public Message Peek()
		throws MessageQueueException
	{
		return ReceiveEx(0,0);
	}
	
	/**
	 * 取得一条消息的副本,不删除消息
	 * @param timeout 超时返回时间
	 * @return Message消息对象
	 * @throws MessageQueueException
	 */
	public Message Peek(int timeout)
		throws MessageQueueException
	{
		return ReceiveEx(timeout,0);
	}
	
	/**
	 * 关闭与消息队列之间的连接
	 * @throws MessageQueueException
	 */
	public void Close()
		throws MessageQueueException
	{
		int rc = nativeClose();
		if( rc != 0)
			throw new MessageQueueException("Cannot close",rc);
	}
	
	
	
	/**
	 * 对相应属性的操作
	 */
	public String getName(){ return _name; }
	public String getLabel(){ return _label; }
	public String getFormatName(){ return _formatName;}
	public boolean isTransactional(){ return _isTransactional; }

	/**
	 * 接收消息的复杂公用函数
	 * @param timeout 超时等待时间
	 * @param ReadOrPeek 接收类型
	 * @return 复杂的Message对象 @see Message
	 * @throws MessageQueueException
	 */
	private Message ReceiveEx(int timeout , int ReadOrPeek)
		throws MessageQueueException
	{
		int rc = nativeReceive(128,timeout,ReadOrPeek);
		
		if(rc != 0)
			throw new MessageQueueException("Cannot Receive",rc);
		
		return new Message(
				_lastMessageRetrieved_MessageString,
				_lastMessageRetrieved_MessageLabel,
				_lastMessageRetrieved_CorrelationId,
				0
				);		
	}
	
	/**
	 * 从JNIMSMQ用引入的native函数
	 */
	private static native int nativeInit();
	private static native int nativeCreateQueue(String queuePath, String queueLabel, int isTransactional);
	private static native int nativeDeleteQueue(String queuePath);
	private native int nativeOpenQueue(String queueString);
	private native int nativeOpenQueueForSend(String queueString);
	private native int nativeOpenQueueForReceive(String queueString);
	private native int nativeReceive(int length, int timeout, int ReadOrPeek);
	private native int nativeSend(String messageString, int length, String label, String correlationString, int transactionFlag);
	private native int nativeClose();
	
	private int _queueSlot = 0;//???
	private String _name;//队列名
	private String _formatName;//路径???
	private String _label;//标签??
	private boolean _isTransactional;//???
	private String _lastMessageRetrieved_MessageString;//上一次收到的String
	private String _lastMessageRetrieved_MessageLabel;//
	private String _lastMessageRetrieved_CorrelationId;//

	//静态初始块,加载dll
	static{
		System.loadLibrary("JNIMSMQ");
		nativeInit();
	}
	
	/**
	 * 被屏蔽的单元测试函数
	 * @param args
	 * @throws MessageQueueException 
	 */
	/*
	public static void main(String[] args) 
		throws MessageQueueException 
	{
		// TODO Auto-generated method stub
		System.out.println("fsafdsfds");
		//Queue queue = new Queue("hehe");
		//queue.Create("OS:.\\Private$\\GuoFan","Hello",false);
	}
	*/
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?