📄 httpjmssessionstub.java
字号:
}
}
} catch (JMSException exception) {
// rethrow as a XAException
throw new XAException("Failed to getTransactionTimeout session " +
exception);
}
return timeout;
}
// implementation of JmsSessionStubIfc.prepare
public int prepare(Xid xid) throws XAException {
int value = 0;
try {
Vector v = pack("xa_prepare", 1);
v.add(xid);
synchronized (_connection) {
send(v, true);
Vector reply = checkReply("xa_prepare");
Boolean result = (Boolean) reply.get(0);
// check that the call completed before
// extracting the message
if (result.booleanValue()) {
value = ((Integer) reply.get(1)).intValue();
}
}
} catch (JMSException exception) {
// rethrow as a XAException
throw new XAException("Failed to prepare session " +
exception);
}
return value;
}
// implementation of JmsSessionStubIfc.recover
public Xid[] recover(int flag) throws XAException {
Xid[] xids = new Xid[0];
try {
Vector v = pack("xa_recover", 1);
v.add(new Integer(flag));
synchronized (_connection) {
send(v, true);
Vector reply = checkReply("xa_recover");
Boolean result = (Boolean) reply.get(0);
// check that the call completed before
// extracting the message
if (result.booleanValue()) {
xids = (Xid[]) reply.get(1);
}
}
} catch (JMSException exception) {
// rethrow as a XAException
throw new XAException("Failed to recover session " +
exception);
}
return xids;
}
// implementation of JmsSessionStubIfc.rollback
public void rollback(Xid xid) throws XAException {
try {
Vector v = pack("xa_rollback", 1);
v.add(xid);
synchronized (_connection) {
send(v, true);
checkReply("xa_rollback");
}
} catch (JMSException exception) {
// rethrow as a XAException
throw new XAException("Failed to rollback session " +
exception);
}
}
// implementation of JmsSessionStubIfc.setTransactionTimeout
public boolean setTransactionTimeout(int seconds) throws XAException {
boolean value = false;
try {
Vector v = pack("xa_setTransactionTimeout", 1);
v.add(new Integer(seconds));
synchronized (_connection) {
send(v, true);
Vector reply = checkReply("xa_setTransactionTimeout");
Boolean result = (Boolean) reply.get(0);
// check that the call completed before
// extracting the message
if (result.booleanValue()) {
value = ((Boolean) reply.get(1)).booleanValue();
}
}
} catch (JMSException exception) {
// rethrow as a XAException
throw new XAException("Failed to setTransactionTimeout " +
exception);
}
return value;
}
// implementation of JmsSessionStubIfc.start
public void start(Xid xid, int flags) throws XAException {
try {
Vector v = pack("xa_start", 2);
v.add(xid);
v.add(new Integer(flags));
synchronized (_connection) {
send(v, true);
checkReply("xa_start");
}
} catch (JMSException exception) {
// rethrow as a XAException
throw new XAException("Failed to start session " +
exception);
}
}
// implementation of JmsSessionStubIfc.getResourceManagerId
public String getResourceManagerId() throws XAException {
String rid = null;
try {
Vector v = pack("xa_getResourceManagerId", 0);
synchronized (_connection) {
send(v, true);
Vector reply = checkReply("xa_getResourceManagerId");
Boolean result = (Boolean) reply.get(0);
// check that the call completed before
// extracting the message
if (result.booleanValue()) {
rid = (String) reply.get(1);
}
}
} catch (JMSException exception) {
// rethrow as a XAException
throw new XAException("Failed to getResourceManagerId " +
exception);
}
return rid;
}
/**
* Set a message listener to be called when new Messages arrive from the
* server.
*
* @param listener A reference to the client listener.
*/
public void setMessageListener(JmsMessageListener listener) {
_listener = listener;
}
// implementation of JmsSessionStubIfc.enableAsynchronousDelivery
public void enableAsynchronousDelivery(long clientId, String id,
boolean enable)
throws JMSException {
Vector v = pack("enableAsynchronousDelivery", 3);
v.add(new Long(clientId));
v.add(id);
v.add(new Boolean(enable));
synchronized (_connection) {
send(v, true);
checkReply("enableAsynchronousDelivery");
}
}
/**
* Stop the message receiver thread.
*/
public synchronized void stopReceiver() {
_listener = null;
if (_msgReceiver != null) {
_msgReceiver.stop();
_msgReceiver = null;
}
}
/**
* Pack all the data that is required by the server in a vector.
* Set the size of the vector to be exactly the right size for efficiency.
*
* @param method The function to activate on the server.
* @param numParams The number of paramaters this method will require.
* @return Vector The vector containing all the data.
*/
private Vector pack(String method, int numParams) {
Vector v = new Vector(5 + numParams);
v.add("org.exolab.jms.server.http.HttpJmsSessionConnection");
packCommon(v, method);
return v;
}
/**
* Pack the common data required by all connection types.
*
* @param v The vector to add the common items to.
* @param method The function to activate on the server.
*/
private void packCommon(Vector v, String method) {
v.add(method);
v.add(_clientId);
v.add(_connectionId);
v.add(_sessionId);
}
/**
* A convenience method to check the success of operations which return
* a true on sucess.
*
* @param method The requested server function.
* @throws JMSException for any error.
*/
private Vector checkReply(String method) throws JMSException {
Vector v = null;
try {
v = (Vector) _connection.receive();
} catch (Exception err) {
// rethrow as a JMSException
throw new JMSException("Operation " + method + " failed: " + err);
}
if (v != null) {
Boolean b = (Boolean) v.get(0);
if (!b.booleanValue()) {
if (v.get(1) instanceof JMSException) {
throw (JMSException) v.get(1);
} else {
throw new JMSException("Operation " + method +
" failed: " + v.get(1));
}
}
} else {
throw new JMSException("Unknown connection error for " + method);
}
return v;
}
/**
* A message has been received.
*
*
* @param ob The data received,
* @param id The connection id this data is received from
* @return any requested result, or null, nothing is sent back to the
* client
*/
public Serializable notify(Object ob, String id) {
Vector v = (Vector) ob;
Vector reply = new Vector();
if (_listener != null) {
if (v.size() == 2) {
if (v.get(1) instanceof Message) {
_listener.onMessage((Message) v.get(1));
} else if (v.get(1) instanceof Vector) {
_listener.onMessages((Vector) v.get(1));
} else if (v.get(1) instanceof Long) {
_listener.onMessageAvailable(((Long) v.get(1)).longValue());
}
} else {
// if size == 1, then this is just a ping.
}
}
return reply;
}
/**
* A client has disconnected. Notify the caller.
*
* @param The unique identifier of this connection.
*/
public void disconnection(String id) {
}
/**
* A convenience method to send a packed command to the server.
*
* @throws JMSException for any error.
*/
private void send(Vector v, boolean replyExpected) throws JMSException {
try {
if (replyExpected) {
_connection.send(v);
} else {
((HttpClient) _connection).sendWithoutResponse(v);
}
} catch (Exception err) {
// rethrow as a JMSException
throw new JMSException("Operation Failed" + err);
}
}
/**
* Start a message receiver thread to receive messages
*
* @throws JMSException If the IpcConnection cannot be created
*/
private synchronized void startReceiver() throws JMSException {
try {
if (_msgReceiver == null) {
_msgReceiver = new Server(this);
if (_log.isDebugEnabled()) {
_log.debug("Starting receiver on port="
+ _msgReceiver.getPort());
}
new Thread(_msgReceiver).start();
}
} catch (Exception err) {
throw new JMSException(err.getMessage());
}
}
} // --HttpJmsSessionStub
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -