📄 mqconnection.java
字号:
try {
out.write(Safmq.CMD_QUEUE_CLOSE);
params.write(out);
out.flush();
ret = getResponseCode();
} catch (IOException e) {
ret = Safmq.EC_NETWORKERROR;
}
return ret;
}
/**
Enqueues a message for message relay forwarding to another queue/queue server. After
completion, the return code will be EC_NOERROR on success and the <code>msg</code>'s
timestamp and message will have been set. It is important to note if the response
queue name is not set, any errors from the final destination queue/queue server will
be lost. It is suggested to use roundtripping unless error determination is not
required, otherwise errors generated by the final destination server such as:<br>
<table border=0>
<tr><td>EC_DOESNOTEXIST
<tr><td>EC_NOTAUTHORIZED
<tr><td>EC_WRONGMESSAGETYPE
<tr><td>EC_NOTOPEN
<tr><td>EC_FORWARDNOTALLOWED
<tr><td>EC_DUPLICATEMSGID
</table>
will not be returned to the client.
@param uri A safmq URI in the format safmq://user:password@server:port/queuename -or- for ssl safmqs://user:password@server:port/queuename.<br>
Note: the port specification is optional.
@param msg The message to be sent.
@return Safmq.EC_NOERROR on success
Safmq.EC_INVALIDNAME in case of an invalid URL <br>
Safmq.EC_NETWORKERROR<br>
Safmq.EC_NOTAUTHORIZED <br>
Safmq.EC_NOTOPEN <br>
Safmq.EC_WRONGMESSAGETYPE <br>
Safmq.EC_FORWARDNOTALLOWED
*/
public int EnqueueWithRelay(URI uri, QueueMessage msg) {
    int ret;
    try {
        QueueHandle forward = new QueueHandle();
        String dest = "";
        // Read from the URI before opening the queue so that a null uri fails
        // (and is reported as EC_INVALIDNAME) without any queue being opened.
        String userinfo = uri.getUserInfo();
        String user = "";
        String password = "";

        ret = OpenQueue(FORWARD_QUEUE_NAME, forward);
        if (ret == Safmq.EC_NOERROR) {
            int ec;
            try {
                StringBuffer u = new StringBuffer();
                StringBuffer p = new StringBuffer();
                if (MQBuilder.parseUserInfo(userinfo, u, p)) {
                    user = u.toString();
                    password = p.toString();
                }

                // Rebuild the destination as scheme://user:password@host:port/path,
                // leaving out the pieces that are empty.
                if (uri.getScheme().length() > 0) {
                    dest = uri.getScheme() + ":";
                }
                dest += "//";
                if (user.length() > 0) {
                    dest += user;
                    if (password.length() > 0) {
                        dest += ":" + password;
                    }
                    dest += "@";
                }
                dest += uri.getHost();
                if (uri.getPort() > 0) {
                    dest += ":" + uri.getPort();
                }
                dest += uri.getPath();

                // The destination travels in the label, followed by the original
                // label as a "?label=" suffix.
                msg.setLabel(dest + "?label=" + msg.getLabel());
                ret = Enqueue(forward, msg);
            } finally {
                // Always release the forward queue handle, even when building the
                // destination or enqueueing throws; otherwise the handle opened
                // above would be leaked on the error path.
                ec = CloseQueue(forward);
            }
            // A close failure is only reported when the enqueue itself succeeded.
            if (ret == Safmq.EC_NOERROR && ec != Safmq.EC_NOERROR) {
                ret = ec;
            }
        } else if (ret == Safmq.EC_DOESNOTEXIST) {
            // The forward queue could not be opened because it does not exist;
            // this is reported to the caller as EC_FORWARDNOTALLOWED.
            ret = Safmq.EC_FORWARDNOTALLOWED;
        }
    } catch (Exception e) {
        // Any failure while interpreting the URI is reported as an invalid name.
        ret = Safmq.EC_INVALIDNAME;
    }
    return ret;
}
/**
* Places a message on the queue. The object <code>msg</code> must have
* been previously prepared before calling this method. Upon successful sending
* of the message, the message id and message timestamp will be set by the server
* and may be retrieved by a call to <code>QueueMessage.getMessageID()</code>
* and <code>QueueMessage.getTimeStamp()</code> respectively.
*
* <p>Note: Message responders will typically place the message id of the original
* message in the receipt id of the message being returned. This round-trip
* message identification is provided by SAFMQ as a method for coordinated two-way
* communications.</p>
*
* @param handle A reference to a queue opened by a call to <code>OpenQueue()</code>
* @param msg The message to be placed on the queue
* @return <code>Safmq.EC_NOERROR</code> on success, Otherwise results may be such listed
* below but are not limited to:
*<table border=0 cellpadding=3 cellspacing=0>
*<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
* <td>A networking error has occurred and the connection
* is no longer valid.
* </td></tr>
*<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
* <td>The queue reference is not valid as it has
* been closed.
* </td></tr>
*<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
* <td>The user is not authorized to write messages to this
* queue.
* </td></tr>
*</table>
*
* @see #OpenQueue(String, QueueHandle)
*/
public int Enqueue(QueueHandle handle, QueueMessage msg) {
    // Describe the request: the target queue and the message to be written.
    final ENQUEUE_PARAMS request = new ENQUEUE_PARAMS();
    request.queueID = handle.handle;
    request.msg = msg;

    try {
        // Send the command byte followed by the serialized parameters.
        out.write(Safmq.CMD_ENQUEUE);
        request.write(out);
        out.flush();

        final int status = getResponseCode();
        if (status != Safmq.EC_NOERROR) {
            // No response data is read when the response code is not EC_NOERROR.
            return status;
        }

        // Seed the response with the message's own id object before reading,
        // then copy the timestamp from the response onto the message.
        final ENQUEUE_RESPONSE_DATA reply = new ENQUEUE_RESPONSE_DATA();
        reply.msgID = msg.messageID;
        reply.read(in);
        msg.timeStamp = reply.timestamp;
        return status;
    } catch (IOException e) {
        // Any I/O failure while talking to the server is a network error.
        return Safmq.EC_NETWORKERROR;
    }
}
/**
* Retrieves the highest priority FIFO message present on the queue. The results
* are placed in the object <code>msg</code>.
*
* <p>Note: Retrieved messages are removed from the queue</p>
*
* @param qhandle A handle to a queue previously opened by a call to OpenQueue(String)
* @param retrievebody A flag indicating whether the body of the message should be retrieved
* @param timeoutseconds The number of seconds to wait before returning, a value of zero (0) will
* cause the method to return immediately if no messages are present on the queue,
* a value of (-1) will cause the method to wait until a message is placed on the queue.
* @param msg Receives the contents of the message.
*
* @return Upon success a value of <code>Safmq.EC_NOERROR</code> is returned and
* the resulting message is placed in <code>msg</code>, otherwise
* errors such as but not limited to could occur: <br>
*<table border=0 cellpadding=3 cellspacing=0>
*<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
* <td>A networking error has occurred and the connection
* is no longer valid.
* </td></tr>
*<tr><td><code>Safmq.EC_NOMOREMESSAGES</code></td>
* <td>The queue is empty and no more messages are
* available.
* </td></tr>
*<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
* <td>The queue reference is not valid as it has
* been closed.
* </td></tr>
*<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
* <td>The user is not authorized to read messages from this
* queue.
* </td></tr>
*<tr><td><code>Safmq.EC_NOTOPEN</code></td>
* <td>The queue specified has not been opened by this connection.
* </td></tr>
*<tr><td><code>Safmq.EC_TIMEDOUT</code></td>
* <td>The operation timed out before a message became available.
* </td></tr>
*</table>
* @see #OpenQueue(String, QueueHandle)
*/
public int Retrieve(QueueHandle qhandle, boolean retrievebody, int timeoutseconds, QueueMessage msg) {
    int ret = Safmq.EC_NETWORKERROR;
    RETRIEVE_PEEK_FRONT_PARAMS params = new RETRIEVE_PEEK_FRONT_PARAMS();
    params.queueID = qhandle.handle;
    params.retrievebody = (byte)(retrievebody ? 1 : 0);
    params.timeoutseconds = timeoutseconds;
    try {
        // Send the command byte followed by the serialized parameters.
        out.write(Safmq.CMD_RETRIEVE);
        params.write(out);
        out.flush();
        ret = getResponseCode();
        if (ret == Safmq.EC_NOERROR) {
            // The message is read only after a successful response, and the
            // final result is that of the acknowledgement.
            msg.read(in, retrievebody);
            ret = ackRetrieve(qhandle, msg);
        }
        // Any other response code (e.g. EC_NOMOREMESSAGES, EC_TIMEDOUT,
        // EC_NOTAUTHORIZED) is returned unchanged so the caller can tell an
        // empty queue or a timeout apart from a broken connection.
    } catch (IOException e) {
        // Only an actual I/O failure is reported as a network error.
        ret = Safmq.EC_NETWORKERROR;
    }
    return ret;
}
/**
* Retrieves the message identified by <code>id</code> in the message's
* receipt id, set prior to the message having been enqueued (see
* <code>Enqueue(QueueHandle, QueueMessage)</code>). The results are placed in the object
* <code>msg</code>.
*
* <p>Note: Message responders will typically place the message id of the original
* message in the receipt id of the message being returned. This round-trip
* message identification is provided by SAFMQ as a method for coordinated two-way
* communications.</p>
*
* <p>Note: Retrieved messages are removed from the queue.</p>
*
* @param qhandle A handle to a queue previously opened by a call to OpenQueue(String)
* @param retrievebody A flag indicating whether the body of the message should be retrieved
* @param id The UUID of the message to be retrieved.
* @param timeoutseconds The number of seconds to wait before returning, a value of zero (0) will
* cause the method to return immediately if no messages are present on the queue,
* a value of (-1) will cause the method to wait until a message is placed on the queue.
* @param msg Receives the contents of the message.
*
* @return Upon success a value of <code>Safmq.EC_NOERROR</code>, otherwise
* errors such as but not limited to could occur: <br>
*<table border=0 cellpadding=3 cellspacing=0>
*<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
* <td>A networking error has occurred and the connection
* is no longer valid.
* </td></tr>
*<tr><td><code>Safmq.EC_NOMOREMESSAGES</code></td>
* <td>The queue is empty and no more messages are
* available.
* </td></tr>
*<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
* <td>The queue reference is not valid as it has
* been closed.
* </td></tr>
*<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
* <td>The user is not authorized to read messages from this
* queue.
* </td></tr>
*<tr><td><code>Safmq.EC_NOTOPEN</code></td>
* <td>The queue specified has not been opened by this connection.
* </td></tr>
*<tr><td><code>Safmq.EC_TIMEDOUT</code></td>
* <td>The operation timed out before a message became available.
* </td></tr>
*</table>
* @see #Enqueue(QueueHandle, QueueMessage) Enqueue(QueueMessage)
* @see #OpenQueue(String, QueueHandle)
*/
public int RetrieveID(QueueHandle qhandle, boolean retrievebody, UUID id, int timeoutseconds, QueueMessage msg) {
    // Describe the request: queue, body flag, receipt id to match, and timeout.
    final RETRIEVE_ID_PEEK_ID_PARAMS request = new RETRIEVE_ID_PEEK_ID_PARAMS();
    request.queueID = qhandle.handle;
    request.retrievebody = retrievebody ? (byte) 1 : (byte) 0;
    request.reciptID = id;
    request.timeoutseconds = timeoutseconds;

    int status;
    try {
        // Send the command byte followed by the serialized parameters.
        out.write(Safmq.CMD_RETRIEVE_ID);
        request.write(out);
        out.flush();

        status = getResponseCode();
        if (status == Safmq.EC_NOERROR) {
            // The message is read only after a successful response; the
            // final result is that of the acknowledgement.
            msg.read(in, retrievebody);
            // NOTE(review): this acknowledges through ackRetrieveCursor although
            // no cursor is involved here, whereas Retrieve uses ackRetrieve —
            // confirm this is the intended acknowledgement for CMD_RETRIEVE_ID.
            status = ackRetrieveCursor(qhandle, msg);
        }
    } catch (IOException e) {
        // Any I/O failure while talking to the server is a network error.
        status = Safmq.EC_NETWORKERROR;
    }
    return status;
}
/**
* Retrieves the message pointed to by <code>cursorID</code>. The results
* are placed in the object <code>msg</code>.
*
* <p>Note: Retrieved messages are removed from the queue.</p>
*
* @param qhandle A handle to a queue previously opened by a call to OpenQueue(String)
* @param retrievebody A flag indicating whether the body of the message should be retrieved
* @param cursorID The cursor indicating the current position in the queue to be read from
* @param msg Receives the contents of the message.
*
* @return Upon success a value of <code>Safmq.EC_NOERROR</code>, otherwise
* errors such as but not limited to could occur: <br>
*<table border=0 cellpadding=3 cellspacing=0>
*<tr><td><code>Safmq.EC_NETWORKERROR</code></td>
* <td>A networking error has occurred and the connection
* is no longer valid.
* </td></tr>
*<tr><td><code>Safmq.EC_NOMOREMESSAGES</code></td>
* <td>The queue is empty and no more messages are
* available.
* </td></tr>
*<tr><td><code>Safmq.EC_ALREADYCLOSED</code></td>
* <td>The queue reference is not valid as it has
* been closed.
* </td></tr>
*<tr><td><code>Safmq.EC_NOTAUTHORIZED</code></td>
* <td>The user is not authorized to read messages from this
* queue.
* </td></tr>
*<tr><td><code>Safmq.EC_CURSORINVALIDATED</code></td>
* <td>The cursor no longer points to a valid location in the
* queue.
* </td></tr>
*<tr><td><code>Safmq.EC_NOTOPEN</code></td>
* <td>The queue specified has not been opened by this connection.
* </td></tr>
*</table>
* @see #OpenQueue(String, QueueHandle)
*/
public int RetrieveCursor(QueueHandle qhandle, boolean retrievebody, CursorHandle cursorID, QueueMessage msg) {
    // Describe the request: queue, body flag, and the cursor to read from.
    final PEEK_CURSOR_RETRIEVE_CURSOR_PARAMS request = new PEEK_CURSOR_RETRIEVE_CURSOR_PARAMS();
    request.queueID = qhandle.handle;
    request.retrievebody = retrievebody ? (byte) 1 : (byte) 0;
    request.cursorID = cursorID.handle;

    try {
        // Send the command byte followed by the serialized parameters.
        out.write(Safmq.CMD_RETRIEVE_CURSOR);
        request.write(out);
        out.flush();

        final int status = getResponseCode();
        if (status != Safmq.EC_NOERROR) {
            // Non-success response codes are returned to the caller unchanged.
            return status;
        }

        // On success, read the message and return the acknowledgement result.
        msg.read(in, retrievebody);
        return ackRetrieveCursor(qhandle, msg);
    } catch (IOException e) {
        // Any I/O failure while talking to the server is a network error.
        return Safmq.EC_NETWORKERROR;
    }
}
/**
* Gathers the highest priority FIFO message present on the queue. The results
* are placed in the object <code>msg</code>. Any errors from the operation are
* returned on the stack. The message retrieved is <i>not</i> removed from the
* queue and is available for reading by other queue readers.
*
* @param qhandle A handle to a queue previously opened by a call to OpenQueue(String)
* @param retrievebody A flag indicating whether the body of the message should be retrieved
* @param timeoutseconds The number of seconds to wait before returning, a value of zero (0) will
* cause the method to return immediately if no messages are present on the queue,
* a value of (-1) will cause the method to wait until a message is placed on the queue.
* @param msg Receives the contents of the message.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -