📄 jmsserversession.java
字号:
}
} else {
throw new JMSException(
"Could not get message for handle " + handle,
JMSErrorCodes.FailedToResolveHandle);
}
}
}
/**
 * Listener callback that must not be invoked on this session (the original
 * comment attributes it to InternalMessageListener). Any call is reported
 * as an error and the current thread's stack trace is dumped so that the
 * offending caller can be located.
 *
 * @param handles the message handles; not used by this method
 * @throws Exception declared by the signature; this body does not throw
 */
public void onMessages(Vector handles) throws Exception {
    _log.error("Illegal to call onMessage");
    // dumpStack() is a static method: call it on the class instead of
    // through an instance obtained from currentThread()
    Thread.dumpStack();
}
/**
 * Listener callback (attributed to InternalMessageListener by the original
 * comment): forwards the message-available notification for the given
 * client to the listener registered on this session.
 *
 * @param clientId the id of the client the notification is for
 * @throws Exception whatever the registered listener propagates
 */
public void onMessageAvailable(long clientId) throws Exception {
    // NOTE(review): unlike isClientEndpointActive(), there is no null check
    // on the listener here - confirm a listener is always set beforehand.
    this._listener.onMessageAvailable(clientId);
}
/**
 * Probe the client endpoint by pushing a null message through the
 * registered listener. When no listener is registered the endpoint is
 * reported as active.
 *
 * @return <code>false</code> if the probe raised a
 *         ClientDisconnectionException, otherwise <code>true</code>
 */
public boolean isClientEndpointActive() {
    if (_listener == null) {
        // nothing to probe
        return true;
    }

    try {
        // a null message serves as the probe
        _listener.onMessage(null);
        return true;
    } catch (ClientDisconnectionException exception) {
        // the failure itself is the answer; it is deliberately not rethrown
        _log.info("Failed to verify that session " + _sessionId
            + " is active.");
        return false;
    }
}
/**
 * Register the listener for this session. According to the original
 * documentation this is the channel through which messages are delivered
 * asynchronously to the consumers created on the session.
 *
 * @param listener the message listener to store; replaces any previous one
 */
public void setMessageListener(JmsMessageListener listener) {
    this._listener = listener;
}
/**
 * Switch asynchronous message delivery on or off for one consumer by
 * installing this session as the consumer's message listener, or by
 * removing the listener.
 *
 * @param clientId the id of the consumer to update
 * @param id       the last processed message; not used by this method
 * @param enable   <code>true</code> to enable; <code>false</code> to disable
 * @throws JMSException if no consumer is registered under
 *                      <code>clientId</code>
 */
public void enableAsynchronousDelivery(long clientId, String id, boolean enable)
    throws JMSException {
    ConsumerEndpoint endpoint = getConsumerEndpoint(clientId);
    if (endpoint == null) {
        throw new JMSException(clientId + " is not registered");
    }

    if (!enable) {
        // detach: the consumer no longer has a listener
        endpoint.setMessageListener(null);
    } else {
        // attach: this session becomes the consumer's listener
        endpoint.setMessageListener(this);
    }
}
/**
 * Recover every consumer registered on this session. Delivery is stopped
 * first, each consumer endpoint is asked to recover, the sent message
 * cache is emptied, and finally delivery is started again.
 *
 * @throws JMSException if the session can't be recovered
 */
public void recover() throws JMSException {
    // halt delivery while the consumers are being recovered
    stop();

    for (Iterator iter = _consumers.values().iterator(); iter.hasNext();) {
        ConsumerEndpoint endpoint = (ConsumerEndpoint) iter.next();
        endpoint.recover();
    }

    // drop everything recorded in the sent message cache
    _sentMessageCache.clear();

    // resume delivery
    start();
}
/**
 * Commit this session by acknowledging all messages held in the sent
 * message cache.
 *
 * @throws JMSException if acknowledging the messages fails, or if an
 *                      OutOfMemoryError is raised while doing so (the
 *                      error is attached as the cause)
 */
public void commit() throws JMSException {
    try {
        _sentMessageCache.acknowledgeAllMessages();
    } catch (OutOfMemoryError exception) {
        String msg =
            "Failed to commit transaction due to out-of-memory error";
        _log.error(msg, exception);
        // keep the original error reachable through getCause()
        JMSException failure = new JMSException(msg);
        failure.initCause(exception);
        throw failure;
    }
}
/**
 * Abort the session's work by clearing the sent message cache. The
 * original documentation states that this returns all unacknowledged
 * messages to their respective endpoints, if those are still active.
 *
 * @throws JMSException - if there are any problems
 */
public void rollback() throws JMSException {
    this._sentMessageCache.clear();
}
/**
 * Implementation of XAResource.commit. Delegates to the resource manager
 * and clears the xid associated with this session whether or not the
 * commit succeeds.
 *
 * @param xid      the transaction branch to commit
 * @param onePhase forwarded unchanged to the resource manager
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public void commit(Xid xid, boolean onePhase) throws XAException {
    try {
        ResourceManager.instance().commit(xid, onePhase);
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException("Failed in commit " + exception);
        failure.initCause(exception);
        throw failure;
    } finally {
        // clear the current xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.end. Delegates to the resource manager and
 * clears the xid associated with this session whether or not the call
 * succeeds.
 *
 * @param xid   the transaction branch being ended
 * @param flags forwarded unchanged to the resource manager
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public void end(Xid xid, int flags) throws XAException {
    try {
        ResourceManager.instance().end(xid, flags);
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException("Failed in end " + exception);
        failure.initCause(exception);
        throw failure;
    } finally {
        // clear the current xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.forget. Delegates to the resource manager
 * and clears the xid associated with this session whether or not the call
 * succeeds.
 *
 * @param xid the transaction branch to forget
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public void forget(Xid xid) throws XAException {
    try {
        ResourceManager.instance().forget(xid);
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException("Failed in forget " + exception);
        failure.initCause(exception);
        throw failure;
    } finally {
        // clear the current xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.getTransactionTimeout. Returns the value
 * reported by the resource manager.
 *
 * @return the transaction timeout reported by the resource manager
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public int getTransactionTimeout() throws XAException {
    try {
        return ResourceManager.instance().getTransactionTimeout();
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException(
            "Failed in getTransactionTimeout " + exception);
        failure.initCause(exception);
        throw failure;
    }
}
/**
 * Implementation of XAResource.isSameRM. The argument is never inspected:
 * every resource is reported as belonging to the same resource manager.
 *
 * NOTE(review): an unconditional <code>true</code> also covers null and
 * unrelated XAResource instances - confirm this is intended by the callers.
 *
 * @param xares the resource to compare against; ignored
 * @return always <code>true</code>
 * @throws XAException declared by the signature; this body does not throw
 */
public boolean isSameRM(XAResource xares) throws XAException {
    return true;
}
/**
 * Implementation of XAResource.prepare. Delegates to the resource manager
 * and returns its result.
 *
 * @param xid the transaction branch to prepare
 * @return the value returned by the resource manager
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public int prepare(Xid xid) throws XAException {
    try {
        return ResourceManager.instance().prepare(xid);
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException("Failed in prepare " + exception);
        failure.initCause(exception);
        throw failure;
    }
}
/**
 * Implementation of XAResource.recover. Delegates to the resource manager
 * and returns the xids it reports.
 *
 * @param flag forwarded unchanged to the resource manager
 * @return the xids returned by the resource manager
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public Xid[] recover(int flag) throws XAException {
    try {
        return ResourceManager.instance().recover(flag);
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException("Failed in recover " + exception);
        failure.initCause(exception);
        throw failure;
    }
}
/**
 * Implementation of XAResource.rollback. Delegates to the resource manager
 * and clears the xid associated with this session whether or not the
 * rollback succeeds.
 *
 * @param xid the transaction branch to roll back
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public void rollback(Xid xid) throws XAException {
    try {
        ResourceManager.instance().rollback(xid);
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException("Failed in rollback " + exception);
        failure.initCause(exception);
        throw failure;
    } finally {
        // clear the current xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.setTransactionTimeout. Delegates to the
 * resource manager and returns its result.
 *
 * @param seconds the timeout value, forwarded unchanged
 * @return the value returned by the resource manager
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public boolean setTransactionTimeout(int seconds) throws XAException {
    try {
        return ResourceManager.instance().setTransactionTimeout(seconds);
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException(
            "Failed in setTransactionTimeout " + exception);
        failure.initCause(exception);
        throw failure;
    }
}
/**
 * Implementation of XAResource.start. Delegates to the resource manager
 * and, only when that succeeds, records the xid as the one currently
 * associated with this session.
 *
 * @param xid   the transaction branch to start
 * @param flags forwarded unchanged to the resource manager
 * @throws XAException if the resource manager fails; the underlying
 *                     ResourceManagerException is attached as the cause
 */
public void start(Xid xid, int flags) throws XAException {
    try {
        ResourceManager.instance().start(xid, flags);
        // set this as the current xid for this session
        _xid = xid;
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException("Failed in start " + exception);
        failure.initCause(exception);
        throw failure;
    }
}
/**
 * Return the xid currently associated with this session.
 *
 * @return the current xid, or <code>null</code> when the session is not
 *         part of a global transaction
 */
public Xid getXid() {
    return this._xid;
}
/**
 * Return the identity of the {@link ResourceManager}. According to the
 * original documentation, the transaction manager should be the only
 * party initiating this call.
 *
 * @return the identity of the resource manager
 * @throws XAException if the resource manager cannot supply its id; the
 *                     underlying ResourceManagerException is attached as
 *                     the cause
 */
public String getResourceManagerId() throws XAException {
    try {
        return ResourceManager.instance().getResourceManagerId();
    } catch (ResourceManagerException exception) {
        // XAException has no cause-taking constructor, so attach it here
        XAException failure = new XAException(
            "Failed in getResourceManagerId " + exception);
        failure.initCause(exception);
        throw failure;
    }
}
/**
 * Report whether this session is transacted.
 *
 * @return <code>true</code> if the session is transacted
 */
public boolean isTransacted() {
    return this._transacted;
}
/**
 * Return the message acknowledgement mode of this session.
 *
 * @return the acknowledgement mode stored for the session
 */
public int getAckMode() {
    return this._ackMode;
}
/**
 * Look up the consumer endpoint registered for a client. Consumers are
 * keyed by the decimal string form of the client id.
 *
 * @param clientId the identity of the consumer endpoint
 * @return the consumer endpoint corresponding to <code>clientId</code>,
 *         or <code>null</code> if none exists
 */
public ConsumerEndpoint getConsumerEndpoint(long clientId) {
    return (ConsumerEndpoint) _consumers.get(String.valueOf(clientId));
}
/**
 * Apply the given stopped state to every consumer endpoint registered on
 * this session. Used to stop and to restart the session.
 *
 * @param stop <code>true</code> to stop the consumers, <code>false</code>
 *             to restart them
 */
private void pause(boolean stop) {
    for (Iterator endpoints = _consumers.values().iterator();
         endpoints.hasNext();) {
        ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints.next();
        endpoint.setStopped(stop);
    }
}
/**
 * Downgrade a persistent message to non-persistent delivery when the
 * destination manager reports that it is not addressed to an administered
 * destination. Messages that are not persistent are left untouched.
 *
 * @param message - the message to check
 * @throws JMSException - propagate JMSException to client
 */
private void checkDeliveryMode(MessageImpl message) throws JMSException {
    if (message.getJMSDeliveryMode() != DeliveryMode.PERSISTENT) {
        // only persistent messages are candidates for the downgrade
        return;
    }

    boolean administered = DestinationManager.instance()
        .isMessageForAdministeredDestination(message);
    if (!administered) {
        message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
}
} //-- JmsServerSession
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -