📄 jmsserversession.java
字号:
ConsumerEndpoint consumer = (ConsumerEndpoint) consumers.next();
ConsumerManager.instance().deleteConsumerEndpoint(consumer);
}
// clear the unacked message cache
_sentMessageCache.clear();
// clear the consumers
_consumers.clear();
// de-register the session from the connection
_connection.closed(this);
} else {
if (_log.isDebugEnabled()) {
_log.debug("close() [session=" + this +
"]: session already closed");
}
}
}
/**
 * Send the specified message to the client.
 * <p>
 * A clone of the message referenced by the handle is passed to the
 * registered listener. If no listener is registered an error is logged and
 * nothing is sent. If the handle yields no message (it may have expired)
 * the call does nothing.
 *
 * @param handle a handle to the message
 * @throws JMSException if the message can't be resolved from the handle,
 * or can't be cloned
 * @throws RemoteException if the message can't be delivered to the client;
 * the session is closed before the exception is rethrown
 */
public void onMessage(MessageHandle handle) throws JMSException,
        RemoteException {
    if (_listener != null) {
        MessageImpl message = handle.getMessage();
        // get the message. It may be null if it has expired
        if (message != null) {
            MessageImpl m;
            try {
                m = (MessageImpl) message.clone();
            } catch (CloneNotSupportedException exception) {
                // keep the underlying failure reachable from the
                // JMSException instead of only copying its text
                JMSException error = new JMSException(exception.toString());
                error.setLinkedException(exception);
                throw error;
            }
            m.setConsumerId(handle.getConsumerId());
            m.setJMSRedelivered(handle.getDelivered());

            // if the session is transacted OR the acknowledge mode is
            // CLIENT_ACKNOWLEDGE then send the handle to the cache before
            // we send the message to the listener. This will enable
            // clients to ack the message while in the onMessage method
            if (_transacted || (_ackMode == Session.CLIENT_ACKNOWLEDGE)) {
                _sentMessageCache.process(handle);
            }

            try {
                // send the message to the listener.
                _listener.onMessage(m);

                // if the session is not transacted and the acknowledge
                // mode is not CLIENT_ACKNOWLEDGE then process it through
                // the sent message cache now.
                if (!_transacted &&
                    (_ackMode != Session.CLIENT_ACKNOWLEDGE)) {
                    _sentMessageCache.process(handle);
                }
            } catch (RemoteException exception) {
                // close all resources and rethrow it
                close();
                throw exception;
            }
        }
    } else {
        _log.error("Failed to stop async consumer endpoints?");
    }
}
/**
 * Notifies that a message is available for a particular consumer.
 * <p>
 * The notification is forwarded to the registered listener. If no listener
 * is registered the notification is dropped (a debug message is logged)
 * rather than failing with a <code>NullPointerException</code>; this
 * mirrors the null-listener guard in <code>onMessage</code>.
 *
 * @param consumerId the identity of the message consumer
 * @throws RemoteException if the session can't be notified
 */
public void onMessageAvailable(long consumerId) throws RemoteException {
    if (_listener == null) {
        if (_log.isDebugEnabled()) {
            _log.debug("onMessageAvailable() [session=" + this +
                       "]: no listener registered, notification ignored");
        }
        return;
    }
    _listener.onMessageAvailable(consumerId);
}
/**
 * Recover this session.
 * <p>
 * Message delivery is stopped, the sent message cache is cleared, and
 * message delivery is then restarted, in that order.
 * <p>
 * NOTE(review): clearing the sent message cache presumably makes the
 * unacknowledged messages eligible for redelivery - confirm against the
 * cache implementation.
 * NOTE(review): if clearing the cache throws, <code>start()</code> is not
 * reached and delivery is left stopped - confirm this is intended.
 *
 * @throws JMSException if the session can't be recovered
 */
public void recover() throws JMSException {
// stop message delivery
stop();
// clear the messages in the sent message cache
_sentMessageCache.clear();
// restart message delivery
start();
}
/**
 * Commit this session by acknowledging every message held in the sent
 * message cache.
 * <p>
 * An <code>OutOfMemoryError</code> raised while acknowledging is logged and
 * reported to the caller as a <code>JMSException</code>.
 *
 * @throws JMSException if the messages can't be acknowledged
 */
public void commit() throws JMSException {
    try {
        _sentMessageCache.acknowledgeAllMessages();
    } catch (OutOfMemoryError oom) {
        final String reason =
            "Failed to commit transaction due to out-of-memory error";
        _log.error(reason, oom);
        throw new JMSException(reason);
    }
}
/**
 * Roll back (abort) this session by clearing the sent message cache.
 * <p>
 * NOTE(review): clearing the cache presumably returns all unacked messages
 * to their respective endpoints, if they are still active - confirm
 * against the cache implementation.
 *
 * @throws JMSException if there are any problems
 */
public void rollback() throws JMSException {
_sentMessageCache.clear();
}
/**
 * Implementation of XAResource.start.
 * <p>
 * Delegates to the {@link ResourceManager}; once that succeeds,
 * <code>xid</code> becomes the current xid of this session.
 *
 * @param xid the xid to start work on
 * @param flags passed through unchanged to the resource manager
 * @throws XAException if the resource manager reports a failure
 */
public void start(Xid xid, int flags) throws XAException {
    try {
        ResourceManager.instance().start(xid, flags);
    } catch (ResourceManagerException failure) {
        throw new XAException("Failed in start " + failure);
    }
    // only reached on success: remember the xid for this session
    _xid = xid;
}
/**
 * Implementation of XAResource.prepare.
 *
 * @param xid the xid to prepare
 * @return the value returned by the {@link ResourceManager} for
 * <code>xid</code>
 * @throws XAException if the resource manager reports a failure
 */
public int prepare(Xid xid) throws XAException {
    int vote;
    try {
        vote = ResourceManager.instance().prepare(xid);
    } catch (ResourceManagerException failure) {
        throw new XAException("Failed in prepare " + failure);
    }
    return vote;
}
/**
 * Implementation of XAResource.commit.
 * <p>
 * Delegates to the {@link ResourceManager}. The current xid of this session
 * is reset to <code>null</code> whether or not the commit succeeds.
 *
 * @param xid the xid to commit
 * @param onePhase passed through unchanged to the resource manager
 * @throws XAException if the resource manager reports a failure
 */
public void commit(Xid xid, boolean onePhase) throws XAException {
    try {
        ResourceManager.instance().commit(xid, onePhase);
    } catch (ResourceManagerException failure) {
        String reason = "Failed in commit " + failure;
        throw new XAException(reason);
    } finally {
        // the session is no longer associated with an xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.end.
 * <p>
 * Delegates to the {@link ResourceManager}. The current xid of this session
 * is reset to <code>null</code> whether or not the call succeeds.
 *
 * @param xid the xid to end work on
 * @param flags passed through unchanged to the resource manager
 * @throws XAException if the resource manager reports a failure
 */
public void end(Xid xid, int flags) throws XAException {
    try {
        ResourceManager.instance().end(xid, flags);
    } catch (ResourceManagerException failure) {
        String reason = "Failed in end " + failure;
        throw new XAException(reason);
    } finally {
        // the session is no longer associated with an xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.forget.
 * <p>
 * Delegates to the {@link ResourceManager}. The current xid of this session
 * is reset to <code>null</code> whether or not the call succeeds.
 *
 * @param xid the xid to forget
 * @throws XAException if the resource manager reports a failure
 */
public void forget(Xid xid) throws XAException {
    try {
        ResourceManager.instance().forget(xid);
    } catch (ResourceManagerException failure) {
        String reason = "Failed in forget " + failure;
        throw new XAException(reason);
    } finally {
        // the session is no longer associated with an xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.recover.
 *
 * @param flag passed through unchanged to the resource manager
 * @return the xids returned by the {@link ResourceManager}
 * @throws XAException if the resource manager reports a failure
 */
public Xid[] recover(int flag) throws XAException {
    Xid[] xids;
    try {
        xids = ResourceManager.instance().recover(flag);
    } catch (ResourceManagerException failure) {
        throw new XAException("Failed in recover " + failure);
    }
    return xids;
}
/**
 * Implementation of XAResource.rollback.
 * <p>
 * Delegates to the {@link ResourceManager}. The current xid of this session
 * is reset to <code>null</code> whether or not the rollback succeeds.
 *
 * @param xid the xid to roll back
 * @throws XAException if the resource manager reports a failure
 */
public void rollback(Xid xid) throws XAException {
    try {
        ResourceManager.instance().rollback(xid);
    } catch (ResourceManagerException failure) {
        String reason = "Failed in rollback " + failure;
        throw new XAException(reason);
    } finally {
        // clear the current xid
        _xid = null;
    }
}
/**
 * Implementation of XAResource.getTransactionTimeout.
 *
 * @return the transaction timeout reported by the {@link ResourceManager}
 * @throws XAException if the resource manager reports a failure
 */
public int getTransactionTimeout() throws XAException {
    int timeout;
    try {
        timeout = ResourceManager.instance().getTransactionTimeout();
    } catch (ResourceManagerException failure) {
        throw new XAException("Failed in getTransactionTimeout " + failure);
    }
    return timeout;
}
/**
 * Implementation of XAResource.isSameRM.
 * <p>
 * NOTE(review): this always answers <code>true</code>; <code>xares</code>
 * is never inspected. Confirm that callers only pass resources that are
 * backed by the same resource manager.
 *
 * @param xares the resource to compare against (ignored)
 * @return <code>true</code>, unconditionally
 * @throws XAException declared, but not thrown by this implementation
 */
public boolean isSameRM(XAResource xares) throws XAException {
return true;
}
/**
 * Implementation of XAResource.setTransactionTimeout.
 *
 * @param seconds the timeout, passed through unchanged to the
 * {@link ResourceManager}
 * @return the value returned by the resource manager
 * @throws XAException if the resource manager reports a failure
 */
public boolean setTransactionTimeout(int seconds) throws XAException {
    boolean accepted;
    try {
        accepted = ResourceManager.instance().setTransactionTimeout(seconds);
    } catch (ResourceManagerException failure) {
        throw new XAException("Failed in setTransactionTimeout " + failure);
    }
    return accepted;
}
/**
 * Return the xid that is currently associated with this session, or
 * <code>null</code> if this session is currently not part of a global
 * transaction.
 *
 * @return the current xid; may be <code>null</code>
 */
public Xid getXid() {
return _xid;
}
/**
 * Return the identity of the {@link ResourceManager}. The transaction
 * manager should be the only party initiating this call.
 *
 * @return the identity of the resource manager
 * @throws XAException if the resource manager id can't be retrieved
 */
public String getResourceManagerId() throws XAException {
    String rid;
    try {
        rid = ResourceManager.instance().getResourceManagerId();
    } catch (ResourceManagerException failure) {
        throw new XAException("Failed in getResourceManagerId " + failure);
    }
    return rid;
}
/**
 * Determines if the session is transacted.
 *
 * @return <code>true</code> if the session is transacted, otherwise
 * <code>false</code>
 */
public boolean isTransacted() {
return _transacted;
}
/**
 * Returns the message acknowledgement mode for the session.
 *
 * @return the acknowledgement mode held by this session
 */
public int getAckMode() {
return _ackMode;
}
/**
 * Looks up a consumer endpoint of this session by its identifier.
 *
 * @param consumerId the consumer identifier
 * @return the consumer endpoint registered under <code>consumerId</code>,
 * or <code>null</code> if none exists
 */
public ConsumerEndpoint getConsumerEndpoint(long consumerId) {
    // consumers are keyed by the boxed identifier
    Long key = new Long(consumerId);
    Object endpoint = _consumers.get(key);
    return (ConsumerEndpoint) endpoint;
}
/**
 * Stops or restarts the session by setting the stopped state of every
 * consumer endpoint registered with it. Stopping the session should stop
 * all message delivery to session consumers.
 *
 * @param stop <code>true</code> to stop the session, <code>false</code> to
 * restart it
 */
private void pause(boolean stop) {
    for (Iterator endpoints = _consumers.values().iterator();
         endpoints.hasNext();) {
        ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints.next();
        endpoint.setStopped(stop);
    }
}
/**
 * Check the delivery mode of the message. A persistent message that is not
 * for an administered destination is downgraded to non-persistent so that
 * it can be processed correctly by the server. Any other message is left
 * untouched.
 *
 * @param message the message to check
 * @throws JMSException propagated to the client
 */
private void checkDeliveryMode(MessageImpl message) throws JMSException {
    if (message.getJMSDeliveryMode() != DeliveryMode.PERSISTENT) {
        // only persistent messages are candidates for a downgrade
        return;
    }
    if (DestinationManager.instance()
            .isMessageForAdministeredDestination(message)) {
        // administered destinations keep persistent delivery
        return;
    }
    message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -