abstractmessagelistenercontainer.java
来自「有关此类编程有心德的高手 希望能够多多给予指教」· Java 代码 · 共 644 行 · 第 1/2 页
JAVA
644 行
this.exposeListenerSession = exposeListenerSession;
}
/**
* Return whether to expose the listener JMS {@link Session} to a
* registered {@link SessionAwareMessageListener}.
*/
public boolean isExposeListenerSession() {
return this.exposeListenerSession;
}
/**
* Set whether to accept received messages while the listener container
* in the process of stopping.
* <p>Default is "false", rejecting such messages through aborting the
* receive attempt. Switch this flag on to fully process such messages
* even in the stopping phase, with the drawback that even newly sent
* messages might still get processed (if coming in before all receive
* timeouts have expired).
* <p><b>NOTE:</b> Aborting receive attempts for such incoming messages
* might lead to the provider's retry count decreasing for the affected
* messages. If you have a high number of concurrent consumers, make sure
* that the number of retries is higher than the number of consumers,
* to be on the safe side for all potential stopping scenarios.
*/
public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
}
/**
* Return whether to accept received messages while the listener container
* in the process of stopping.
*/
public boolean isAcceptMessagesWhileStopping() {
return this.acceptMessagesWhileStopping;
}
protected void validateConfiguration() {
if (this.destination == null) {
throw new IllegalArgumentException("Property 'destination' or 'destinationName' is required");
}
if (isSubscriptionDurable() && !isPubSubDomain()) {
throw new IllegalArgumentException("A durable subscription requires a topic (pub-sub domain)");
}
}
//-------------------------------------------------------------------------
// Template methods for listener execution
//-------------------------------------------------------------------------
/**
* Execute the specified listener,
* committing or rolling back the transaction afterwards (if necessary).
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @see #invokeListener
* @see #commitIfNecessary
* @see #rollbackOnExceptionIfNecessary
* @see #handleListenerException
*/
protected void executeListener(Session session, Message message) {
try {
doExecuteListener(session, message);
}
catch (Throwable ex) {
handleListenerException(ex);
}
}
/**
* Execute the specified listener,
* committing or rolling back the transaction afterwards (if necessary).
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see #invokeListener
* @see #commitIfNecessary
* @see #rollbackOnExceptionIfNecessary
* @see #convertJmsAccessException
*/
protected void doExecuteListener(Session session, Message message) throws JMSException {
if (!isAcceptMessagesWhileStopping() && !isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because of the listener container " +
"having been stopped in the meantime: " + message);
}
rollbackIfNecessary(session);
throw new MessageRejectedWhileStoppingException();
}
try {
invokeListener(session, message);
}
catch (JMSException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
catch (RuntimeException ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
catch (Error err) {
rollbackOnExceptionIfNecessary(session, err);
throw err;
}
commitIfNecessary(session, message);
}
/**
* Invoke the specified listener: either as standard JMS MessageListener
* or (preferably) as Spring SessionAwareMessageListener.
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see #setMessageListener
*/
protected void invokeListener(Session session, Message message) throws JMSException {
Object listener = getMessageListener();
if (listener instanceof SessionAwareMessageListener) {
doInvokeListener((SessionAwareMessageListener) listener, session, message);
}
else if (listener instanceof MessageListener) {
doInvokeListener((MessageListener) listener, message);
}
else if (listener != null) {
throw new IllegalArgumentException(
"Only MessageListener and SessionAwareMessageListener supported: " + listener);
}
else {
throw new IllegalStateException("No message listener specified - see property 'messageListener'");
}
}
/**
* Invoke the specified listener as Spring SessionAwareMessageListener,
* exposing a new JMS Session (potentially with its own transaction)
* to the listener if demanded.
* @param listener the Spring SessionAwareMessageListener to invoke
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see SessionAwareMessageListener
* @see #setExposeListenerSession
*/
protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message)
throws JMSException {
Connection conToClose = null;
Session sessionToClose = null;
try {
Session sessionToUse = session;
if (!isExposeListenerSession()) {
// We need to expose a separate Session.
conToClose = createConnection();
sessionToClose = createSession(conToClose);
sessionToUse = sessionToClose;
}
// Actually invoke the message listener...
if (logger.isDebugEnabled()) {
logger.debug("Invoking listener with message of type [" + message.getClass() +
"] and session [" + sessionToUse + "]");
}
listener.onMessage(message, sessionToUse);
// Clean up specially exposed Session, if any.
if (sessionToUse != session) {
if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) {
// Transacted session created by this container -> commit.
JmsUtils.commitIfNecessary(sessionToUse);
}
}
}
finally {
JmsUtils.closeSession(sessionToClose);
JmsUtils.closeConnection(conToClose);
}
}
/**
* Invoke the specified listener as standard JMS MessageListener.
* <p>Default implementation performs a plain invocation of the
* <code>onMessage</code> method.
* @param listener the JMS MessageListener to invoke
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see javax.jms.MessageListener#onMessage
*/
protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
listener.onMessage(message);
}
/**
* Perform a commit or message acknowledgement, as appropriate.
* @param session the JMS Session to commit
* @param message the Message to acknowledge
* @throws javax.jms.JMSException in case of commit failure
*/
protected void commitIfNecessary(Session session, Message message) throws JMSException {
// Commit session or acknowledge message.
if (session.getTransacted()) {
// Commit necessary - but avoid commit call within a JTA transaction.
if (isSessionLocallyTransacted(session)) {
// Transacted session created by this container -> commit.
JmsUtils.commitIfNecessary(session);
}
}
else if (isClientAcknowledge(session)) {
message.acknowledge();
}
}
/**
* Perform a rollback, if appropriate.
* @param session the JMS Session to rollback
* @throws javax.jms.JMSException in case of a rollback error
*/
protected void rollbackIfNecessary(Session session) throws JMSException {
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this container -> rollback.
JmsUtils.rollbackIfNecessary(session);
}
}
/**
* Perform a rollback, handling rollback exceptions properly.
* @param session the JMS Session to rollback
* @param ex the thrown application exception or error
* @throws javax.jms.JMSException in case of a rollback error
*/
protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException {
try {
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this container -> rollback.
if (logger.isDebugEnabled()) {
logger.debug("Initiating transaction rollback on application exception", ex);
}
JmsUtils.rollbackIfNecessary(session);
}
}
catch (IllegalStateException ex2) {
logger.debug("Could not roll back because Session already closed", ex2);
}
catch (JMSException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by rollback error", ex);
throw err;
}
}
/**
* Check whether the given Session is locally transacted, that is, whether
* its transaction is managed by this listener container's Session handling
* and not by an external transaction coordinator.
* <p>Note: The Session's own transacted flag will already have been checked
* before. This method is about finding out whether the Session's transaction
* is local or externally coordinated.
* @param session the Session to check
* @return whether the given Session is locally transacted
* @see #isSessionTransacted()
* @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
*/
protected boolean isSessionLocallyTransacted(Session session) {
return isSessionTransacted();
}
/**
* Handle the given exception that arose during listener execution.
* <p>The default implementation logs the exception at error level,
* not propagating it to the JMS provider - assuming that all handling of
* acknowledgement and/or transactions is done by this listener container.
* This can be overridden in subclasses.
* @param ex the exception to handle
*/
protected void handleListenerException(Throwable ex) {
if (ex instanceof MessageRejectedWhileStoppingException) {
// Internal exception - has been handled before.
return;
}
if (ex instanceof JMSException) {
invokeExceptionListener((JMSException) ex);
}
if (isActive()) {
// Regular case: failed while active.
// Log at error level.
logger.warn("Execution of JMS message listener failed", ex);
}
else {
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown log.
logger.debug("Listener exception after container shutdown", ex);
}
}
/**
* Invoke the registered JMS ExceptionListener, if any.
* @param ex the exception that arose during JMS processing
* @see #setExceptionListener
*/
protected void invokeExceptionListener(JMSException ex) {
ExceptionListener exceptionListener = getExceptionListener();
if (exceptionListener != null) {
exceptionListener.onException(ex);
}
}
/**
* Internal exception class that indicates a rejected message on shutdown.
* Used to trigger a rollback for an external transaction manager in that case.
*/
private static class MessageRejectedWhileStoppingException extends RuntimeException {
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?