// mqconnection.java: part of SAFMQ, a simple and practical open-source C++ message middleware
		try {
			out.write(Safmq.CMD_QUEUE_CLOSE);
			params.write(out);
			out.flush();
			ret = getResponseCode();
		} catch (IOException e) {
			ret = Safmq.EC_NETWORKERROR;
		}
		return ret;
	}
	
	/**
	Enqueues a message for relay forwarding to another queue or queue server.  After
	completion, the return code will be EC_NOERROR on success and the <code>msg</code>'s 
	timestamp and message id will have been set.  It is important to note that if the response
	queue name is not set, any errors from the final destination queue or queue server will
	be lost.  Round-tripping is recommended unless error determination is not
	required; otherwise, errors generated by the final destination server such as:<br>
		<table border=0>
		<tr><td>EC_DOESNOTEXIST
		<tr><td>EC_NOTAUTHORIZED
		<tr><td>EC_WRONGMESSAGETYPE
		<tr><td>EC_NOTOPEN
		<tr><td>EC_FORWARDNOTALLOWED
		<tr><td>EC_DUPLICATEMSGID
		</table>
	will not be returned to the client.
 

	@param uri A safmq URI in the format safmq://user:password@server:port/queuename -or- for ssl safmqs://user:password@server:port/queuename.<br>
				Note: the port specification is optional.
	@param msg The message to be sent.

	@return Safmq.EC_NOERROR on success <br>
		Safmq.EC_INVALIDNAME in case of an invalid URI <br>
		Safmq.EC_NETWORKERROR<br>
		Safmq.EC_NOTAUTHORIZED <br>
		Safmq.EC_NOTOPEN <br>
		Safmq.EC_WRONGMESSAGETYPE <br>
		Safmq.EC_FORWARDNOTALLOWED
	*/
	public int EnqueueWithRelay(URI uri, QueueMessage msg) {
		int ret;
		try {
			QueueHandle	forward = new QueueHandle();
			String		dest = "";
			String 		userinfo = uri.getUserInfo();
			String		user, password;
			
			user = password = "";

			ret = OpenQueue(FORWARD_QUEUE_NAME,forward);
			if (ret == Safmq.EC_NOERROR) {
				StringBuffer	u, p;
		
				if (MQBuilder.parseUserInfo(userinfo,u=new StringBuffer(),p=new StringBuffer())) {
					user = u.toString();
					password = p.toString();
				}
				
				if (uri.getScheme().length() > 0)
					dest = uri.getScheme() + ":";
				dest += "//";
				
				
				if (user.length() > 0) {
					dest += user;
					if (password.length() > 0)
						dest += ":" + password;
					dest += "@";
				}
				dest += uri.getHost();
				if (uri.getPort() > 0)
					dest += ":" + uri.getPort();
				dest += uri.getPath();

				msg.setLabel(dest+"?label="+msg.getLabel());
				ret = Enqueue(forward, msg);

				int ec = CloseQueue(forward);
				if (ret == Safmq.EC_NOERROR && ec != Safmq.EC_NOERROR)
					ret = ec;
			} else if (ret == Safmq.EC_DOESNOTEXIST) {
				ret = Safmq.EC_FORWARDNOTALLOWED;
			}
		} catch (Exception e) {
			ret = Safmq.EC_INVALIDNAME;
		}
		return ret;
	}
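
	// Illustrative aside, not part of the SAFMQ sources: a minimal standalone sketch of
	// how EnqueueWithRelay rebuilds the destination URI and packs it into the message
	// label as "<dest>?label=<original label>".  The class RelayLabelSketch, the helper
	// buildRelayLabel, and the example host are hypothetical.  The user-info split on
	// ':' is an assumption that stands in for MQBuilder.parseUserInfo, whose
	// implementation is not shown here.

```java
import java.net.URI;

public class RelayLabelSketch {
    // Hypothetical helper mirroring the string assembly in EnqueueWithRelay.
    static String buildRelayLabel(URI uri, String originalLabel) {
        StringBuilder dest = new StringBuilder();

        // Scheme is optional; "//" is always emitted.
        String scheme = uri.getScheme();
        if (scheme != null && scheme.length() > 0)
            dest.append(scheme).append(':');
        dest.append("//");

        // Assumed split of "user:password"; the real parser may differ.
        String userInfo = uri.getUserInfo();
        String user = "";
        String password = "";
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            user = colon < 0 ? userInfo : userInfo.substring(0, colon);
            password = colon < 0 ? "" : userInfo.substring(colon + 1);
        }
        if (user.length() > 0) {
            dest.append(user);
            if (password.length() > 0)
                dest.append(':').append(password);
            dest.append('@');
        }

        // Host, optional port, then the queue path.
        dest.append(uri.getHost());
        if (uri.getPort() > 0)
            dest.append(':').append(uri.getPort());
        dest.append(uri.getPath());

        // The original label travels as a query parameter of the destination.
        return dest + "?label=" + originalLabel;
    }

    public static void main(String[] args) {
        URI uri = URI.create("safmq://alice:secret@mq.example.com:9000/orders");
        System.out.println(buildRelayLabel(uri, "invoice-42"));
    }
}
```

	// Because the credentials end up in the label in clear text, a caller may prefer
	// the safmqs scheme when the forwarding hop crosses an untrusted network.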
	
	/**
	 * Places a message on the queue.  The object <code>msg</code> must have
	 * been prepared before calling this method.  Upon successful sending
	 * of the message, the message id and message timestamp will be set by the server
	 * and may be retrieved by a call to <code>QueueMessage.getMessageID()</code>
	 * and <code>QueueMessage.getTimeStamp()</code> respectively.
	 * 
	 * <p>Note: Message responders will typically place the message id of the original
	 * message in the receipt id of the message being returned.  This round-trip
	 * message identification is provided by SAFMQ as a method for coordinated two-way
	 * communications.</p>
	 * 
	 * @param	handle	A reference to a queue opened by a call to <code>OpenQueue()</code> 
	 * @param	msg		The message to be placed on the queue
	 * @return <code>Safmq.EC_NOERROR</code> on success; otherwise the result may be,
	 * but is not limited to, one of the following:
	 *<table border=0 cellpadding=3 cellspacing=0>
	 *<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
	 *				<td>A networking error has occurred and the connection
	 *					is no longer valid.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
	 *				<td>The queue reference is not valid as it has 
	 *					been closed.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
	 *				<td>The user is not authorized to write messages to this
	 *					queue.
	 *				</td></tr>
	 *</table>
	 *
	 * @see #OpenQueue(String, QueueHandle)
	 */
	public int Enqueue(QueueHandle handle, QueueMessage msg) {
		int ret = Safmq.EC_NETWORKERROR;
		ENQUEUE_PARAMS	params = new ENQUEUE_PARAMS();

		params.queueID = handle.handle;
		params.msg = msg;

		try {
			out.write(Safmq.CMD_ENQUEUE);
			params.write(out);
			out.flush();

			ret = getResponseCode();
			if (ret == Safmq.EC_NOERROR) {
				ENQUEUE_RESPONSE_DATA	data = new ENQUEUE_RESPONSE_DATA();
				data.msgID = msg.messageID; // set the response data msgID to the msg.messageID so that it can be read back
				data.read(in);
				msg.timeStamp = data.timestamp;
			}
		} catch (IOException e) {
			ret = Safmq.EC_NETWORKERROR;
		}
		return ret;
	}
	

	/**
	 * Retrieves the highest priority FIFO message present on the queue.  The results
	 * are placed in the object <code>msg</code>.
	 * 
	 * <p>Note: Retrieved messages are removed from the queue</p>
	 * 
	 * @param qhandle		A handle to a queue previously opened by a call to OpenQueue(String, QueueHandle)
	 * @param retrievebody	A flag indicating whether the body of the message should be retrieved
	 * @param timeoutseconds	The number of seconds to wait before returning.  A value of zero (0) will
	 * 							cause the method to return immediately if no messages are present on the queue;
	 * 							a value of (-1) will cause the method to wait until a message is placed on the queue.
	 * @param msg			Receives the contents of the message.
	 * 
	 * @return Upon success a value of <code>Safmq.EC_NOERROR</code> is returned and
	 * 			the resulting message is placed in <code>msg</code>; otherwise 
	 * 			errors such as, but not limited to, the following could occur: <br>
	 *<table border=0 cellpadding=3 cellspacing=0>
	 *<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
	 *				<td>A networking error has occurred and the connection
	 *					is no longer valid.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOMOREMESSAGES</code></td>
	 *				<td>The queue is empty and no more messages are
	 *					available.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
	 *				<td>The queue reference is not valid as it has 
	 *					been closed.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
	 *				<td>The user is not authorized to read messages from this
	 *					queue.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOTOPEN</code></td>
	 *				<td>The queue specified has not been opened by this connection.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_TIMEDOUT</code></td>
	 *				<td>The operation timed out before a message became available.
	 *				</td></tr>
	 *</table>
	 * @see 	#OpenQueue(String, QueueHandle)
	 */
	public int Retrieve(QueueHandle qhandle, boolean retrievebody, int timeoutseconds, QueueMessage msg) {
		int	ret = Safmq.EC_NOERROR;

		RETRIEVE_PEEK_FRONT_PARAMS	params = new RETRIEVE_PEEK_FRONT_PARAMS();
		params.queueID = qhandle.handle;
		params.retrievebody = (byte)(retrievebody ? 1 : 0);
		params.timeoutseconds = timeoutseconds;

		try {
			out.write(Safmq.CMD_RETRIEVE);
			params.write(out);
			out.flush();
			ret = getResponseCode();
			if (ret == Safmq.EC_NOERROR) {
				msg.read(in, retrievebody);
				ret = ackRetrieve(qhandle,msg);
			}
			// On failure, ret keeps the server's response code (e.g. EC_NOMOREMESSAGES or EC_TIMEDOUT)
		} catch (IOException e) {
			ret = Safmq.EC_NETWORKERROR;
		}
		return ret;
	}
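
	// Illustrative aside, not part of the SAFMQ sources: the timeoutseconds contract
	// documented above (0 returns immediately, -1 waits indefinitely, a positive value
	// waits up to that many seconds) can be pictured with java.util.concurrent.BlockingQueue.
	// This is only an analogy under stated assumptions: TimeoutSemanticsSketch and its
	// retrieve helper are hypothetical, the real wait happens on the server, and treating
	// every negative value like -1 is an assumption the source does not confirm.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimeoutSemanticsSketch {
    // Hypothetical helper: returns the next element, or null where Retrieve
    // would report a code such as EC_NOMOREMESSAGES or EC_TIMEDOUT.
    static <T> T retrieve(BlockingQueue<T> queue, int timeoutSeconds)
            throws InterruptedException {
        if (timeoutSeconds == 0)
            return queue.poll();                 // do not wait at all
        if (timeoutSeconds < 0)
            return queue.take();                 // wait until a message arrives
        return queue.poll(timeoutSeconds, TimeUnit.SECONDS); // bounded wait
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Empty queue with a zero timeout: returns immediately with no message.
        System.out.println(retrieve(queue, 0));

        // A message is present, so even the indefinite wait returns at once.
        queue.put("hello");
        System.out.println(retrieve(queue, -1));
    }
}
```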

	/**
	 * Retrieves the message identified by <code>id</code> in the message's 
	 * receipt id, set prior to the message having been enqueued (See: 
	 * <code>Enqueue(QueueHandle, QueueMessage)</code>). The results are placed in the object
	 * <code>msg</code>.  
	 * 
	 * <p>Note: Message responders will typically place the message id of the original
	 * message in the receipt id of the message being returned.  This round-trip
	 * message identification is provided by SAFMQ as a method for coordinated two-way
	 * communications.</p>
	 * 
	 * <p>Note: Retrieved messages are removed from the queue.</p>
	 * 
	 * @param qhandle		A handle to a queue previously opened by a call to OpenQueue(String, QueueHandle)
	 * @param retrievebody	A flag indicating whether the body of the message should be retrieved
	 * @param id			The UUID of the message to be retrieved.
	 * @param timeoutseconds		The number of seconds to wait before returning.  A value of zero (0) will
	 * 						cause the method to return immediately if no messages are present on the queue;
	 * 						a value of (-1) will cause the method to wait until a message is placed on the queue.
	 * @param msg			Receives the contents of the message.
	 * 
	 * @return	Upon success a value of <code>Safmq.EC_NOERROR</code> is returned; otherwise 
	 * 			errors such as, but not limited to, the following could occur: <br>
	 *<table border=0 cellpadding=3 cellspacing=0>
	 *<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
	 *				<td>A networking error has occurred and the connection
	 *					is no longer valid.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOMOREMESSAGES</code></td>
	 *				<td>The queue is empty and no more messages are
	 *					available.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
	 *				<td>The queue reference is not valid as it has 
	 *					been closed.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
	 *				<td>The user is not authorized to read messages from this
	 *					queue.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOTOPEN</code></td>
	 *				<td>The queue specified has not been opened by this connection.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_TIMEDOUT</code></td>
	 *				<td>The operation timed out before a message became available.
	 *				</td></tr>
	 *</table>
	 * @see #Enqueue(QueueHandle, QueueMessage)
	 * @see #OpenQueue(String, QueueHandle)
	 */
	public int RetrieveID(QueueHandle qhandle, boolean retrievebody, UUID id, int timeoutseconds, QueueMessage msg) {
		int	ret = Safmq.EC_NOERROR;

		RETRIEVE_ID_PEEK_ID_PARAMS	params = new RETRIEVE_ID_PEEK_ID_PARAMS();
		params.queueID = qhandle.handle;
		params.retrievebody = (byte)(retrievebody?1:0);
		params.reciptID = id;
		params.timeoutseconds = timeoutseconds;
		
		try {
			out.write(Safmq.CMD_RETRIEVE_ID);
			params.write(out);
			out.flush();
			ret = getResponseCode();
			if (ret == Safmq.EC_NOERROR) {
				msg.read(in,retrievebody);
				ret = ackRetrieveCursor(qhandle,msg);
			}
		} catch (IOException e) {
			ret = Safmq.EC_NETWORKERROR;
		}
		return ret;
	}
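
	// Illustrative aside, not part of the SAFMQ sources: the round-trip convention
	// described above, where a responder copies the request's message id into the
	// reply's receipt id and the requester then retrieves by that id, sketched with
	// an in-memory list.  ReceiptIdSketch, Msg, reply and retrieveById are hypothetical
	// names, and java.util.UUID stands in for whatever UUID type the library uses.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

public class ReceiptIdSketch {
    // Hypothetical stand-in for a queue message.
    static final class Msg {
        final UUID messageId = UUID.randomUUID();
        final UUID receiptId;   // null for an original request
        final String body;

        Msg(UUID receiptId, String body) {
            this.receiptId = receiptId;
            this.body = body;
        }
    }

    // Responder side: the reply's receipt id is the request's message id.
    static Msg reply(Msg request, String body) {
        return new Msg(request.messageId, body);
    }

    // Requester side: remove and return the first message whose receipt id
    // matches, or null if none is present.
    static Msg retrieveById(List<Msg> queue, UUID id) {
        for (Iterator<Msg> it = queue.iterator(); it.hasNext();) {
            Msg m = it.next();
            if (id.equals(m.receiptId)) {
                it.remove();
                return m;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Msg first = new Msg(null, "ping-a");
        Msg second = new Msg(null, "ping-b");

        List<Msg> responseQueue = new ArrayList<>();
        responseQueue.add(reply(first, "pong-a"));
        responseQueue.add(reply(second, "pong-b"));

        // Each requester picks out its own reply regardless of queue order.
        System.out.println(retrieveById(responseQueue, second.messageId).body);
        System.out.println(retrieveById(responseQueue, first.messageId).body);
    }
}
```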

	/**
	 * Retrieves the message pointed to by <code>cursorID</code>. The results  
	 * are placed in the object <code>msg</code>.
	 * 
	 * <p>Note: Retrieved messages are removed from the queue.</p>
	 * 
	 * @param qhandle		A handle to a queue previously opened by a call to OpenQueue(String, QueueHandle)
	 * @param retrievebody	A flag indicating whether the body of the message should be retrieved
	 * @param cursorID		The cursor indicating the current position in the queue to be read from
	 * @param msg			Receives the contents of the message.
	 * 
	 * @return	Upon success a value of <code>Safmq.EC_NOERROR</code> is returned; otherwise 
	 * 			errors such as, but not limited to, the following could occur: <br>
	 *<table border=0 cellpadding=3 cellspacing=0>
	 *<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
	 *				<td>A networking error has occurred and the connection
	 *					is no longer valid.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOMOREMESSAGES</code></td>
	 *				<td>The queue is empty and no more messages are
	 *					available.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
	 *				<td>The queue reference is not valid as it has 
	 *					been closed.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
	 *				<td>The user is not authorized to read messages from this
	 *					queue.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_CURSORINVALIDATED</code></td>
	 *				<td>The cursor no longer points to a valid location in the
	 *					queue.
	 *				</td></tr>
	 *<tr><td><code>Safmq.EC_NOTOPEN</code></td>
	 *				<td>The queue specified has not been opened by this connection.
	 *				</td></tr>
	 *</table>
	 * @see #OpenQueue(String, QueueHandle)
	 */
	public int RetrieveCursor(QueueHandle qhandle, boolean retrievebody, CursorHandle cursorID, QueueMessage msg) {
		int	ret = Safmq.EC_NOERROR;

		PEEK_CURSOR_RETRIEVE_CURSOR_PARAMS	params = new PEEK_CURSOR_RETRIEVE_CURSOR_PARAMS();
		params.queueID = qhandle.handle;
		params.retrievebody = (byte)(retrievebody?1:0);
		params.cursorID = cursorID.handle;

		try {
			out.write(Safmq.CMD_RETRIEVE_CURSOR);
			params.write(out);
			out.flush();
			ret = getResponseCode();
			if (ret == Safmq.EC_NOERROR) {
				msg.read(in,retrievebody);
				ret = ackRetrieveCursor(qhandle,msg);
			}
		} catch (IOException e) {
			ret = Safmq.EC_NETWORKERROR;
		}
		return ret;
	}

	/**
	 * Gathers the highest priority FIFO message present on the queue.  The results
	 * are placed in the object <code>msg</code>.  Any errors from the operation are
	 * reported through the return value.  The message retrieved is <i>not</i> removed from the 
	 * queue and is available for reading by other queue readers.
	 * 
	 * @param qhandle		A handle to a queue previously opened by a call to OpenQueue(String, QueueHandle)
	 * @param retrievebody	A flag indicating whether the body of the message should be retrieved
	 * @param timeoutseconds	The number of seconds to wait before returning.  A value of zero (0) will
	 * 						cause the method to return immediately if no messages are present on the queue;
	 * 						a value of (-1) will cause the method to wait until a message is placed on the queue.
	 * @param msg			Receives the contents of the message.
